RocketMQ: NameServer Source Code Analysis
Published: 2019-06-26


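Summary

  1. The listen port 9876 is hard-coded in org.apache.rocketmq.namesrv.NamesrvStartup.createNamesrvController(String[]).
  2. A properties file can be passed with the -c startup option. It mainly configures the Netty and NameServer properties; see the NettyServerConfig properties and NamesrvConfig properties sections.
  3. Startup initializes the Netty-related classes, starts Netty, and registers these handlers on the pipeline: HandshakeHandler, NettyEncoder, NettyDecoder, IdleStateHandler, NettyConnectManageHandler, NettyServerHandler. NettyServerHandler is the one that processes requests and responses.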
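Points 1 and 2 can be sketched roughly as follows. This is a simplified stand-in, not RocketMQ's code: DemoServerConfig and ConfigLoader are hypothetical names, only int properties are handled, and the order shown (fixed port first, then values from the properties file applied on top) is an assumption about how the startup method combines the two steps.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical config holder; the field names mirror two NettyServerConfig properties.
class DemoServerConfig {
    private int listenPort = 8888;
    private int serverWorkerThreads = 8;

    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public int getServerWorkerThreads() { return serverWorkerThreads; }
    public void setServerWorkerThreads(int n) { this.serverWorkerThreads = n; }
}

class ConfigLoader {
    // Copies every property that has a matching one-argument int setter onto the target.
    static void apply(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3
                    || m.getParameterCount() != 1
                    || m.getParameterTypes()[0] != int.class) {
                continue;
            }
            // setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue;
            }
            try {
                m.invoke(target, Integer.parseInt(value.trim()));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }

    // Builds a config: fixed port first, then overrides from the properties text.
    static DemoServerConfig create(String propertiesText) {
        DemoServerConfig config = new DemoServerConfig();
        config.setListenPort(9876); // the hard-coded port from point 1
        Properties props = new Properties();
        try {
            // The string stands in for the file named by -c.
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        apply(props, config);
        return config;
    }

    public static void main(String[] args) {
        DemoServerConfig config = create("serverWorkerThreads=16\n");
        System.out.println(config.getListenPort() + " " + config.getServerWorkerThreads());
    }
}
```

Reflection keeps the loader independent of any particular config class, which is why one properties file can feed two different config objects.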

Source walkthrough

Startup sequence

/**
 * A properties file can be specified with the -c startup option; its values are injected into NamesrvConfig and NettyServerConfig.
 */
org.apache.rocketmq.namesrv.NamesrvStartup.createNamesrvController(String[])  creates the NamesrvController and reads the properties configuration
--- org.apache.rocketmq.namesrv.NamesrvStartup.start(NamesrvController)  starts the NamesrvController
------ org.apache.rocketmq.namesrv.NamesrvController.initialize()
--------- org.apache.rocketmq.namesrv.kvconfig.KVConfigManager.load()  loads the initial parameters from ${user.home}/namesrv/kvConfig.json
--------- org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyRemotingServer(NettyServerConfig, ChannelEventListener)  constructs the Netty server (it is started later, in start())
------ java.lang.Runtime.addShutdownHook(Thread)
--------- org.apache.rocketmq.namesrv.NamesrvController.shutdown()  shuts down the NameServer
------ org.apache.rocketmq.namesrv.NamesrvController.start()
--------- org.apache.rocketmq.remoting.netty.NettyRemotingServer.start()  starts Netty and registers these handlers on the pipeline: HandshakeHandler, NettyEncoder, NettyDecoder, IdleStateHandler, NettyConnectManageHandler, NettyServerHandler. It also starts some asynchronous threads that check things such as broker state.
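The ordering in this call tree (initialize, register the shutdown hook, then start) can be illustrated with a minimal sketch. DemoController and DemoStartup are hypothetical names, the lifecycle methods only record that they ran, and throwing on a failed initialize is my simplification rather than what NamesrvStartup does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a controller with an initialize/start/shutdown lifecycle.
class DemoController {
    final List<String> events = new ArrayList<>();

    boolean initialize() {
        events.add("initialize"); // e.g. load KV config, construct the remoting server
        return true;
    }

    void start() {
        events.add("start");      // e.g. start the remoting server
    }

    void shutdown() {
        events.add("shutdown");   // e.g. release threads and sockets
    }
}

class DemoStartup {
    // Follows the order of the call tree: initialize, register the hook, then start.
    // Returns the hook so that callers (or tests) can remove it again.
    static Thread start(DemoController controller) {
        if (!controller.initialize()) {
            controller.shutdown();
            throw new IllegalStateException("initialize failed");
        }
        Thread hook = new Thread(controller::shutdown, "demo-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        controller.start();
        return hook;
    }

    public static void main(String[] args) {
        DemoController controller = new DemoController();
        start(controller);
        System.out.println(controller.events);
    }
}
```

Registering the hook before start() means that resources acquired during startup are still released if the JVM is asked to exit partway through.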

Important classes and methods

NettyServerConfig properties

public class NettyServerConfig implements Cloneable {
    // Netty listen port of the NameServer; the default here is 8888, but startup reassigns it to 9876
    private int listenPort = 8888;
    // Number of NameServer worker threads
    private int serverWorkerThreads = 8;
    // Number of NameServer callback-processing threads
    private int serverCallbackExecutorThreads = 0;
    // Number of threads that handle Netty selector events
    private int serverSelectorThreads = 3;
    private int serverOnewaySemaphoreValue = 256;
    private int serverAsyncSemaphoreValue = 64;
    private int serverChannelMaxIdleTimeSeconds = 120;
    // 65535
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    // 65535
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean serverPooledByteBufAllocatorEnable = true;
}

NamesrvConfig properties

public class NamesrvConfig {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    // rocketmq.home.dir, e.g. D:\zhongwangspace\rocketmq-master\distribution
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    // ${user.home}/namesrv/kvConfig.json, e.g. C:\Users\fengpc\namesrv\kvConfig.json
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    // ${user.home}/namesrv/namesrv.properties, e.g. C:\Users\fengpc\namesrv\namesrv.properties
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    // ...
}

HandshakeHandler

An inner class of org.apache.rocketmq.remoting.netty.NettyRemotingServer. Source:

class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private final TlsMode tlsMode;

    private static final byte HANDSHAKE_MAGIC_CODE = 0x16;

    HandshakeHandler(TlsMode tlsMode) {
        this.tlsMode = tlsMode;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

        // mark the current position so that we can peek the first byte to determine if the content is starting with
        // TLS handshake
        msg.markReaderIndex();

        byte b = msg.getByte(0);

        if (b == HANDSHAKE_MAGIC_CODE) {
            switch (tlsMode) {
                case DISABLED:
                    ctx.close();
                    log.warn("Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
                    break;
                case PERMISSIVE:
                case ENFORCING:
                    if (null != sslContext) {
                        ctx.pipeline()
                            .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
                            .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
                        log.info("Handlers prepended to channel pipeline to establish SSL connection");
                    } else {
                        ctx.close();
                        log.error("Trying to establish a SSL connection but sslContext is null");
                    }
                    break;

                default:
                    log.warn("Unknown TLS mode");
                    break;
            }
        } else if (tlsMode == TlsMode.ENFORCING) {
            ctx.close();
            log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
        }

        // reset the reader index so that handshake negotiation may proceed as normal.
        msg.resetReaderIndex();

        try {
            // Remove this handler
            ctx.pipeline().remove(this);
        } catch (NoSuchElementException e) {
            log.error("Error while removing HandshakeHandler", e);
        }

        // Hand over this message to the next handler.
        ctx.fireChannelRead(msg.retain());
    }
}

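The overall logic: the handler peeks at the first byte of the first message. If it is 0x16 (HANDSHAKE_MAGIC_CODE, the TLS handshake record type), the client is attempting a TLS handshake and the handler acts on the TLS mode: in DISABLED mode it closes the connection; in PERMISSIVE or ENFORCING mode it adds the TLS handler to the pipeline, or closes the connection if sslContext is null. If the first byte is anything else and the mode is ENFORCING, the plaintext connection is closed. In every case the handler then removes itself from the pipeline and passes the message on.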
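The decision made on the first byte can be written as a small table-like function. This is only a sketch with plain JDK types: HandshakeSketch, Action and the sslContextAvailable flag are hypothetical names, and the real handler works on a Netty ByteBuf and pipeline rather than returning a value.

```java
import java.nio.ByteBuffer;

class HandshakeSketch {
    enum TlsMode { DISABLED, PERMISSIVE, ENFORCING }
    enum Action { CLOSE, ADD_TLS_HANDLER, PASS_THROUGH }

    // 0x16 is the TLS record type of a handshake message.
    static final byte HANDSHAKE_MAGIC_CODE = 0x16;

    // Decides what to do with a new connection from its first byte, without consuming it.
    // Assumes the buffer holds at least one readable byte.
    static Action decide(ByteBuffer firstPacket, TlsMode mode, boolean sslContextAvailable) {
        // Absolute get: the buffer position is left unchanged.
        byte first = firstPacket.get(firstPacket.position());
        if (first == HANDSHAKE_MAGIC_CODE) {
            if (mode == TlsMode.DISABLED) {
                return Action.CLOSE;
            }
            // PERMISSIVE or ENFORCING
            return sslContextAvailable ? Action.ADD_TLS_HANDLER : Action.CLOSE;
        }
        // Plaintext traffic is only refused when TLS is mandatory.
        return mode == TlsMode.ENFORCING ? Action.CLOSE : Action.PASS_THROUGH;
    }

    public static void main(String[] args) {
        ByteBuffer tlsHello = ByteBuffer.wrap(new byte[] {0x16, 0x03, 0x01});
        System.out.println(decide(tlsHello, TlsMode.PERMISSIVE, true));
    }
}
```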

NettyEncoder

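The overall logic is to write the encoded header of the remoting command (remotingCommand) to the output, followed by the body if there is one. RemotingCommand wraps a request or response, its header, and the message body. The NettyEncoder source: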

public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}
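To make the "header first, then body" order concrete, here is a hedged sketch of a length-prefixed frame. The layout (4-byte total length, 4-byte header length, header bytes, body bytes) is a simplified assumption on my part: the details of what encodeHeader() produces, such as any serialization-type marker, are not shown in the listing and are left out here. FrameEncodeSketch and the sample header are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class FrameEncodeSketch {
    // Assumed layout: [4-byte total length][4-byte header length][header bytes][body bytes].
    // The total length counts everything after its own 4 bytes.
    static ByteBuffer encode(byte[] header, byte[] body) {
        int bodyLength = body == null ? 0 : body.length;
        int totalLength = 4 + header.length + bodyLength;
        ByteBuffer out = ByteBuffer.allocate(4 + totalLength);
        out.putInt(totalLength);
        out.putInt(header.length);
        out.put(header);
        if (body != null) {
            out.put(body); // the body is optional, as in NettyEncoder
        }
        out.flip(); // switch the buffer from writing to reading
        return out;
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":1}".getBytes(StandardCharsets.UTF_8);
        byte[] body = "demo".getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = encode(header, body);
        System.out.println("frame bytes: " + frame.remaining());
    }
}
```

Putting the total length first is what lets the receiving side cut the byte stream back into whole commands before parsing anything else.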

NettyDecoder

When a message arrives, this handler cuts a complete frame out of the byte stream and calls RemotingCommand.decode to decode its content. Source:

public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    private static final int FRAME_MAX_LENGTH =
        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));

    public NettyDecoder() {
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }

            ByteBuffer byteBuffer = frame.nioBuffer();

            return RemotingCommand.decode(byteBuffer);
        } catch (Exception e) {
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }
}
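The constructor arguments (0, 4, 0, 4) tell LengthFieldBasedFrameDecoder that the length field starts at offset 0, is 4 bytes long, needs no adjustment, and that the first 4 bytes are stripped from the returned frame. A simplified sketch of that behaviour on a plain java.nio.ByteBuffer follows; FrameDecodeSketch is a hypothetical name and the max-length check is only approximately what Netty does.

```java
import java.nio.ByteBuffer;

class FrameDecodeSketch {
    static final int FRAME_MAX_LENGTH = 16777216; // same default value as in NettyDecoder

    // Returns the next frame without its 4-byte length prefix,
    // or null if the buffer does not yet hold a complete frame.
    static ByteBuffer decode(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null; // length field itself is incomplete
        }
        int length = in.getInt(in.position()); // peek, nothing consumed yet
        if (length < 0 || length > FRAME_MAX_LENGTH) {
            throw new IllegalArgumentException("bad frame length: " + length);
        }
        if (in.remaining() < 4 + length) {
            return null; // partial frame, wait for more bytes
        }
        in.position(in.position() + 4);      // strip the length prefix
        ByteBuffer frame = in.slice();       // view starting at the payload
        frame.limit(length);                 // restrict the view to this frame
        in.position(in.position() + length); // consume the payload
        return frame;
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.allocate(16);
        in.putInt(3).put(new byte[] {7, 8, 9});
        in.flip();
        ByteBuffer frame = decode(in);
        System.out.println("payload bytes: " + frame.remaining());
    }
}
```

Returning null for an incomplete frame mirrors the null check in the decode method: the caller simply tries again once more bytes have arrived.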

IdleStateHandler

NettyConnectManageHandler

This handler deals with Netty connection events. Source:

    /**
     * Handles Netty connection events.
     * channelRegistered is called when a connecting channel is registered (the code logs its remote address)
     * channelUnregistered is called when the channel is unregistered
     */
    class NettyConnectManageHandler extends ChannelDuplexHandler {
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
            super.channelRegistered(ctx);
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
            super.channelUnregistered(ctx);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
            super.channelActive(ctx);

            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
            }
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
            super.channelInactive(ctx);

            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
            }
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state().equals(IdleState.ALL_IDLE)) {
                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                    RemotingUtil.closeChannel(ctx.channel());
                    if (NettyRemotingServer.this.channelEventListener != null) {
                        NettyRemotingServer.this
                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                    }
                }
            }

            ctx.fireUserEventTriggered(evt);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
            log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);

            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
            }

            RemotingUtil.closeChannel(ctx.channel());
        }
    }
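Every callback in this handler follows the same pattern: log, then hand an event (CONNECT, CLOSE, IDLE, EXCEPTION) to putNettyEvent if a listener is configured. The listing does not show what putNettyEvent does with the event; the sketch below assumes a bounded queue that a separate worker drains and delivers to the listener. ConnectionEventSketch, its Listener interface and the queue capacity are all hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;

class ConnectionEventSketch {
    enum EventType { CONNECT, CLOSE, IDLE, EXCEPTION }

    static final class Event {
        final EventType type;
        final String remoteAddress;

        Event(EventType type, String remoteAddress) {
            this.type = type;
            this.remoteAddress = remoteAddress;
        }
    }

    // Hypothetical listener, playing the role of channelEventListener.
    interface Listener {
        void onEvent(Event event);
    }

    // Capacity is an arbitrary value chosen for this sketch.
    private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>(10000);
    private final Listener listener;

    ConnectionEventSketch(Listener listener) {
        this.listener = listener;
    }

    // Called from I/O callbacks: never blocks. The event is dropped
    // (false is returned) if there is no listener or the queue is full.
    boolean putEvent(Event event) {
        return listener != null && queue.offer(event);
    }

    // Called from a worker outside the I/O threads: delivers everything queued so far.
    int dispatchPending() {
        int delivered = 0;
        Event event;
        while ((event = queue.poll()) != null) {
            listener.onEvent(event);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        ConnectionEventSketch sketch = new ConnectionEventSketch(
            e -> System.out.println(e.type + " " + e.remoteAddress));
        sketch.putEvent(new Event(EventType.CONNECT, "10.0.0.1:5000"));
        sketch.dispatchPending();
    }
}
```

Decoupling the callback from the listener this way keeps slow listener code off the Netty I/O threads.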

NettyServerHandler

This is the most important handler: it processes the requests and responses that arrive over Netty. Source:

org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler

    // Handles messages received by Netty, which are either requests or responses
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }

    /**
     * Handles an incoming message,
     * which is either a request or a response
     */
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Handles a request
     */
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                        if (rpcHook != null) {
                            rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        }

                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        if (rpcHook != null) {
                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        }

                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    log.error("process request over, but response failed", e);
                                    log.error(cmd.toString());
                                    log.error(response.toString());
                                }
                            } else {

                            }
                        }
                    } catch (Throwable e) {
                        log.error("process request exception", e);
                        log.error(cmd.toString());

                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };

            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }

                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }

    /**
     * Handles a response
     */
    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            if (responseFuture.getInvokeCallback() != null) {
                executeInvokeCallback(responseFuture);
            } else {
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }
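Two lookups carry most of this logic: requests are routed by request code through processorTable (falling back to a default processor), and responses are matched to their pending request by the opaque id through responseTable. The sketch below shows only those two lookups, under simplifying assumptions: DispatchSketch and Processor are hypothetical names, strings stand in for RemotingCommand, CompletableFuture stands in for ResponseFuture, and the thread pool, RPC hooks, flow control and one-way handling are left out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class DispatchSketch {
    // Hypothetical processor: takes a request payload and returns a response payload.
    interface Processor {
        String process(String request);
    }

    private final Map<Integer, Processor> processorTable = new ConcurrentHashMap<>();
    private final Map<Integer, CompletableFuture<String>> responseTable = new ConcurrentHashMap<>();
    private volatile Processor defaultProcessor;

    void register(int code, Processor processor) {
        processorTable.put(code, processor);
    }

    void registerDefault(Processor processor) {
        defaultProcessor = processor;
    }

    // Request side: look up by code, fall back to the default, else report "not supported".
    String handleRequest(int code, String request) {
        Processor matched = processorTable.get(code);
        Processor processor = matched != null ? matched : defaultProcessor;
        if (processor == null) {
            return " request type " + code + " not supported";
        }
        return processor.process(request);
    }

    // Caller side: remember a pending request under its opaque id.
    CompletableFuture<String> expectResponse(int opaque) {
        CompletableFuture<String> future = new CompletableFuture<>();
        responseTable.put(opaque, future);
        return future;
    }

    // Response side: complete the matching future; false means no request was waiting.
    boolean handleResponse(int opaque, String response) {
        CompletableFuture<String> future = responseTable.remove(opaque);
        if (future == null) {
            return false;
        }
        future.complete(response);
        return true;
    }

    public static void main(String[] args) {
        DispatchSketch dispatch = new DispatchSketch();
        dispatch.registerDefault(request -> "default:" + request);
        System.out.println(dispatch.handleRequest(7, "ping"));
    }
}
```

Because the opaque id is copied from the request onto the response, many requests can be in flight on one connection and still be matched up in any order.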

Reposted from: https://my.oschina.net/liangxiao/blog/2993762
