池化技术简介

在我们使用数据库的过程中,我们往往使用数据库连接池而不是直接使用数据库连接进行操作,这是因为每一个数据库连接的创建和销毁的代价是昂贵的,而池化技术则预先创建了资源,这些资源是可复用的,这样就保证了在多用户情况下只能使用指定数目的资源,避免了一个用户创建一个连接资源,造成程序运行开销过大。关于Java并发编程的总结和思考

连接池实现原理

这里只实现一个简易的连接池,更多复杂的需求可根据该连接池进行改进,该连接池主要参数如下:

  1. 一个繁忙队列busy
  2. 一个空闲队列idle
  3. 连接池最大活动连接数maxActive
  4. 连接池最大等待时间maxWait
  5. 连接池的活动连接数activeSize

程序流程图如下:

代码实现

泛型接口ConnectionPool.java


public interface ConnectionPool<T> { /**
* 初始化池资源
* @param maxActive 池中最大活动连接数
* @param maxWait 最大等待时间
*/
void init(Integer maxActive, Long maxWait); /**
* 从池中获取资源
* @return 连接资源
*/
T getResource() throws Exception; /**
* 释放连接
* @param connection 正在使用的连接
*/
void release(T connection) throws Exception; /**
* 释放连接池资源
*/
void close(); }

以zookeeper为例,实现zookeeper连接池,ZookeeperConnectionPool.java


public class ZookeeperConnectionPool implements ConnectionPool<ZooKeeper> {
//最大活动连接数
private Integer maxActive;
//最大等待时间
private Long maxWait;
//空闲队列
private LinkedBlockingQueue<ZooKeeper> idle = new LinkedBlockingQueue<>();
//繁忙队列
private LinkedBlockingQueue<ZooKeeper> busy = new LinkedBlockingQueue<>();
//连接池活动连接数
private AtomicInteger activeSize = new AtomicInteger(0);
//连接池关闭标记
private AtomicBoolean isClosed = new AtomicBoolean(false);
//总共获取的连接记数
private AtomicInteger createCount = new AtomicInteger(0);
//等待zookeeper客户端创建完成的计数器
private static ThreadLocal<CountDownLatch> latchThreadLocal = ThreadLocal.withInitial(() -> new CountDownLatch(1)); public ZookeeperConnectionPool(Integer maxActive, Long maxWait) {
this.init(maxActive, maxWait);
} @Override
public void init(Integer maxActive, Long maxWait) {
this.maxActive = maxActive;
this.maxWait = maxWait;
} @Override
public ZooKeeper getResource() throws Exception {
ZooKeeper zooKeeper;
Long nowTime = System.currentTimeMillis();
final CountDownLatch countDownLatch = latchThreadLocal.get(); //空闲队列idle是否有连接
if ((zooKeeper = idle.poll()) == null) {
//判断池中连接数是否小于maxActive
if (activeSize.get() < maxActive) {
//先增加池中连接数后判断是否小于等于maxActive
if (activeSize.incrementAndGet() <= maxActive) {
//创建zookeeper连接
zooKeeper = new ZooKeeper("localhost", 5000, (watch) -> {
if (watch.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
});
countDownLatch.await();
System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接:" + createCount.incrementAndGet() + "条");
busy.offer(zooKeeper);
return zooKeeper;
} else {
//如增加后发现大于maxActive则减去增加的
activeSize.decrementAndGet();
}
}
//若活动线程已满则等待busy队列释放连接
try {
System.out.println("Thread:" + Thread.currentThread().getId() + "等待获取空闲资源");
Long waitTime = maxWait - (System.currentTimeMillis() - nowTime);
zooKeeper = idle.poll(waitTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new Exception("等待异常");
}
//判断是否超时
if (zooKeeper != null) {
System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接:" + createCount.incrementAndGet() + "条");
busy.offer(zooKeeper);
return zooKeeper;
} else {
System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接超时,请重试!");
throw new Exception("Thread:" + Thread.currentThread().getId() + "获取连接超时,请重试!");
}
}
//空闲队列有连接,直接返回
busy.offer(zooKeeper);
return zooKeeper;
} @Override
public void release(ZooKeeper connection) throws Exception {
if (connection == null) {
System.out.println("connection 为空");
return;
}
if (busy.remove(connection)){
idle.offer(connection);
} else {
activeSize.decrementAndGet();
throw new Exception("释放失败");
}
} @Override
public void close() {
if (isClosed.compareAndSet(false, true)) {
idle.forEach((zooKeeper) -> {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
busy.forEach((zooKeeper) -> {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}

测试用例

这里创建20个线程并发测试连接池,Test.java


public class Test { public static void main(String[] args) throws Exception {
int threadCount = 20;
Integer maxActive = 10;
Long maxWait = 10000L;
ZookeeperConnectionPool pool = new ZookeeperConnectionPool(maxActive, maxWait);
CountDownLatch countDownLatch = new CountDownLatch(20);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
countDownLatch.countDown();
try {
countDownLatch.await();
ZooKeeper zooKeeper = pool.getResource();
Thread.sleep(2000);
pool.release(zooKeeper);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} }).start();
}
while (true){ }
}
}

来源:https://segmentfault.com/a/1190000017926611

最新文章

  1. webmagic 增量爬取
  2. LLBL Gen Pro 4.2 Lite 免费的对象关系映射开发框架与工具
  3. WCF初探-9:WCF服务承载 (下)
  4. html3秒跳转
  5. solr学习之入门篇
  6. zabbix3.0 安装方法,一键实现短信、电话、微信、APP 告警
  7. Hadoop应用开发实战案例 第1周
  8. php截取小时和分钟,在进行和其它时间段的比较
  9. C语言-07其它相关
  10. 程序员欢呼:微软Bing开始支持搜索源码、可直接运行!
  11. Android中view的事件
  12. HDU3257 Hello World!
  13. 【算法】论平衡二叉树(AVL)的正确种植方法
  14. Docker系列01—容器的发展历程---Docker的生态圈
  15. Unity3D 物体移动方法总结
  16. mysql之整合ssm多数据源配置
  17. Python基础之带你快速掌握列表的常用方法
  18. 黄聪:浓缩的才是精华:浅析GIF格式图片的存储和压缩(转)
  19. Client Dataset Basics
  20. 如何通过Dreamweaver批量对整个站点或目录进行代码搜索或部分全部替换

热门文章

  1. Tesseract-OCR-03-图片文字识别
  2. Git学习-Git时光机之版本回退(二)
  3. EF多实体对应单表
  4. JavaScript中模块化工具require.js
  5. 利用canvas来绘制一个会动的图画,欢迎指教
  6. [翻译] 单例(Singleton)
  7. leetcode Ch1-search 2014
  8. [问题记录]cocos2dx编译打包apk过程&问题记录
  9. Win7系统托盘解决出现CH图标的方法
  10. [EffectiveC++]item35:考虑virtual函数以外的其他选择