介绍

ZookeeperClient 是 kafka 新写的客户端,它允许用户流水线式(并行)访问 zookeeper。

为什么放弃了 zkClient?

zkClient 是一个第三方的客户端。

它的优点:

  1. 在session loss和session expire时自动创建新的ZooKeeper实例进行重连。
  2. 将一次性watcher包装为持久watcher。后者的具体做法是简单的在watcher回调中,重新读取数据的同时再注册相同的watcher实例。[1]

它的缺点:

  1. zkClient 在处理请求的时候,只能同步的访问处理。当 kafka 的 partition 个数过多的时候,同时请求 zookeeper 就会造成性能的瓶颈。

因为上述的缺点,ZookeeperClient 在访问的时候采用了异步的访问方式,并且采用了批量处理的方式。

如何批量并行访问

1. 获取消息请求队列。
2. 并行处理每个请求。
3. 将所有的请求结果保存在一个列表中返回。

这里需要考虑几种情况?

  1. 多线程并发请求,如何等待所有请求处理完成再返回?

    CountDownLatch。

  2. 假如一个请求队列中的请求太多了,一次性访问 zookeeper 容易过载。怎么办?

    控制同时访问 zookeeper 的请求个数。 使用 Semaphore 来实现。

  3. 如何异步访问呢?

    org.apache.zookeeper 已经为我们实现了。不需要考虑了。

综上,再看 zookeeperClient 的实现:

// 设定同时访问 zookeeper 的最大请求个数
private val inFlightRequests = new Semaphore(maxInFlightRequests) // 这里 inReadLock(initializationLock) 在某种情况下会产生死锁。4551 修复了这个问题
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = inReadLock(initializationLock) {
if (requests.isEmpty)
Seq.empty
else {
// 设定 CountDownLatch,当前队列的所有请求处理完再返回
val countDownLatch = new CountDownLatch(requests.size)
// 保存处理结果
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size) requests.foreach { request =>
// 通过 semaphore 控制多个线程同时访问的最大请求
inFlightRequests.acquire()
try {
// 异步访问
send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
}
} catch {
case e: Throwable =>
inFlightRequests.release()
throw e
}
}
// 等待所有请求处理完
countDownLatch.await()
// 返回
responseQueue.asScala.toBuffer
}
}

session 如何自动重连?

通过重写 watcher 的 process 函数,在函数中判断当前 zookeeper 对象是否过期,如果过期,就关闭老的,并重新创建一个新的。

  // package level visibility for testing only
private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
override def process(event: WatchedEvent): Unit = {
debug(s"Received event: $event")
Option(event.getPath) match {
case None =>
... 发现过期了
} else if (state == KeeperState.Expired) {
inWriteLock(initializationLock) {
info("Session expired.")
// 初始化
initialize() }
}
... 如果是其他类型的event, 调用相应的handler
}
}
} private def initialize(): Unit = {
if (!connectionState.isAlive) {
zooKeeper.close()
info(s"Initializing a new session to $connectString.")
// retry forever until ZooKeeper can be instantiated
var connected = false
while (!connected) {
try {
zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
connected = true
} catch {
case e: Exception =>
info("Error when recreating ZooKeeper, retrying after a short sleep", e)
Thread.sleep(1000)
}
}
}
}

持久 watcher

持久 watcher 就是指在每次请求的时候,都添加相应的 watcher。 kafka 的做法是将所有需要添加 watcher 的路径保存在一个集合中,当请求 zookeeper 的时候, 判断集合中是否包含相应的路径,如果包含就添加 watcher。

1. 保存对应的路径
private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala 2. 添加路径
def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
zNodeChangeHandlers.put(zNodeChangeHandler.path, zNodeChangeHandler)
} 3. 判断是否存在
private def shouldWatch(request: AsyncRequest): Boolean = request match {
case _: GetChildrenRequest => zNodeChildChangeHandlers.contains(request.path)
case _: ExistsRequest | _: GetDataRequest => zNodeChangeHandlers.contains(request.path)
case _ => throw new IllegalArgumentException(s"Request $request is not watchable")
} 4. 请求的时候做判断
private def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = {
// Safe to cast as we always create a response of the right type
def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response]) def responseMetadata(sendTimeMs: Long) = new ResponseMetadata(sendTimeMs, receivedTimeMs = time.hiResClockMs()) val sendTimeMs = time.hiResClockMs()
request match {
case ExistsRequest(path, ctx) =>
zooKeeper.exists(path, shouldWatch(request), new StatCallback {
override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs)))
}, ctx.orNull)
}
}

参考

[1] ZooKeeper(四)-- 第三方客户端 ZkClient的使用

最新文章

  1. Linux初学 - Elasticsearch环境安装
  2. iphone H5 input type=&quot;search&quot; 不显示搜索 解决办法
  3. Android 虚拟机Dalvik、Android各种java包功能、Android相关文件类型、应用程序结构分析、ADB
  4. 跨平台的WatiForSingleObject实现
  5. Java Web项目的一般目录结构解析(eclipse)
  6. Qt信号和槽的个人总结
  7. LogBoy logo
  8. 教你怎么用Mono Cecil - 动态注入 (注意代码的注释)
  9. iOS关于图片点到像素转换之杂谈
  10. Mybatis学习---连接MySQL数据库
  11. JAVA-Clone 对象拷贝
  12. Sphinx实时索引
  13. 让Ubuntu可以压缩/解压缩RAR文件
  14. 移动端rem布局雪碧图解决方案 以及分享腾讯团队的在线雪碧图工具
  15. 20个实用便捷的CSS3工具、库及实例
  16. datagrid返回记录为0时显示“没有记录”
  17. Centos6.5使用yum安装软件的时候 Another app is currently holding the yum lock; waiting for it to exit...
  18. Rozor视图(c#代码与html混合编程原则)
  19. python-综合练习题(if条件语句,while循环,奇数偶数
  20. python 发布

热门文章

  1. BZOJ 3624 并查集 (Kruskal)
  2. 爬虫概念与编程学习之如何爬取视频网站页面(用HttpClient)(二)
  3. marge into操作
  4. jquery分页点击后页面置顶
  5. php 生成不重复的随机字符串
  6. firefox工具
  7. jQuery $.ajax跨域-JSONP获取JSON数据(转载)
  8. 路飞学城Python-Day40(第四模块复习题)
  9. MongoDB入门 常用命令以及增删改查的简单操作
  10. Android S5PV210 fimc驱动分析 - fimc_capture.c