前置选学课程:观察者设计模式
Pub/Sub(Publish, Subscribe),即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。
redis可以作为一个pub/sub 服务器,在发布者和订阅者之间起到消息路由的功能。
- 发布者通过 publish 命令向 redis 服务器 的 channel 发送消息,订阅该 channel 的全部 client 都会收到此消息。
- 订阅者可以通过 subscribe 和 psubscribe 命令向 redis server 订阅自己感兴趣的channel ;
Client即可以作为消息发送方,也可以作为消息接收方。 一个 client 可以订阅多个 channel,一个client可以向多个 channel 发送消息。
Redis 客户端可以订阅任意数量的频道。 下图展示了频道 channel1,以及订阅这个频道的client2、client5和 client1三个客户端之间的关系: 当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
-
PSUBSCRIBE pattern [pattern …] 订阅一个或多个符合给定模式的频道。 每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等), news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。
-
PUBLISH channel message 将信息 message 发送到指定的频道 channel 。
-
PUNSUBSCRIBE [pattern [pattern …]] 指示客户端退订所有给定模式。 如果没有模式被指定,也即是,一个无参数的 PUNSUBSCRIBE 调用被执行,那么客户端使用 PSUBSCRIBE 命令订阅的所有模式都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。
-
SUBSCRIBE channel [channel …] 订阅给定的一个或多个频道的信息。
-
UNSUBSCRIBE [channel [channel …]] 指示客户端退订给定的频道。 如果没有频道被指定,也即是,一个无参数的 UNSUBSCRIBE 调用被执行,那么客户端使用 SUBSCRIBE 命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。
- 消息发送方
- 消息接收方
发送方
接收方
Redis 的发布与订阅实现也支持模式匹配(pattern matching): 客户端可以订阅一个带 * 号的模式, 如果某个/某些频道的名字和这个模式匹配, 那么当有信息发送给这个/这些频道的时候, 客户端也会收到这个/这些频道的信息。需要注意的是:通过pattern模式而接收到的信息的类型为 pmessage。 客户端将收到来自 news.zhengzhi 、 news.yule 等频道的信息。
注意:
- 因为所有接收到的信息都会包含一个信息来源:当信息来自频道时,来源是某个频道;当信息来自模式时,来源是某个模式。因此, 客户端可以用一个哈希表,将特定来源和处理该来源的回调函数关联起来。 当有新信息到达时, 程序就可以根据信息的来源, 在 O(1) 复杂度内, 将信息交给正确的回调函数来处理。比如 SUBSCRIBE foo PSUBSCRIBE f* 那么当有信息发送到频道 foo 时, 客户端将收到两条信息: 一条来自频道 foo ,信息类型为 message ; 另一条来自模式 f* ,信息类型为 pmessage 。 3、要在单独的线程中订阅,因为subscribe会阻塞当前线程的执行。你可以使用一个PubSub实例来订阅多个Channel。 4、一旦客户端进入订阅状态,客户端就只可接受订阅相关的命令SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE和PUNSUBSCRIBE除了这些命令,其他命令一律失效。 5、使用PUNSUBSCRIBE命令只能退订通过PSUBSCRIBE命令订阅的规则,不会影响SUBSCRIBE订阅的频道。
使用Jedis实现发布订阅需要继承抽象类JedisPubSub,在该类中定义了如下六个方法。分别表示
- public void onMessage(String channel, String message) 监听到订阅模式接受到消息时的回调
- public void onPMessage(String pattern, String channel, String message) 监听到订阅频道接受到消息时的回调
- public void onSubscribe(String channel, int subscribedChannels) 订阅频道时的回调
- public void onPSubscribe(String pattern, int subscribedChannels) 订阅频道模式时的回调
- public void onUnsubscribe(String channel, int subscribedChannels) 取消订阅频道时的回调
- public void onPUnsubscribe(String pattern, int subscribedChannels) 取消订阅模式时的回调
准备工作:加入Maven依赖
redis.clients
jedis
3.1.0
第一步:
public class MyJedisPubSub extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
System.out.println(String.format(
"线程'%s':收到来自频道的'%s'消息,具体消息为:'%s'",
Thread.currentThread().getName(), channel, message
));
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(String.format(
"线程'%s':成功订阅频道'%s',目前该线程所订阅频道总数为:'%s'",
Thread.currentThread().getName(), channel, subscribedChannels
));
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println(String.format(
"线程'%s':取消频道订阅频道'%s',目前该线程所订阅频道总数为:'%s'",
Thread.currentThread().getName(), channel, subscribedChannels
));
}
}
第二步:测试代码
-
先启动消息接收方
public class Subscriber { public static void main(String[] args) throws InterruptedException { MyJedisPubSub sub1 = new MyJedisPubSub(); MyJedisPubSub sub2 = new MyJedisPubSub(); //1号订阅频道线程 new Thread(() -> { Jedis jedis = JedisPoolUtil.getJedis(); jedis.subscribe(sub1, "ch"); JedisPoolUtil.release(jedis); }).start(); //2号订阅频道线程 new Thread(() -> { Jedis jedis = JedisPoolUtil.getJedis(); jedis.subscribe(sub2, "ch"); JedisPoolUtil.release(jedis); }).start(); Thread.sleep(10000); } }
-
再启动消息发送方
public class Publisher { public static void main(String[] args) throws InterruptedException { int i = 0; Jedis jedis = JedisPoolUtil.getJedis(); while (i
关注打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?