1 层次结构

负责进行网络IO请求的是NetworkClient,主要层次结构如下

ClusterConnectionStates报存了每个节点的状态,以node为key,以node的状态为value;inFlightRequets中保存了每个节点已经发送的请求,但是还没有返回的请求,以node为key,以List<ClientRequest>为value。inFlightRequets从名字也可以看出,表示“正在空中飞”的请求。

2 如何保证每次只发送一个请求

sender线程启动后,如果RecordBatch中有消息,会将消息按照所在节点重新排列,每个节点会创建一个ClientRequest,用来发送,每个节点每次只能发送一个ClientRequest,如下

KafkaChannel#setSend(..)

public void setSend(Send send) {
if (this.send != null) // 如果已经有send,会抛出异常
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

那么kafka是如何保证避免setSend的时候KafkaChannel中已经没有send了呢,这个关键就是在sender线程中会调用NetworkClient#ready(..),会将没有ready的节点去除掉,从而不会在该节点上setSend:

while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) { // 关键
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}

3 NetworkClient#ready(..)

NetworkClient#ready(..)检查节点是否准备好,从而决定是否可以将消息封装成ClientRequest放到KafkaChannel上。

public boolean ready(Node node, long now) {
if (node.isEmpty())
throw new IllegalArgumentException("Cannot connect to empty node " + node); if (isReady(node, now)) // 关键
return true; if (connectionStates.canConnect(node.idString(), now))
initiateConnect(node, now); return false;
}

我们来分析下isReady

public boolean isReady(Node node, long now) {
return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
}

isReady主要两个条件,一个是判断metadata是否到了更新的时候了,如果metadata需要更新,那么就不发送本次请求,也就是metadata更新优先级高。第二个是判断这个节点是否canSendRequest。

private boolean canSendRequest(String node) {
return connectionStates.isConnected(node) && selector.isChannelReady(node)
&& inFlightRequests.canSendMore(node); // 重点
}

inFlightRequests保存的是“正在空中飞”的请求

public boolean canSendMore(String node) {
Deque<ClientRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}

满足以下几个条件,表示可以继续send

  1. queue是空,即该节点没有“正在空中飞”的request
  2. queue不为空。queue中排在最开头的request已经completed 并且queue的大小小于允许的最大值。如何理解呢?queue是一个双端队列,每次add的时候都会在queue的头部插入,所以queue中第一个就是正在发送的,同样也是KafkaChannel中的send。只有当send发送到网络中的时候才可以继续发送。这就保证了前面说的“如何保证每次只发送一个请求”。

4 inFlightRequests

inFlightRequests保存了"正在空中飞"的请求,所谓“正在空中飞”的意思就是request已经发送到了网络上,但是服务端还没有回ack。NetworkClient#doSend会往inFlightRequests头部放置一个request,同时会在KafkaChannel中放置一个request.send。

public void add(ClientRequest request) {
Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
if (reqs == null) {
reqs = new ArrayDeque<>();
this.requests.put(request.request().destination(), reqs);
}
reqs.addFirst(request); // 重点,插入到头部
}

5 Selector#pollSelectionKeys

Selector#pollSelectionKeys用来处理读写事件。先看写事件

if (channel.ready() && key.isWritable()) {
Send send = channel.write();
if (send != null) {
this.completedSends.add(send); // 请求写完了会放到completedSends中
this.sensors.recordBytesSent(channel.id(), send.size());
}
}

往网络中写的时候,会调用KafkaChannel#write来写。

public Send write() throws IOException {
Send result = null;
if (send != null && send(send)) {
result = send;
send = null; // kafkaChannel中的send被置为null,这时候新的request可以发送了
}
return result;
}
private boolean send(Send send) throws IOException {
send.writeTo(transportLayer);
if (send.completed())
transportLayer.removeInterestOps(SelectionKey.OP_WRITE); return send.completed();
}

发送可能一次不能完全发送完毕,需要发送好几次才能将request全部发送到网络上,只有这个request发送完毕了才能将KafkaChannel中的send置为null,新的request才可以setSend。但此时inFlightRequests并没有移除该request,也就是说该request还在"飞",但是新的request可以添加。发送完毕后会将channel的SelectionKey.OP_WRITE移除,没有发送完毕不会移除,下次轮询的时候该节点没有ready,不会添加新的request,会继续发送没有发完的request。

对于ack=0,不要求服务端ack就表示发送成功。该处理是在NetworkClient#handleCompletedSends(..)中进行的

private void handleCompletedSends(List<ClientResponse> responses, long now) {
// if no response is expected then when the send is completed, return it
for (Send send : this.selector.completedSends()) {
ClientRequest request = this.inFlightRequests.lastSent(send.destination());
if (!request.expectResponse()) { // ack = 0不需要服务端response
this.inFlightRequests.completeLastSent(send.destination()); // request从inFlightRequests中移除,表示此次请求完毕
responses.add(new ClientResponse(request, now, false, null));
}
}
}

对于ack !=0 ,则要求服务端ack才表示发送成功,该处理是在

NetworkClient#handleCompletedReceives(..)中进行

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
ClientRequest req = inFlightRequests.completeNext(source);
Struct body = parseResponse(receive.payload(), req.request().header());
if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
responses.add(new ClientResponse(req, now, false, body));
}
}

5 参考

https://blog.csdn.net/chunlongyu/article/details/52651960

最新文章

  1. 线性时间O(n)内求数组中第k大小的数
  2. springMvc的第一个demo
  3. 【bzoj1499】 NOI2005—瑰丽华尔兹
  4. VS一般设置(字体,背景色)
  5. Java学习笔记(六)——google java编程风格指南(下)
  6. scala函数定义的四种方式
  7. 零碎记录Hadoop平台各组件使用
  8. 006 列表的三种删除方法 remove,pop,del
  9. Natas Wargame Level 13 Writeup(文件上传漏洞,篡改file signature,Exif)
  10. Java jvm级别native关键词、JNI详解
  11. AugularJS从入门到实践(二)
  12. 查看crontab运行状态
  13. JarvisOJ BASIC -.-字符串
  14. 创建Car类,包含name,price属性,构造器等方法,创建测试类,在main方法中创建Set接口的实现类,添加5个以上的Car对象,遍历集合元素,验证重复元素是否过滤了; 如果没有过滤,实现过滤功能;把每个小车的price降10000元,再遍历,查看price是否已改变
  15. CEF 文件下载
  16. python第三方库,你要的这里都有
  17. 浪漫程序员 HTML5爱心表白动画
  18. 使用jsonp跨域发送请求
  19. [转]RosBridge小结
  20. 用Jq遍历一个div里面的所有input 并判断是否为空?

热门文章

  1. DoS,DDoS,DRoS攻击
  2. 思迈特软件Smartbi:传统BI被“革命”,AI是BI技术未来的发展趋势
  3. .NET 5+ 中已过时的功能
  4. el-dialog设置为点击弹窗以外的区域不自动关闭弹窗
  5. websocket原理和基于c/c++实现的websocket协议栈(更新中)
  6. webstorm安装vue插件及安装过程出现的问题
  7. Tableau绘制K线图、布林线、圆环图、雷达图
  8. HDFS连接JAVA,HDFS常用API
  9. 5分钟学会 gRPC
  10. 为游戏编写boss剧情