这里我使用Redis的发布、订阅功能实现简单的消息队列,基本的命令有publish、subscribe等。

在Jedis中,有对应的java方法,但是只能发布字符串消息。为了传输对象,需要将对象进行序列化,并封装成字符串进行处理。

使用Redis实现消息队列


封装一个消息对象

public class Message implements Serializable{

private static final long serialVersionUID = 1L;

private String titile;
private String info; public Message(String titile,String info){
this.titile=titile;
this.info=info;
} public String getTitile() {
return titile;
}
public void setTitile(String titile) {
this.titile = titile;
}
public String getInfo() {
return info;
}
public void setInfo(String info) {
this.info = info;
}
}

  

为这个消息对象提供序列化方法

public class MessageUtil {

//convert To String
public static String convertToString(Object obj,String charset) throws IOException{ ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream oo = new ObjectOutputStream(bo);
oo.writeObject(obj);
String str = bo.toString(charset);
bo.close();
oo.close();
return str;
} //convert To Message
public static Object convertToMessage(byte[] bytes) throws Exception{
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream sIn = new ObjectInputStream(in);
return sIn.readObject(); }
}

  

从Jedis连接池中获取连接

public class RedisUtil {

/**
* Jedis connection pool
* @Title: config
*/
public static JedisPool getJedisPool(){
ResourceBundle bundle=ResourceBundle.getBundle("redis");
String host=bundle.getString("host");
int port=Integer.valueOf(bundle.getString("port"));
int timeout=Integer.valueOf(bundle.getString("timeout"));
// String password=bundle.getString("password"); JedisPoolConfig config=new JedisPoolConfig();
config.setMaxActive(Integer.valueOf(bundle.getString("maxActive")));
config.setMaxWait(Integer.valueOf(bundle.getString("maxWait")));
config.setTestOnBorrow(Boolean.valueOf(bundle.getString("testOnBorrow")));
config.setTestOnReturn(Boolean.valueOf(bundle.getString("testOnReturn"))); JedisPool pool=new JedisPool(config, host, port, timeout); return pool;
}
}

  

创建Provider类

public class Producer {

private Jedis jedis;
private JedisPool pool; public Producer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource();
} public void provide(String channel,Message message) throws IOException{
String str1=MessageUtil.convertToString(channel,"UTF-8");
String str2=MessageUtil.convertToString(message,"UTF-8");
jedis.publish(str1, str2);
} //close the connection
public void close() throws IOException {
//将Jedis对象归还给连接池,关闭连接
pool.returnResource(jedis);
}
}

  

创建Consumer类

public class Consumer {

private Jedis jedis;
private JedisPool pool; public Consumer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource();
} public void consum(String channel) throws IOException{
JedisPubSub jedisPubSub = new JedisPubSub() {
// 取得订阅的消息后的处理
public void onMessage(String channel, String message) {
System.out.println("Channel:"+channel);
System.out.println("Message:"+message.toString());
} // 初始化订阅时候的处理
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("onSubscribe:"+channel);
} // 取消订阅时候的处理
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println("onUnsubscribe:"+channel);
} // 初始化按表达式的方式订阅时候的处理
public void onPSubscribe(String pattern, int subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
} // 取消按表达式的方式订阅时候的处理
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
} // 取得按表达式的方式订阅的消息后的处理
public void onPMessage(String pattern, String channel, String message) {
System.out.println(pattern + "=" + channel + "=" + message);
}
}; jedis.subscribe(jedisPubSub, channel);
} //close the connection
public void close() throws IOException {
//将Jedis对象归还给连接池
pool.returnResource(jedis);
}
}

  

测试方法

public static void main(String[] args){

Message msg=new Message("hello!", "this is the first message!");

Producer producer=new Producer();
Consumer consumer=new Consumer();
try {
producer.provide("chn1",msg);
consumer.consum("chn1");
} catch (IOException e) {
e.printStackTrace();
}
}

  

最新文章

  1. CocoaPods 安装 使用
  2. 使用strace 工具跟踪系统调用和信号
  3. Easyui表单之下拉列表的三级联动
  4. 连接英文字符集的ORACLE和调用存储过程问题及64位服务器连接ORACLE问题
  5. Java基础语法总结
  6. 一个Delphi7的BUG
  7. Failed to load JavaHL Library解决方法
  8. nova --debug image-list
  9. ibatis 开发中的经验 (一)ibatis 和hibernate 在开发中的理解
  10. 补充一下sql server(临时表)
  11. Express难点解析
  12. spark-sql
  13. 搭建NDK环境
  14. Xamarin.Android 利用Fragment实现底部菜单
  15. IDA*(以The Ratotion Game POJ--2286 UVa1343为例)
  16. Aptana在Eclipse的安装
  17. Android开发——Fragment知识整理(一)
  18. PAT 1044 火星数字
  19. EnumMap实现类
  20. gdb用法

热门文章

  1. VC常用小知识
  2. jqueryui插件slider的简单使用
  3. 使用OneNote2016发送博客
  4. Linux系统基础优化及常用命令
  5. 数据库-mysql-DDL-表记录操作
  6. 2)实现github自动登陆获取信息
  7. python - 发送html格式的邮件
  8. PHP实现网络Socket及IO多路复用
  9. SpringBoot集成Shiro
  10. Es6对象的扩展和Class类的基础知识笔记