RocketMQ之连接以及连接缓存
2024-08-25 00:30:28
发现rabbitmq有一个ConnectionFactory
。发现rocketmq好像没这个东西。按道理来说如果每次发送消息都新建一条连接肯定是不可能的。
ps:其实之所以是有上面的疑问是因为数据库连接池那个地方来的,因为数据库连接connection并没有说是线程安全的,所以为了线程安全会为每个事物单独分配一个连接。但是rocketmq用的是netty的长连接channel,Java 上关于SocketChannel 的注释 说明是安全的。
《Netty : writeAndFlush的线程安全及并发问题》
可见,writeAndFlush如果在Netty线程池内执行,则是直接write;否则,将作为一个task插入到Netty线程池执行。
这个问题归根到底是netty的问题。既然是线程安全的,那么整个系统就可以只用一个连接了。
所以我们从源码角度来分析一下rocketmq是如何做得。
下面我们追踪一条消息发送的过程,下面是发送消息时涉及io的模块。
rocketmq就是在channelTables里面维护的连接。
private Channel getAndCreateNameserverChannel() throws InterruptedException {
String addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
} final List<String> addrList = this.namesrvAddrList.get();
if (this.lockNamesrvChannel.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
try {
addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
} if (addrList != null && !addrList.isEmpty()) {
for (int i = 0; i < addrList.size(); i++) {
int index = this.namesrvIndex.incrementAndGet();
index = Math.abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index); this.namesrvAddrChoosed.set(newAddr);
Channel channelNew = this.createChannel(newAddr);
if (channelNew != null)
return channelNew;
}
}
} catch (Exception e) {
log.error("getAndCreateNameserverChannel: create name server channel exception", e);
} finally {
this.lockNamesrvChannel.unlock();
}
} else {
log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LockTimeoutMillis);
} return null;
}
private Channel createChannel(final String addr) throws InterruptedException {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
} //lockChannelTables是ReentrantLock锁。tryLock() 尝试获取锁,不管成功失败,都立即返回true、false
if (this.lockChannelTables.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
try {
boolean createNewConnection = false;
cw = this.channelTables.get(addr);
if (cw != null) { if (cw.isOK()) {
return cw.getChannel();
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
} else {
this.channelTables.remove(addr);
createNewConnection = true;
}
} else {
createNewConnection = true;
} if (createNewConnection) {
ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(channelFuture);
//连接被保存到channelTables中。
this.channelTables.put(addr, cw);
}
} catch (Exception e) {
log.error("createChannel: create channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
log.warn("createChannel: try to lock channel table, but timeout, {}ms", LockTimeoutMillis);
} if (cw != null) {
ChannelFuture channelFuture = cw.getChannelFuture();
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} else {
log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
}
} else {
log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
}
} return null;
}
/**
* @see {@link #connect()}
*/
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
} final ChannelPromise promise = channel.newPromise();
if (regFuture.isDone()) {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
} else {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
} return promise;
}
private static void doConnect0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new OneTimeTask() {
@Override
public void run() {
if (regFuture.isSuccess()) {
if (localAddress == null) {
//io.netty.channel.connect(remoteAddress,promise);
channel.connect(remoteAddress, promise);
} else {
channel.connect(remoteAddress, localAddress, promise);
}
promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
连接这块就点到netty为止,进一步了解可以参考netty这方面的资料。
下面分析rocketmq对channelTables的维护。
1,关闭系统时清空:
2.关闭channel时
最新文章
- MVC5+EF6 入门完整教程八
- (转载)android炫酷实用的开源框架(UI框架)
- Angular 2 要来了,Wijmo 已准备好迎接
- 【Git】笔记3
- 前端自动化工具 -- fis 使用简介
- WINDOWS Composer &; PHPUnit 安装记录
- Android实现贪吃蛇游戏
- 安卓系统浏览器中select下拉按钮无法弹出选择面板奇怪问题解决
- JDBCTemplate
- 学习web前端技术的笔记,仅供自己查阅备忘,图片上传预览
- Left join on where 区别
- SSH方式连接Git服务器需要注意的地方
- CentOS 每个版本的区别
- 吴裕雄 17-MySQL 排序
- codeforce 853A Planning
- docker 私有仓库 harbor docker-compose
- Task 6.2冲刺会议五 /2015-5-18
- .NET设计模式(4):建造者模式(Builder Pattern)(转载)
- linux下安装dovecot
- Vsual Studio 高版本打开低版本项目的方法
热门文章
- twitter api取出的日期格式化
- k8s 组件架构
- uestc1633
- MySQL 将某个字段值的记录排在最后,其余记录单独排序
- Scala实战高手****第4课:零基础彻底实战Scala控制结构及Spark源码解析
- Jetty错误:java.lang.IllegalStateException: Form too large 270468>;200000的问题解决
- JavaScript:this是什么
- 让旧的的Mac也能免费安装keynote
- WPF附加属性的Set函数不调用的问题
- Linq 简明教程