是否有同样的经历?面试官问你做过啥项目,我一顿胡侃,项目利用到了消息队列,kafka,rocketMQ等等。

好的,那请开始你的表演,面试官递过一支笔:给我手写一个消息队列!!WHAT?

为了大家遇到这种场景还能愉快的zhuangbi,所以写一篇文章,凑合用一下。

想要实现一个消息队列,我们需要关组以下几点:

1.首先有一个队列(FIFO)来存放消息

2.消息队列容量有限

3.需要入队,出队方法

4.需要考虑多线程并发情况

<1>.简单版:用LinkedList实现一个简单的消息队列

这里用LinkedList来实现队列,然后通过synchronized关键字来实现多线程的互斥,用LinkedList的addLast方法实现队列的push,用LinkedList的removeFirst实现队列的remove方法

//实现FIFO队列
public class MyList<T> {
private LinkedList<T> storage = new LinkedList<T>();
private int statckSize = 2000;
public synchronized void push(T e) {//需要加上同步
storage.addLast(e);
} public synchronized T peek() {
if(storage!=null&&storage.size()>0){
return storage.peekFirst();
}
return null; } public void remove() {
storage.removeFirst();
} public boolean empty() {
return storage.isEmpty();
}
}

测试类:

public class ListTest {
public static void main(String[] args) {
MyList<String> myList = new MyList<String>();
for(String s : "the prefect code".split(" ")){//LIFO
myList.push(s);
}
while(!myList.empty()){
System.out.print(myList.peek()+" ");
myList.remove();
}
} }

<2>.进阶版,仍然用LinkedList来实现队列,给出仓库的概念(消息队列仓库),生产者和消费者分别在独立线程中实现,使用object的wait(),notify()和synchronized()实现线程操作的同步与互斥(Obj.wait(),与Obj.notify()必须要与synchronized(Obj)一起使用,也就是wait,与notify是针对已经获取了Obj锁进行操作,从语法角度来说就是Obj.wait(),Obj.notify必须在synchronized(Obj){...}语句块内。)

抽象仓库类:

public interface AbstractStorage {
void consumer(int num);
void producer(int num);
}

生产者线程:

class Producer extends Thread {
//生产数量
private int num;
//仓库
private AbstractStorage abstractStorage; public Producer(AbstractStorage abstractStorage,int num){
this.abstractStorage=abstractStorage;
this.num=num;
}
// 调用仓库Storage的生产函数
public void produce(int num){
abstractStorage.producer(num);
}
// 线程run函数
@Override
public void run(){
produce(num);
} }

消费者线程:

class Consumer extends Thread {
//消费数量
private int num;
//仓库
private AbstractStorage abstractStorage; public Consumer(AbstractStorage abstractStorage,int num){
this.abstractStorage=abstractStorage;
this.num=num;
} public void consume(int num){
abstractStorage.consumer(num);
} @Override
public void run(){
consume(num);
} }

消息队列(仓库)实现类:

public class Storage1 implements AbstractStorage {
//最大容量
private final int MAX_SIZE = 100;
//存储载体
private LinkedList list =new LinkedList(); @Override
public void consumer(int num) {
synchronized (list) {
while (num > list.size()) {
try {
list.wait();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("【阻塞】当前要消费的数量:" + num + ",当前库存量:" + list.size() + "当前消费阻塞");
}
for (int i = 0; i < num; i++) {
list.removeFirst();
}
System.out.println("【consumer】 "+Thread.currentThread().getName()+" 已消费产品数:" + num + ",现库存数:" + list.size());
list.notifyAll();
}
} //生产
@Override
public void producer(int num) {
synchronized (list) {
while (list.size() + num > MAX_SIZE) {
try {
list.wait();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("【阻塞】当前队列已满,生产阻塞");
}
for (int i = 0; i < num; i++) {
list.addLast(new Object());
}
System.out.println("【producer】 "+Thread.currentThread().getName()+ " 已生产产品数:" + num + ",现库存数:" + list.size());
list.notifyAll();
}
} }

测试类:

public class Test {
public static void main(String[] args) {
AbstractStorage abstractStorage =new Storage1(); //生产者对象
Producer p1 = new Producer(abstractStorage,10);
Producer p2 = new Producer(abstractStorage,10);
Producer p3 = new Producer(abstractStorage,10);
Producer p4 = new Producer(abstractStorage,10);
Producer p5 = new Producer(abstractStorage,10);
Producer p6 = new Producer(abstractStorage,10);
Producer p7 = new Producer(abstractStorage,10);
Producer p8 = new Producer(abstractStorage,50);
//消费者对象
Consumer c1 = new Consumer(abstractStorage,20);
Consumer c2 = new Consumer(abstractStorage,30);
Consumer c3 = new Consumer(abstractStorage,50); c1.start();
c2.start();
c3.start(); p1.start();
p2.start();
p3.start();
p4.start();
p5.start();
p6.start();
p7.start();
p8.start(); } }

最终结果显示,我们能实现简单的生产消费,并且是线程同步的。

最新文章

  1. ubuntu下非root用户下获得使用wireshark的权限
  2. Console命令详解,让调试js代码变得更简单
  3. WPS显示无法创建对象,请确认对象已在系统注册表中注册
  4. php写入txt换行符
  5. 浩瀚科技PDA移动开单|盘点机 数据采集器 条码扫描开单微POS软件 现场打印开单
  6. JAVA类与对象(三)----类定义关键字详解
  7. bzoj3211,bzoj3038
  8. Metrics
  9. numpy初识
  10. mysql中外键的创建与删除
  11. qsort代码(pascal/c/c++)与思想及扩展(随机化,TopK)
  12. Vue系列之 =&gt; 自定义键盘修饰符
  13. [UE4]UMG编辑器:控件作为变量、预设锚点和自由锚点
  14. django-权限验证场景
  15. mpvue上手教程
  16. Part 1 - Getting Started(1-3)
  17. redis 概述、windows版本下载启动访问退出安装、中文乱码、RedisDesktopManager下载
  18. GrideVlew提供点击按钮添加新数据,单击项目修改,长按删除功能
  19. Android 逆向project 实践篇
  20. Java 爬虫(获取指定页面中所有的邮箱地址)

热门文章

  1. CentOS7实现Nginx+Tomcat 负载均衡
  2. ajax实现异步操作实例1
  3. C# 实体之间转换
  4. vue中v-if和v-for优先级
  5. js鼠标点击特效,有关参数设置
  6. upupw : Apache Php5.5 的使用
  7. 因xhost命令和DISPLAY环境变量操作不当导致无法启动Oracle图形化安装界面
  8. python 有用的库
  9. linux上如何安装git
  10. oracle 数据库启动停止小结