一、创建event类 Order

public class Order {

    private String id;
private String name;
private double price; public String getId() {
return id;
} public void setId(String id) {
this.id = id;
} public String getName() {
return name;
} public void setName(String name) {
this.name = name;
} public double getPrice() {
return price;
} public void setPrice(double price) {
this.price = price;
}
}

二、创建消费者类 Consumer

import com.lmax.disruptor.WorkHandler;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; public class Consumer implements WorkHandler<Order> { private String consumerId; private static AtomicInteger count = new AtomicInteger(0); private Random random = new Random(); public Consumer(String consumerId) {
this.consumerId = consumerId;
} @Override
public void onEvent(Order event) throws Exception {
Thread.sleep(1 * random.nextInt(5));
System.out.println("当前消费者:" + this.consumerId + ",消费信息ID:"+event.getId());
count.incrementAndGet();
} public int getCount() {
return count.get();
}
}

三、创建生产者类 Producer

import com.lmax.disruptor.RingBuffer;

public class Producer {

    private RingBuffer<Order> ringBuffer;

    public Producer(RingBuffer<Order> ringBuffer) {
this.ringBuffer = ringBuffer;
} public void sendData(String data) {
long sequnce = ringBuffer.next(); try {
Order order = ringBuffer.get(sequnce);
order.setId(data);
} finally {
ringBuffer.publish(sequnce);
}
}

四、创建测试类

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType; import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; public class TestMain {
public static void main(String[] args) throws Exception{ //1 创建ringbuffer
RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI,
new EventFactory<Order>() {
@Override
public Order newInstance() {
return new Order();
}
},
1024 * 1024,
new YieldingWaitStrategy()); //2 通过ringbuffer 创建一个屏障
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //3 创建多个消费者
Consumer[] consumers = new Consumer[10];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new Consumer("C" + i);
} //4 构建多消费者工作池
WorkerPool<Order> workerPool = new WorkerPool<Order>(
ringBuffer,
sequenceBarrier,
new EventExceptionHandler(),
consumers); //5 设置多个消费者的sequence 序号用于单独统计消费进度,并且设置到ringbuffer中
ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); //6 启动workPool
workerPool.start(Executors.newFixedThreadPool(10)); //设置异步生产 100个生产者
CountDownLatch latch = new CountDownLatch(1); for (int i = 0; i < 100; i++) {
Producer producer = new Producer(ringBuffer);
new Thread(new Runnable() {
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
} for (int j = 0; j < 100; j++) {
producer.sendData(UUID.randomUUID().toString());
}
}
}).start();
} Thread.sleep(2000);
System.out.println("--------线程创建完毕,开始生产数据----------");
latch.countDown(); Thread.sleep(10000);
System.out.println("消费者处理的任务总数:" + consumers[0].getCount());
} //创建exception类
static class EventExceptionHandler implements ExceptionHandler<Order> { @Override
public void handleEventException(Throwable ex, long sequence, Order event) { } @Override
public void handleOnStartException(Throwable ex) { } @Override
public void handleOnShutdownException(Throwable ex) { }
}
}

最新文章

  1. CentOS7 + mono +Jexus 环境的搭建
  2. python基础——递归函数
  3. CentOS 7.0系统安装配置图解教程
  4. hibernate 一对多 多对一 关系表 增删改查大礼包ps二级查也有
  5. App上线Check List
  6. 记一次idea启动tomcat后控制台乱码的坑
  7. 可拖动div
  8. 在 uniGUI 中实现自动弹窗后延迟几秒关闭 — Toast 功能
  9. anaconda3/lib/libcrypto.so.1.0.0: no version information available (required by wget)
  10. 第10章:MongoDB-CRUD操作--文档--修改--修改器
  11. C#:struct的陷阱:无法修改“xxx”的返回值,因为它不是变量
  12. lr11_Run-time Settings选项介绍:
  13. 实现简单容器模板类Vec(vector capacity 增长问题、allocator 内存分配器)
  14. python+opencv链接摄像头
  15. 【原】Coursera—Andrew Ng机器学习—课程笔记 Lecture 9_Neural Networks learning
  16. 小程序开发-10-新版Music组件、组件通信与wxss样式复用
  17. yii2 RESTful API 405 Method Not Allowed
  18. 【Android】Android 学习记录贴
  19. C# 对DataTable中按条件进行筛选和更新。
  20. CAD参数绘制多段线(com接口)

热门文章

  1. 关于 SQLServer Express 2012 的连接字符串
  2. Office 365管理中心门户
  3. QTP基本循环异常遍历(代码方式实现)
  4. JS代码,从一个数组中得到连号的数并显示
  5. Reading
  6. [LC] 485. Max Consecutive Ones
  7. sql语句查询成绩表各科前三名
  8. 关于unicode汉字范围正则表达式的写法
  9. &lt;JZOJ5904&gt;刺客信条
  10. Docker For Mac 下安装 Rancher