使用 netty 监听端口

// org.apache.rocketmq.remoting.netty.NettyRemotingServer#start
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
// 绑定端口
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
// 分发请求
new NettyServerHandler()
);
}
});
// 启动
ChannelFuture sync = this.serverBootstrap.bind().sync();

接收请求

// org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}

分发请求

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// NettyRequestProcessor 是处理具体请求的处理器,ExecutorService 是对应的线程池
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque(); if (pair != null) {
// 创建任务
Runnable run = new Runnable() {
@Override
public void run() {
try {
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
if (rpcHook != null) {
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
// 调用处理器处理收到的请求
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
if (rpcHook != null) {
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
} if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else { }
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString()); if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
}; if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
} try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
// 把任务提交到对应的线程池
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
} if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}

broker 启动时,把不同类型的请求对应到各自的处理器

// org.apache.rocketmq.broker.BrokerController#BrokerController
// 创建队列
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
// org.apache.rocketmq.broker.BrokerController#initialize
// 创建线程池
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
// org.apache.rocketmq.broker.BrokerController#registerProcessor
// 把不同的请求,请求处理器,线程池对应起来
public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
}

不同的线程池处理不同的请求,做到很好的隔离。

最新文章

  1. C#冒泡排序算法
  2. html 空链接 href=&quot;#&quot;与href=&quot;javascript:void(0)&quot;的区别
  3. 利用session_set_save_handler()函数将session保存到MySQL数据库中
  4. 最小瓶颈路 Uva 534 Frogger
  5. 有限状态机HDL模板
  6. 【解题报告】POJ-1108 Split Windows
  7. Oracle 常用命令大全
  8. 关于PHP Websocket 错误: &quot;stream_select(): You MUST recompile PHP with a larger value of FD_SETSIZE&quot; 的解决方案
  9. 改进基于Boost.Asio的聊天服务
  10. Session 转台服务器的使用方法
  11. struts 1.x 原理
  12. java基础练习 11
  13. wow.js中各种特效对应的类名
  14. mock打桩之EasyMock
  15. 支付宝沙箱测试-ALI40247
  16. [复试机试]c++读取/写入文本文件
  17. Spark2.2出现异常:ERROR SparkUI: Failed to bind SparkUI
  18. stage的划分
  19. hdu 1754 I Hate It(树状数组区间求最值)2007省赛集训队练习赛(6)_linle专场
  20. CSDN学院升级公告

热门文章

  1. Ubuntu中用sudo apt-get install makeinfo时,出错:Unable to locate package
  2. Excel 曝Power Query安全漏洞
  3. zk和eureka的区别(CAP原则)
  4. Atcoder Regular 099 暴力区间扩张 n/dig(n)极值打表 团分割背包
  5. Java使用freemarker导出word文档
  6. python深浅copy
  7. [人物存档]【AI少女】【捏脸数据】写实系列
  8. eclipse切换 package explorer
  9. hdu 1208 Ignatius and the Princess III 划分数,dp
  10. [LibreOJ 3119]【CTS2019】随机立方体【计数】【容斥】