18.并发类容器MQ
2024-08-27 16:59:19
package demo7.MQ;
public class QueueData {
private int id;
private String name;
private String taskCode;
public QueueData() {
}
public QueueData(int id, String name, String taskCode) {
this.id = id;
this.name = name;
this.taskCode = taskCode;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getTaskCode() {
return taskCode;
}
public void setTaskCode(String taskCode) {
this.taskCode = taskCode;
}
}
package demo7.MQ;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Provider implements Runnable {
//共享缓存区
private BlockingQueue<QueueData> queue;
//多线程释放启动?
private volatile boolean isRunning = true;
//ID生成器
private static AtomicInteger count = new AtomicInteger();
//生产随机对象
private static Random random = new Random();
public Provider(BlockingQueue<QueueData> queue) {
this.queue = queue;
}
@Override
public void run() {
while (isRunning){
try {
//随机休眠 - 1000 表示读取数据、生产数据的耗时
Thread.sleep(random.nextInt(1000));
//incrementAndGet 进行累加
int id = count.incrementAndGet();
QueueData queueData = new QueueData(id,"任务"+String.valueOf(id),String.valueOf(id).hashCode()+"");
System.err.println("线程:"+Thread.currentThread().getName()+"\t生产task:"+queueData.getName()+"\t"+queueData.getId());
if (!queue.offer(queueData,2, TimeUnit.SECONDS)){
System.err.println("!!!!!!!!!生产数据失败 error");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stop(){
this.isRunning=false;
}
}
package demo7.MQ;
import java.util.Random;
import java.util.WeakHashMap;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable{
private BlockingQueue<QueueData> queue;
public Consumer(BlockingQueue<QueueData> queue) {
this.queue = queue;
}
private static Random random = new Random();
@Override
public void run() {
while (true){
try {
//take:无阻塞
QueueData queueData = this.queue.take();
Thread.sleep(random.nextInt(1000));
System.err.println("线程:"+Thread.currentThread().getName()+"\t消费task->:"+queueData.getName()+"\t"+queueData.getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package demo7.MQ;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class MainMQ {
/**
* 生产者、消费者(多线程模式)
* 1.生产、消费:通常由2类线程,即若干了生产者的线程、若干个消费者的线程、
* 2.生产者线程负责提交用户请求、消费者线程负责处理生产者提交的任务请求
* 3.生产者、消费者之间通过共享内存缓存进行通信
*/
public static void main(String[] args) {
//1.内存缓存区
BlockingQueue<QueueData> queueData = new LinkedBlockingQueue<QueueData>();
//2.生产者
Provider p1 = new Provider(queueData);
Provider p2 = new Provider(queueData);
Provider p3 = new Provider(queueData);
//3.消费者
Consumer c1 = new Consumer(queueData);
Consumer c2 = new Consumer(queueData);
Consumer c3 = new Consumer(queueData);
//创建【线程池】运行,可以创建n个线程,没有任务的时候不创建线程,空闲线程存活时间为60s(默认)
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(p1);
executorService.execute(p2);
executorService.execute(p3);
executorService.execute(c1);
executorService.execute(c2);
executorService.execute(c3);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
p1.stop();
p2.stop();
p3.stop();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
}
}
最新文章
- U-boot与linux的关系
- swift中的结构体和枚举
- 调用数据库函数CallableStatement
- Codeforces 234D Cinema
- 函数lock_rec_get_n_bits
- SVG 动画实现弹性的页面元素效果
- Swiper之滑块4
- JS实现定时器(类似工行网银支付限时操作)
- C# 填充Excel图表、图例背景色
- win7 64 位操作系统,进程System,PID为4,扫描连接局域网ip地址的139和445端口
- jupyter notebook快捷键使用指南
- uni-app 点击切换图标
- UVA1374-Power Calculus(迭代加深搜索)
- Linux系统查看本机ip地址
- 主机性能监控之wmi 获取磁盘信息
- C++进阶--Named Parameter Idiom
- Mysql数据类型DECIMAL(M,D)用法
- Farey Sequence (欧拉函数+前缀和)
- beego配置文件
- C++: 多态 虚函数