1、定义事件
事件(Event)就是通过 Disruptor 进行交换的数据类型。

package com.ljq.disruptor;

import java.io.Serializable;

/**
* 定义事件数据,本质是个普通JavaBean
*
* @author jqlin
*/
@SuppressWarnings("serial")
public class LongEvent implements Serializable {
private long value; public LongEvent() {
super();
} public LongEvent(long value) {
super();
this.value = value;
} public long getValue() {
return value;
} public void setValue(long value) {
this.value = value;
} @Override
public String toString() {
return "LongEvent [value=" + value + "]";
} }

2、LongEvent事件生产者

package com.ljq.disruptor;

import com.lmax.disruptor.RingBuffer;

/**
* LongEvent事件生产者,生产LongEvent事件
*
* @author jqlin
*/
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
} public void produceData(long value) {
long sequence = ringBuffer.next(); // 获得下一个Event槽的下标
try {
// 给Event填充数据
LongEvent event = ringBuffer.get(sequence);
event.setValue(value); } finally {
// 发布Event,激活观察者去消费, 将sequence传递给该消费者
// 注意,最后的 ringBuffer.publish() 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
ringBuffer.publish(sequence);
}
}
}

3、LongEvent事件消息者

package com.ljq.disruptor;

import com.lmax.disruptor.WorkHandler;

/**
* LongEvent事件消息者,消息LongEvent事件
*
* @author Administrator
*
*/
public class LongEventConsumer implements WorkHandler<LongEvent> { @Override
public void onEvent(LongEvent event) throws Exception {
System.out.println("consumer:" + Thread.currentThread().getName() + " Event: value=" + event.getValue() );
} }

4、ProducerConsumerMain 
消费者-生产者启动类,其依靠构造Disruptor对象,调用start()方法完成启动线程。

package com.ljq.disruptor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType; /**
* Disruptor多个消费者不重复处理生产者发送过来的消息
*
* @author Administrator
*
*/
public class ProducerConsumerMain {
public static void main(String[] args) throws InterruptedException {
Long time = System.currentTimeMillis(); // 指定 ring buffer字节大小,必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能
int bufferSize = 1024 * 1024;;
//固定线程数
int nThreads = 10; ExecutorService executor = Executors.newFixedThreadPool(nThreads); EventFactory<LongEvent> factory = new EventFactory<LongEvent>() {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
}; // 创建ringBuffer
RingBuffer<LongEvent> ringBuffer = RingBuffer.create(ProducerType.MULTI, factory, bufferSize, new YieldingWaitStrategy());
SequenceBarrier barriers = ringBuffer.newBarrier();
// 创建10个消费者来处理同一个生产者发送过来的消息(这10个消费者不重复消费消息)
LongEventConsumer[] consumers = new LongEventConsumer[50];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new LongEventConsumer();
}
WorkerPool<LongEvent> workerPool = new WorkerPool<LongEvent>(ringBuffer, barriers,
new EventExceptionHandler(), consumers);
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
workerPool.start(executor); LongEventProducer producer = new LongEventProducer(ringBuffer);
for (int i = 0; i < 20000; i++) {
producer.produceData(i);
} Thread.sleep(1000); //等上1秒,等消费都处理完成
workerPool.halt(); //通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)
executor.shutdown();
System.out.println("总共耗时(单位毫秒) :" + (System.currentTimeMillis() - time));
}
}

5、EventExceptionHandler

package com.ljq.disruptor;

import com.lmax.disruptor.ExceptionHandler;

public class EventExceptionHandler implements ExceptionHandler {

    @Override
public void handleEventException(Throwable ex, long sequence, Object event) {
System.out.println("handleEventException:" + ex);
} @Override
public void handleOnShutdownException(Throwable ex) {
System.out.println("handleEventException:" + ex);
} @Override
public void handleOnStartException(Throwable ex) {
System.out.println("handleOnStartException:" + ex);
} }

最新文章

  1. failover机制的小讨论
  2. 新学习到的vi的一些命令
  3. java UDP 简单实现编程
  4. Android课程---单选框与复选框的实现
  5. VS代码片段(snippet)创作工具——Snippet Editor(转)
  6. 提取图像(tif)中水体的矢量数据(shp)研究
  7. progressBar 自定义
  8. GridView点击行,选中模版列中CheckBox
  9. GDB调试一
  10. python运维开发(八)----面向对象(下)
  11. 【计算机网络】 一个小白的DNS学习笔记
  12. 用Zmq实现网关与游戏服全互连
  13. 类和对象,以及 LeetCode 每日一题
  14. tomcat catalina.out乱码
  15. 【gearman】gearmand -d 无反应解决
  16. Codeforces 757D - Felicity&#39;s Big Secret Revealed
  17. JS中lambda表达式的优缺点和使用场景(转)
  18. vue.js如何实现点击按钮动态添加li
  19. Beta任务项录入
  20. 浅谈Linux系统中如何查看进程

热门文章

  1. uwsgi_read_timeout超时处理
  2. 20155326刘美岑 2016-2017-2 《Java程序设计》第二周学习总结
  3. Mac 下netstat和linux下不一样
  4. Python3 安装 PyQt5 -pycharm 环境搭建
  5. PHP后台评论 接口
  6. What mind mapping software applications do you recommend.
  7. PCA和Whitening
  8. IIS日志存入数据库之二:ETW
  9. jQuery---ajax---error函数及其参数详解
  10. JAVA学习笔记1——环境配置