Jedis实现发布订阅功能
Redis为我们提供了publish/subscribe(发布/订阅)功能。我们可以对某个channel(频道)进行subscribe(订阅),当有人在这个channel上publish(发布)消息时,redis就会通知我们,这样我们可以收到别人发布的消息。
作为Java的redis客户端,Jedis提供了publish/subscribe的接口。本文讲述如何使用Jedis来实现redis的publish/subscribe。
定义Subscriber类
Jedis定义了抽象类JedisPubSub
,在这个类中,定义publish/subsribe的回调方法。通过继承JedisPubSub
类并重新实现这些回调方法,当publish/subsribe事件发生时,我们可以定制自己的处理逻辑。
在以下例子中,我们定义了Subscriber
类,这个类继承了JedisPubSub
类,并重新实现了其中的回调方法。
import redis.clients.jedis.JedisPubSub; public class Subscriber extends JedisPubSub {
public Subscriber() {
} public void onMessage(String channel, String message) {
System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
} public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
channel, subscribedChannels));
} public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
channel, subscribedChannels)); }
}
定义SubThread线程类
由于Jedis的subscribe
操作是阻塞的,因此,我们另起一个线程来进行subscribe操作。
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool; public class SubThread extends Thread {
private final JedisPool jedisPool;
private final Subscriber subscriber = new Subscriber(); private final String channel = "mychannel"; public SubThread(JedisPool jedisPool) {
super("SubThread");
this.jedisPool = jedisPool;
} @Override
public void run() {
System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.subscribe(subscriber, channel);
} catch (Exception e) {
System.out.println(String.format("subsrcibe channel error, %s", e));
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
在上面的代码中,我们从JedisPool
获取一个Jedis
实例,并使用这个Jedis
实例进行subscribe
的操作。Jedis
的subscribe
的声明如下:
public void subscribe(final JedisPubSub jedisPubSub, final String… channels)
第一个参数接受一个JedisPubSub
对象,第二个参数指定对哪个频道进行订阅。上例中,我们把我们定义的Subscriber
对象传给subscribe
方法。
当publish/subscribe的事件发生时,会自动调用我们Subscriber
的方法。
定义Publisher类
Publisher
类接受用户的输入,并将输入发布到channel。当用户输入”quit”后,输入结束。
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader; import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool; public class Publisher {
private final JedisPool jedisPool; public Publisher(JedisPool jedisPool) {
this.jedisPool = jedisPool;
} public void start() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
Jedis jedis = jedisPool.getResource();
while (true) {
String line = null;
try {
line = reader.readLine();
if (!"quit".equals(line)) {
jedis.publish("mychannel", line);
} else {
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
定义入口代码
如下是我们的程序入口代码。
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig; public class PubSubDemo
{
public static void main( String[] args )
{
// 替换成你的reids地址和端口
String redisIp = "192.168.229.154";
int reidsPort = 6379;
JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), redisIp, reidsPort);
System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", redisIp, reidsPort)); SubThread subThread = new SubThread(jedisPool);
subThread.start(); Publisher publisher = new Publisher(jedisPool);
publisher.start();
}
}
在上面的代码中,我们首先生成了一个JedisPool
的redis连接池,这是由于Jedis
不是线程安全的,JedisPool
是线程安全的。而我们的程序在主线程和订阅线程(SubThread)均需要使用Jedis
,故在程序中我们使用JedisPool
。具体也可以参考在多线程环境中使用Jedis。
由于Jedis
的subcribe操作是阻塞的,故我们另起了一个线程来进行subcribe操作。
通过调用Publisher::start()
方法,接受用户的输入,并publish到指定的channel。
输出
redis pool is starting, redis ip 192.168.229.154, redis port 6379
subscribe redis, channel mychannel, thread will be blocked
subscribe redis channel success, channel mychannel, subscribedChannels 1
这时输入
hello
控制窗口中输出
receive redis published message, channel mychannel, message hello
参考资料
- https://github.com/xetorthio/jedis/wiki/AdvancedUsage
- http://basrikahveci.com/a-simple-jedis-publish-subscribe-example/
最新文章
- CentOS个人目录下中文路径转英文路径
- oracle 11g RAC 补丁升级方法
- Hash
- 关于C转汇编(转自网上)
- 突然出现 -bash: pod: command not found 的解决方法
- BZOJ 2595 斯坦那树
- winston日志管理1
- 那些不能错过的Xcode插件
- [ 兼容 ] IE和Firefox的Javascript兼容性总结
- Windows桌面开发者的必备软件
- 备忘-zTree v3.5 Demo 演示
- WPF 中如何使用第三方控件 ,可以使用WindowsFormsHost 类
- Delphi ADOQuery连接数据库的查询、插入、删除、修改
- ZJOI2008树的统计Count
- typedef如何显示变量类型名
- 从git中删除 .idea 目录
- 如何将.SQL文件的数据导入到Mysql的数据库中
- mfc CListBox
- ST3 插件和技巧
- Spring 网路搜集的情报
热门文章
- [转]How to build a data storage and VM Server using comodity hardware and free software
- 【C基础】const用法
- DevExpress 学习使用之 ComboBoxEdit
- ios学习笔记第四天之官方文档总结
- SPI and API
- CSS hack大全&;详解(什么是CSS hack)
- convert.c:7:3: warning: incompatible implicit declaration of built-in function ‘printf’ [enabled by
- 三千万数据量下redis2.4的一统计情况
- Linux:备份
- Dreamer 框架 比Struts2 更加灵活