最近一年多写的最虐心的代码。必须好好复习java并发了。搞了一晚上终于测试都跑通过了,特此纪念,以资鼓励!

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; /**
* 实现可调整大小的阻塞队列,支持数据迁移平衡reader,writer读取吸入速度,达到最大吞吐
* @author hanjie
*
*/
public class RecordBuffer { public static final Record CLOSE_RECORD = new Record() { @Override
public Object getColumnValue(String columnName) {
// TODO Auto-generated method stub
return null;
}
}; public static final Record SWITCH_QUEUE_RECORD = new Record() { @Override
public Object getColumnValue(String columnName) {
// TODO Auto-generated method stub
return null;
}
}; public Lock switchingQueueLock = new ReentrantLock();
public Condition readerSwitched = switchingQueueLock.newCondition();
public Condition writerSwitched = switchingQueueLock.newCondition();
public Condition switchFinished = switchingQueueLock.newCondition(); public volatile boolean readerSwitchSuccess = true;
public volatile boolean writerSwitchSuccess = true;
public volatile boolean switchingQueue = false;
public volatile boolean closed = false;
private volatile ArrayBlockingQueue<Record> queue;
private TaskCounter taskCounter; public RecordBuffer(TaskCounter taskCounter, int size) {
this.queue = new ArrayBlockingQueue<Record>(size);
this.taskCounter = taskCounter;
} public void resize(int newSize) {
try { if(closed){
return;
} switchingQueueLock.lock();
try {
//double check下,要不可能writer收到CLOSED_record已经 退出了。writerSwitched.await() 会hang住
if(closed){
return;
}
this.switchingQueue = true; ArrayBlockingQueue<Record> oldQueue = queue;
queue = new ArrayBlockingQueue<Record>(newSize);
this.readerSwitchSuccess = false;
this.writerSwitchSuccess = false; //先拯救下writer,可能writer刚好阻塞到take上,失败也没关系,说明老队列不空,writer不会阻塞到take
oldQueue.offer(SWITCH_QUEUE_RECORD); while (!writerSwitchSuccess) {
writerSwitched.await();
}
//writer先切换队列,然后reader可能阻塞在最后一个put上,清空下老队列拯救reader,让它顺利醒来
transferOldQueueRecordsToNewQueue(oldQueue); while (!readerSwitchSuccess) {
readerSwitched.await();
}
//前面的清空,刚好碰到reader要put最后一个,非阻塞式清空动作就有残留最后一个put
transferOldQueueRecordsToNewQueue(oldQueue); this.switchingQueue = false;
this.switchFinished.signalAll(); } finally {
switchingQueueLock.unlock();
} } catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
} private void transferOldQueueRecordsToNewQueue(ArrayBlockingQueue<Record> oldQueue)
throws InterruptedException {
List<Record> oldRecords = new ArrayList<Record>(oldQueue.size());
Record record = null;
while ((record = oldQueue.poll()) != null) {
oldRecords.add(record);
}
// 转移老队列剩下的记录到新队列
for (int i = 0; i < oldRecords.size(); i++) {
queue.put(oldRecords.get(i));
}
} public void close() {
this.closed = true;
switchingQueueLock.lock();
try {
//如果正在切换队列, 等切换做完才能,发送最后一个CLOSE
while (switchingQueue) {
switchFinished.await();
} this.queue.put(CLOSE_RECORD);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
finally{
switchingQueueLock.unlock();
}
} public void put(Record record) {
try { if (!queue.offer(record)) {
taskCounter.incrBufferFullCount();
if (!readerSwitchSuccess) {
notifyReaderSwitchSuccess();
}
queue.put(record);
}
taskCounter.incrReadCount();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
} private void notifyReaderSwitchSuccess() {
System.out.println("reader switch");
switchingQueueLock.lock();
try {
readerSwitchSuccess = true;
readerSwitched.signalAll();
} finally {
switchingQueueLock.unlock();
}
} public Record take() {
try { Record record = queue.poll();
//如果拿到了切换记录,则切换队列重试
if(record == SWITCH_QUEUE_RECORD){
if (!writerSwitchSuccess) {
notifyWriterSwitchSuccess();
}
record = queue.poll();
} if (record == null) {
taskCounter.incrBufferEmptyCount(); //调用take先检查是否正在切换,保证拿到新的队列
if (!writerSwitchSuccess) {
notifyWriterSwitchSuccess();
}
record = queue.take();
//如果很不幸刚好在take阻塞时候,切换,只能发送一个切换记录将其唤醒
if(record == SWITCH_QUEUE_RECORD){
if (!writerSwitchSuccess) {
notifyWriterSwitchSuccess();
}
record = queue.take();
}
}
if (record == CLOSE_RECORD) {
if (!writerSwitchSuccess) {
notifyWriterSwitchSuccess();
}
return null;
}
taskCounter.incrWriteCount();
return record;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
} private void notifyWriterSwitchSuccess() { System.out.println("writer switch");
switchingQueueLock.lock();
try {
writerSwitchSuccess = true;
writerSwitched.signalAll();
} finally {
switchingQueueLock.unlock();
} } }

最新文章

  1. 命名困惑系列之一:关于state和status的粗浅研究
  2. 《UNIX/Linux网络日志分析与流量监控》新书发布
  3. 快速入门系列--JMeter压测工具
  4. AngularJS快速入门指南02:介绍
  5. Java 集合系列 07 List总结(LinkedList, ArrayList等使用场景和性能分析)
  6. windows下将磁盘脱机,并在&quot;我的电脑&quot;下显示
  7. 微软职位内部推荐-Principal DEV Manager for Bing Client
  8. ruby的gem和boundle安装解决办法
  9. C语言结构体中的函数指针
  10. 【转】iOS实时卡顿监控
  11. Kafka小记
  12. CodeForces 13E. Holes 分块处理
  13. JAVA基础第七组(5道题)
  14. OpenCV Python教程(1、图像的载入、显示和保存)
  15. maven 禁止连接外网仓库
  16. 问题10:获取当前页面宽度JS
  17. TF:TF之Tensorboard实践:将神经网络Tensorboard形式得到events.out.tfevents文件+dos内运行该文件本地服务器输出到网页可视化—Jason niu
  18. 1-关于单片机通信数据传输(中断发送,大小端,IEEE754浮点型格式,共用体,空闲中断,环形队列)
  19. 【Spring学习笔记-MVC-12】Spring MVC视图解析器之ResourceBundleViewResolver
  20. C#生成流水号编码[a-z(不包括i和o) 按0-9 a-z的顺序)]

热门文章

  1. 织梦调用文章 ID (来源:百度知道)
  2. BASIC-16_蓝桥杯_分解质因数
  3. bzoj2026: [SHOI2009]Coin
  4. 如何随机从数据库表中抽一条数据的SQL语句
  5. Air test ios类使用
  6. 终端直接执行py文件,不需要python命令
  7. Windows 8的用户模式Shim Engine小探及利用
  8. 自己写的jQuery浮动广告插件
  9. 解决WPF两个图片控件显示相同图片因线程占用,其中一个显示不全的问题
  10. TensorFlow相关的一些技巧