【并发】6、借助FQueue 实现多线程生产消费队列
2024-08-26 16:33:08
1.这里先要说一下为什么会想到fqueue,因为这个是一个轻量级的消息队列框架,并且速度很快,用起来很方便,就是这样
当然后期考虑使用redis,这里先上一个fqueue的版本,后面有时间我再吧他改成redis版本吧,感觉可能redis版本可能更适合
package queue.fqueue.vo; /**
* @ProjectName: cutter-point
* @Package: queue.fqueue.vo
* @ClassName: EventVo
* @Author: xiaof
* @Description: ${description}
* @Date: 2019/6/11 10:30
* @Version: 1.0
*/
public interface EventVo { public void doOperater(); }
package queue.fqueue.vo; import java.io.Serializable; /**
* @ProjectName: cutter-point
* @Package: queue.fqueue.vo
* @ClassName: TempVo
* @Author: xiaof
* @Description: ${description}
* @Date: 2019/6/11 10:18
* @Version: 1.0
*/
public class TempVo implements Serializable, EventVo { private String name; public String getName() {
return name;
} public void setName(String name) {
this.name = name;
} @Override
public String toString() {
return "TempVo{name='" + name + "'}";
} @Override
public void doOperater() {
System.out.println(name + " : say hello fqueue!");
}
}
package queue.fqueue; import net.apexes.fqueue.FQueue;
import queue.fqueue.vo.TempVo; import java.io.*; /**
* @ProjectName: cutter-point
* @Package: queue.fqueue
* @ClassName: FqueueProducter
* @Author: xiaof
* @Description: ${description}
* @Date: 2019/6/11 10:36
* @Version: 1.0
*/
public class FqueueProducter implements Runnable { private FQueue fQueue; public FqueueProducter(FQueue fQueue) {
this.fQueue = fQueue;
} @Override
public void run() { while(true) {
try {
Thread.sleep(2000); TempVo tempVo = new TempVo();
tempVo.setName(Thread.currentThread().getName() + ",time is:" + System.currentTimeMillis());
//序列化为字节
OutputStream arrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(arrayOutputStream);
objectOutputStream.writeObject(tempVo);
arrayOutputStream.flush(); fQueue.add(((ByteArrayOutputStream) arrayOutputStream).toByteArray()); } catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
} }
}
package queue.fqueue; import net.apexes.fqueue.FQueue;
import queue.fqueue.vo.EventVo; import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream; /**
* @ProjectName: cutter-point
* @Package: queue.fqueue
* @ClassName: FqueueProducter
* @Author: xiaof
* @Description: ${description}
* @Date: 2019/6/11 9:40
* @Version: 1.0
*/
public class FqueueConsume implements Runnable { private FQueue fQueue; public FqueueConsume(FQueue fQueue) {
this.fQueue = fQueue;
} @Override
public void run() { while(true) { byte bytes[] = fQueue.poll(); //反序列化对象
if(bytes == null || bytes.length <= 0) {
Thread.yield();
continue;
} ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
try {
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
EventVo eventVo = (EventVo) objectInputStream.readObject(); eventVo.doOperater(); } catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
} }
}
测试代码:
@Test
public void test3() throws IOException, FileFormatException, InterruptedException {
FQueue queue1 = new FQueue("db1"); //读写取数据
for(int i = 0; i < 5; ++i) {
System.out.println("输出测试" + i);
FqueueProducter producter = new FqueueProducter(queue1); Thread t = new Thread(producter);
t.start();
} //读写取数据
for(int i = 0; i < 2; ++i) {
System.out.println("输出测试" + i);
FqueueConsume fqueueConsume = new FqueueConsume(queue1); Thread t = new Thread(fqueueConsume);
t.setDaemon(true);
t.start();
} while(true) {
Thread.sleep(1000);
} }
效果展示:
最新文章
- 《JS设计模式笔记》 3,观察者模式
- hibernate笔记--继承映射关系的三种实现方式
- JS简介
- windows 下 gvim/vim lua支持问题,neocomplete等插件支持
- 【转】扫盲 同步利器、分布式网盘--BT Sync
- 对java多线程的认识
- SQL数据库完全复制
- C# MVC 实现登录的5种方式
- Android的那些轮子
- union 与struct的空间计算
- jQuery学习笔记(一)——基础选择器、过滤选择器、表单选择器
- 【转载】CentOS日志系统组成详解
- A. Mike and Cellphone(Round 361 Div.2)
- Html5模拟通讯录人员排序(sen.js)
- React UI 组件库uiw v1.2.8 发布
- R-CNN论文翻译——用于精确物体定位和语义分割的丰富特征层次结构
- linux常用的内核镜像格式
- Grafana介绍
- python中的count
- windows10操作系统中cmd窗口下telnet功能失效的解决方案