Redis的发布订阅
什么是发布订阅
- 什么是发布订阅
- Redis 的发布和订阅
- 发布订阅命令行实现
- 代码实现
- 导入依赖包
- 编写订阅者
- 编写一个子线程
- 编写发布者
- 测试
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
Redis 客户端可以订阅任意数量的频道。
Redis 的发布和订阅1.客户端可以订阅频道如下图 2.当给这个频道发布消息后,消息就会发送给订阅的客户端
-
打开一个客户端订阅 channel1 subscribe channel1
-
打开另一个客户端,给 channel1 发布消息 hello publish channel1 hello
-
打开第一个客户端可以看到发送的消息
注意:发布的消息没有持久化,如果在订阅的客户端收不到hello,只能厚道订阅后发布的消息。也就是说,订阅前的消息是收不到的。
代码实现 导入依赖包
redis.clients
jedis
2.9.0
编写订阅者
创建订阅者需要继承抽象类JedisPubSub,然后重写以下三个方法
package com.wpp.redis_publish_subscribe.subscrib;
import redis.clients.jedis.JedisPubSub;
/**
* @Author wpp
* @Date 2022/05/07/11:40
* @Description 订阅者
*/
public class Subscriber extends JedisPubSub {
public Subscriber(){}
@Override
public void onMessage(String channel, String message) {//收到消息会调用
System.out.println("从频道:"+ channel + "收到发布的消息:" + message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {//订阅了频道会调用
System.out.println("订阅频道成功");
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {//取消订阅 会调用
System.out.println("取消订阅");
}
}
编写一个子线程
这个子线程主要用于创建一个订阅者并去订阅某个频道
package com.wpp.redis_publish_subscribe.channerl;
import com.wpp.redis_publish_subscribe.subscrib.Subscriber;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* @Author wpp
* @Date 2022/05/07/13:14
* @Description 订阅频道
*/
public class SubThread extends Thread{
private final JedisPool jedisPool;
private final Subscriber subscriber = new Subscriber();
private final String channel = "mychannel";
public SubThread(JedisPool jedisPool) {
super("SubThread");
this.jedisPool = jedisPool;
}
@Override
public void run() {
System.out.println("准备订阅");
Jedis jedis = null;
try {
jedis = jedisPool.getResource();//取出一个连接
jedis.subscribe(subscriber,channel);//通过subscribe 的api去订阅,入参是订阅者和频道名
} catch (Exception e) {
System.out.println("订阅失败");
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
编写发布者
我们的发布者需要继承线程类
package com.wpp.redis_publish_subscribe.publish;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
* @Author wpp
* @Date 2022/05/07/11:31
* @Description 发布者
*/
public class Publisher extends Thread {
private final JedisPool jedisPool;
public Publisher(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
@Override
public void run() {
System.out.println("发布者。。。");
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
Jedis jedis = jedisPool.getResource();//连接池中取出一个连接
while (true) {
String line = null;
try {
line = reader.readLine();
if (!"quit".equals(line)) {
jedis.publish("mychannel",line);//从 mychannel 的频道上推送消息
} else {
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
测试
最后我们来做个测试喽
package com.wpp.redis_publish_subscribe;
import com.wpp.redis_publish_subscribe.channerl.SubThread;
import com.wpp.redis_publish_subscribe.publish.Publisher;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* @Author wpp
* @Date 2022/05/07/14:20
* @Description
*/
public class Demo {
public static void main(String[] args) {
JedisPool jedisPool = new JedisPool(new JedisPoolConfig(),"Redis地址",6379,0,"Redis密码",0);
SubThread subThread = new SubThread(jedisPool);//订阅者
subThread.start();
Publisher publisher = new Publisher(jedisPool);//发布者
publisher.start();
}
}
运行项目之后: 在控制台输入hello,结果如下:
ok,完美!为了模拟更真实点,完全可以把发布者和订阅者写在两个不同的项目中试验。