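Summary
- Port 9876 is hard-coded as the default listen port in org.apache.rocketmq.namesrv.NamesrvStartup.createNamesrvController(String[]). Because it is assigned before the properties file is applied, a listenPort entry in that file can still override it.
- A properties file can be passed with the -c startup option. It mainly configures the Netty and NameServer properties. See "NettyServerConfig properties" and "NamesrvConfig properties".
- Startup initializes the Netty-related classes, starts Netty, and registers the following handlers on the pipeline: HandshakeHandler, NettyEncoder, NettyDecoder, IdleStateHandler, NettyConnectManageHandler, NettyServerHandler. Of these, NettyServerHandler is the one that processes requests and responses.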
Source code walkthrough
Startup execution order
/**
 * A configuration file can be specified with the -c startup option; its values are then injected into NamesrvConfig and NettyServerConfig.
 */
org.apache.rocketmq.namesrv.NamesrvStartup.createNamesrvController(String[])  creates the NamesrvController and reads the properties configuration
--- org.apache.rocketmq.namesrv.NamesrvStartup.start(NamesrvController)  starts the NamesrvController
------ org.apache.rocketmq.namesrv.NamesrvController.initialize()
--------- org.apache.rocketmq.namesrv.kvconfig.KVConfigManager.load()  loads the initial parameters from ${user.home}/namesrv/kvConfig.json
--------- org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyRemotingServer(NettyServerConfig, ChannelEventListener)  creates the Netty remoting server (it is started later, in NamesrvController.start())
------ java.lang.Runtime.addShutdownHook(Thread)
--------- org.apache.rocketmq.namesrv.NamesrvController.shutdown()  shuts down the NameServer
------ org.apache.rocketmq.namesrv.NamesrvController.start()
--------- org.apache.rocketmq.remoting.netty.NettyRemotingServer.start()  starts Netty and registers the following handlers on the pipeline: HandshakeHandler, NettyEncoder, NettyDecoder, IdleStateHandler, NettyConnectManageHandler, NettyServerHandler. In addition, some background threads are started to check things such as broker state.
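The interplay between the default port and the -c file can be sketched in plain Java. This is only an illustration, not the RocketMQ code: the class ListenPortSketch and the method resolvePort are hypothetical, and the sketch assumes the properties file uses the key listenPort (the field name in NettyServerConfig).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical stand-in for the "default first, properties file second" order.
class ListenPortSketch {
    // Default assigned in code before any properties file is applied.
    static final int DEFAULT_PORT = 9876;

    // Returns the port from the properties text if it has a listenPort entry,
    // otherwise the default.
    static int resolvePort(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("listenPort", String.valueOf(DEFAULT_PORT));
        return Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        System.out.println(resolvePort(""));                  // no override
        System.out.println(resolvePort("listenPort=19876\n")); // overridden by the file
    }
}
```

In the real startup code the properties are applied to the config objects as a whole, so any field with a setter can be overridden in the same way.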
Key classes and methods
NettyServerConfig properties
    public class NettyServerConfig implements Cloneable {
        // Netty listen port of the NameServer; the effective value is 9876 because it is reassigned from outside
        private int listenPort = 8888; // effectively 9876
        // number of NameServer worker threads
        private int serverWorkerThreads = 8;
        // number of NameServer callback-processing threads
        private int serverCallbackExecutorThreads = 0;
        // number of NameServer threads that handle Netty selector events
        private int serverSelectorThreads = 3;
        private int serverOnewaySemaphoreValue = 256;
        private int serverAsyncSemaphoreValue = 64;
        private int serverChannelMaxIdleTimeSeconds = 120;
        // 65535
        private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
        // 65535
        private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
        private boolean serverPooledByteBufAllocatorEnable = true;
    }
NamesrvConfig properties
    public class NamesrvConfig {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
        // rocketmq.home.dir, for example D:\zhongwangspace\rocketmq-master\distribution
        private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
        // ${user.home}/namesrv/kvConfig.json, for example C:\Users\fengpc\namesrv\kvConfig.json
        private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
        // ${user.home}/namesrv/namesrv.properties, for example C:\Users\fengpc\namesrv\namesrv.properties
        private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
        private String productEnvName = "center";
        // (the rest of the class is omitted here)
    }
HandshakeHandler
An inner class of org.apache.rocketmq.remoting.netty.NettyRemotingServer. Source:
    class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {

        private final TlsMode tlsMode;

        private static final byte HANDSHAKE_MAGIC_CODE = 0x16;

        HandshakeHandler(TlsMode tlsMode) {
            this.tlsMode = tlsMode;
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            // mark the current position so that we can peek the first byte to determine if the content is starting with
            // TLS handshake
            msg.markReaderIndex();

            byte b = msg.getByte(0);

            if (b == HANDSHAKE_MAGIC_CODE) {
                switch (tlsMode) {
                    case DISABLED:
                        ctx.close();
                        log.warn("Clients intend to establish a SSL connection while this server is running in SSL disabled mode");
                        break;
                    case PERMISSIVE:
                    case ENFORCING:
                        if (null != sslContext) {
                            ctx.pipeline()
                                .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
                                .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
                            log.info("Handlers prepended to channel pipeline to establish SSL connection");
                        } else {
                            ctx.close();
                            log.error("Trying to establish a SSL connection but sslContext is null");
                        }
                        break;
                    default:
                        log.warn("Unknown TLS mode");
                        break;
                }
            } else if (tlsMode == TlsMode.ENFORCING) {
                ctx.close();
                log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
            }

            // reset the reader index so that handshake negotiation may proceed as normal.
            msg.resetReaderIndex();

            try {
                // Remove this handler
                ctx.pipeline().remove(this);
            } catch (NoSuchElementException e) {
                log.error("Error while removing HandshakeHandler", e);
            }

            // Hand over this message to the next handler.
            ctx.fireChannelRead(msg.retain());
        }
    }
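The overall logic: the handler peeks at the first byte of the incoming data. If it is 0x16 (HANDSHAKE_MAGIC_CODE, the record type of a TLS handshake), the client is trying to establish a TLS connection, and the handler acts according to the TLS mode. In DISABLED mode it closes the connection; in PERMISSIVE or ENFORCING mode it adds the TLS handler to the pipeline, or closes the connection if sslContext is null. If the first byte is not 0x16 and the mode is ENFORCING, the connection is closed as well. Finally the handler removes itself from the pipeline and passes the message on to the next handler.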
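The first-byte check in HandshakeHandler (the comparison against HANDSHAKE_MAGIC_CODE, 0x16) can be reduced to a small decision function. The sketch below is a simplification without Netty: the class HandshakeSketch, the Action enum and the decide method are hypothetical names, and the sketch only returns what the handler would do to the connection instead of touching a real pipeline.

```java
// Simplified, Netty-free model of the decision made in HandshakeHandler.channelRead0.
class HandshakeSketch {
    enum TlsMode { DISABLED, PERMISSIVE, ENFORCING }

    // Hypothetical outcome type, introduced for illustration only.
    enum Action { CLOSE, ADD_TLS_HANDLER, PASS_THROUGH }

    // 0x16 is the record type of a TLS handshake message.
    static final byte HANDSHAKE_MAGIC_CODE = 0x16;

    static Action decide(byte firstByte, TlsMode mode, boolean sslContextAvailable) {
        if (firstByte == HANDSHAKE_MAGIC_CODE) {
            if (mode == TlsMode.DISABLED) {
                return Action.CLOSE; // client wants TLS, server has it disabled
            }
            // PERMISSIVE and ENFORCING both accept TLS if a context exists.
            return sslContextAvailable ? Action.ADD_TLS_HANDLER : Action.CLOSE;
        }
        // Plain-text client: rejected only when TLS is enforced.
        return mode == TlsMode.ENFORCING ? Action.CLOSE : Action.PASS_THROUGH;
    }

    public static void main(String[] args) {
        System.out.println(decide((byte) 0x16, TlsMode.PERMISSIVE, true));
        System.out.println(decide((byte) 0x00, TlsMode.ENFORCING, true));
    }
}
```

PERMISSIVE is the mode that lets TLS and plain-text clients share one port, which is why the check has to look at the data before any TLS handler is installed.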
NettyEncoder
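The overall logic is to write the encoded header of the remoting command (remotingCommand) to the output buffer, followed by the body if there is one. A RemotingCommand wraps the request or response, the header, the message body and so on. The NettyEncoder source is as follows: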
    public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

        @Override
        public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
            throws Exception {
            try {
                ByteBuffer header = remotingCommand.encodeHeader();
                out.writeBytes(header);
                byte[] body = remotingCommand.getBody();
                if (body != null) {
                    out.writeBytes(body);
                }
            } catch (Exception e) {
                log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
                if (remotingCommand != null) {
                    log.error(remotingCommand.toString());
                }
                RemotingUtil.closeChannel(ctx.channel());
            }
        }
    }
NettyDecoder
When data arrives, this handler extracts one complete frame and calls RemotingCommand.decode to decode its content. The source is as follows:
    public class NettyDecoder extends LengthFieldBasedFrameDecoder {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

        private static final int FRAME_MAX_LENGTH =
            Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));

        public NettyDecoder() {
            super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
        }

        @Override
        public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            ByteBuf frame = null;
            try {
                frame = (ByteBuf) super.decode(ctx, in);
                if (null == frame) {
                    return null;
                }

                ByteBuffer byteBuffer = frame.nioBuffer();

                return RemotingCommand.decode(byteBuffer);
            } catch (Exception e) {
                log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
                RemotingUtil.closeChannel(ctx.channel());
            } finally {
                if (null != frame) {
                    frame.release();
                }
            }

            return null;
        }
    }
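The constructor arguments super(FRAME_MAX_LENGTH, 0, 4, 0, 4) describe a length-prefixed frame: a 4-byte length field at offset 0, which is stripped before the frame is handed on. A minimal sketch of that framing with java.nio.ByteBuffer might look like the following. The class FrameSketch and its methods are hypothetical, and the payload here is an opaque byte array; the real RocketMQ frame has further structure inside it (header and body) that this sketch does not model.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Length-prefixed framing: [4-byte big-endian length][payload].
class FrameSketch {
    // Prefixes the payload with its length.
    static ByteBuffer encode(byte[] payload) {
        ByteBuffer out = ByteBuffer.allocate(4 + payload.length);
        out.putInt(payload.length);
        out.put(payload);
        out.flip();
        return out;
    }

    // Returns the next complete payload with the length field stripped,
    // or null if the buffer does not yet hold a whole frame.
    static byte[] decode(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null;
        }
        in.mark();
        int length = in.getInt();
        if (length < 0) {
            throw new IllegalArgumentException("negative frame length: " + length);
        }
        if (in.remaining() < length) {
            in.reset(); // wait for more bytes, leave the buffer untouched
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        ByteBuffer frame = encode("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(decode(frame), StandardCharsets.UTF_8));
    }
}
```

Returning null for an incomplete frame mirrors the null check after super.decode in NettyDecoder: the decoder is simply called again once more bytes have arrived.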
IdleStateHandler
NettyConnectManageHandler
This handler mainly deals with Netty connection events. The source is as follows:
    /**
     * Handles Netty connection events.
     * channelRegistered: called when the channel of an incoming connection is registered (the remote address is logged)
     * channelUnregistered: called when the channel is unregistered
     */
    class NettyConnectManageHandler extends ChannelDuplexHandler {
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
            super.channelRegistered(ctx);
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
            super.channelUnregistered(ctx);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
            super.channelActive(ctx);

            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
            }
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
            super.channelInactive(ctx);

            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
            }
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state().equals(IdleState.ALL_IDLE)) {
                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                    RemotingUtil.closeChannel(ctx.channel());
                    if (NettyRemotingServer.this.channelEventListener != null) {
                        NettyRemotingServer.this
                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                    }
                }
            }

            ctx.fireUserEventTriggered(evt);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
            log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);

            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
            }

            RemotingUtil.closeChannel(ctx.channel());
        }
    }
NettyServerHandler
This is the most important handler: it processes the requests and responses that arrive over Netty. The source is as follows:
org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler
    // Handles messages received by Netty, which are either request messages or response messages
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }

    /**
     * Processes an incoming message.
     * It is either a request message or a response message.
     */
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Processes a request.
     */
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                        if (rpcHook != null) {
                            rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        }

                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        if (rpcHook != null) {
                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        }

                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    log.error("process request over, but response failed", e);
                                    log.error(cmd.toString());
                                    log.error(response.toString());
                                }
                            } else {

                            }
                        }
                    } catch (Throwable e) {
                        log.error("process request exception", e);
                        log.error(cmd.toString());

                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };

            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }

                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }

    /**
     * Processes a response.
     */
    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            if (responseFuture.getInvokeCallback() != null) {
                executeInvokeCallback(responseFuture);
            } else {
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }
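Two ideas carry most of the code above: a request is routed by its code through processorTable with a fallback to a default processor, and a response is matched to its pending request through the opaque id in responseTable. The following sketch isolates both with standard-library types only. It is not the RocketMQ API: DispatchSketch and its methods are hypothetical, processors are modelled as plain functions on strings, and thread pools, RPC hooks and flow control are left out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Simplified model of code-based dispatch and opaque-based response matching.
class DispatchSketch {
    // request code -> processor (stand-in for processorTable)
    private final Map<Integer, Function<String, String>> processorTable = new ConcurrentHashMap<>();
    // fallback used when no processor is registered for a code; may be null
    private final Function<String, String> defaultProcessor;
    // opaque id -> pending request (stand-in for responseTable)
    private final Map<Integer, CompletableFuture<String>> responseTable = new ConcurrentHashMap<>();

    DispatchSketch(Function<String, String> defaultProcessor) {
        this.defaultProcessor = defaultProcessor;
    }

    void register(int code, Function<String, String> processor) {
        processorTable.put(code, processor);
    }

    // Picks the processor registered for the code, else the default, else reports an error.
    String handleRequest(int code, String body) {
        Function<String, String> matched = processorTable.get(code);
        Function<String, String> processor = matched != null ? matched : defaultProcessor;
        if (processor == null) {
            return "request type " + code + " not supported";
        }
        return processor.apply(body);
    }

    // Records that a request with this opaque id is waiting for its response.
    CompletableFuture<String> expectResponse(int opaque) {
        CompletableFuture<String> future = new CompletableFuture<>();
        responseTable.put(opaque, future);
        return future;
    }

    // Completes the pending request with the same opaque id; false if none matches.
    boolean handleResponse(int opaque, String body) {
        CompletableFuture<String> future = responseTable.remove(opaque);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }
}
```

A caller would register processors at startup, call handleRequest for each incoming request, and pair every outgoing request with expectResponse so that a later handleResponse with the same opaque id completes it.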