您当前的位置: 首页 >  redis

梁云亮

暂无认证

  • 3浏览

    0关注

    1211博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Redis发布订阅

梁云亮 发布时间:2019-10-29 09:02:16 ,浏览量:3

Redis 发布订阅

前置选学课程:观察者设计模式

Pub/Sub(Publish, Subscribe),即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。

redis可以作为一个pub/sub 服务器,在发布者和订阅者之间起到消息路由的功能。

  • 发布者通过 publish 命令向 redis 服务器 的 channel 发送消息,订阅该 channel 的全部 client 都会收到此消息。
  • 订阅者可以通过 subscribe 和 psubscribe 命令向 redis server 订阅自己感兴趣的channel ;

在这里插入图片描述

Client即可以作为消息发送方,也可以作为消息接收方。 一个 client 可以订阅多个 channel,一个client可以向多个 channel 发送消息。

Redis 客户端可以订阅任意数量的频道。 下图展示了频道 channel1,以及订阅这个频道的client2、client5和 client1三个客户端之间的关系: 在这里插入图片描述 当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端: 在这里插入图片描述

语法:
  • PSUBSCRIBE pattern [pattern …] 订阅一个或多个符合给定模式的频道。 每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等), news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。

  • PUBLISH channel message 将信息 message 发送到指定的频道 channel 。

  • PUNSUBSCRIBE [pattern [pattern …]] 指示客户端退订所有给定模式。 如果没有模式被指定,也即是,一个无参数的 PUNSUBSCRIBE 调用被执行,那么客户端使用 PSUBSCRIBE 命令订阅的所有模式都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。

  • SUBSCRIBE channel [channel …] 订阅给定的一个或多个频道的信息。

  • UNSUBSCRIBE [channel [channel …]] 指示客户端退订给定的频道。 如果没有频道被指定,也即是,一个无参数的 UNSUBSCRIBE 调用被执行,那么客户端使用 SUBSCRIBE 命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。

示例1:Redis模拟
  • 消息发送方 在这里插入图片描述
  • 消息接收方 在这里插入图片描述
示例2:

发送方

在这里插入图片描述 接收方 在这里插入图片描述

Redis 的发布与订阅实现也支持模式匹配(pattern matching): 客户端可以订阅一个带 * 号的模式, 如果某个/某些频道的名字和这个模式匹配, 那么当有信息发送给这个/这些频道的时候, 客户端也会收到这个/这些频道的信息。需要注意的是:通过pattern模式而接收到的信息的类型为 pmessage。 在这里插入图片描述 客户端将收到来自 news.zhengzhi 、 news.yule 等频道的信息。

注意:

  • 因为所有接收到的信息都会包含一个信息来源:当信息来自频道时,来源是某个频道;当信息来自模式时,来源是某个模式。因此, 客户端可以用一个哈希表,将特定来源和处理该来源的回调函数关联起来。 当有新信息到达时, 程序就可以根据信息的来源, 在 O(1) 复杂度内, 将信息交给正确的回调函数来处理。比如 SUBSCRIBE foo PSUBSCRIBE f*   那么当有信息发送到频道 foo 时, 客户端将收到两条信息: 一条来自频道 foo ,信息类型为 message ; 另一条来自模式 f* ,信息类型为 pmessage 。 3、要在单独的线程中订阅,因为subscribe会阻塞当前线程的执行。你可以使用一个PubSub实例来订阅多个Channel。 4、一旦客户端进入订阅状态,客户端就只可接受订阅相关的命令SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE和PUNSUBSCRIBE除了这些命令,其他命令一律失效。 5、使用PUNSUBSCRIBE命令只能退订通过PSUBSCRIBE命令订阅的规则,不会影响SUBSCRIBE订阅的频道。

使用Jedis实现发布订阅需要继承抽象类JedisPubSub,在该类中定义了如下六个方法。分别表示

  • public void onMessage(String channel, String message) 监听到订阅模式接受到消息时的回调
  • public void onPMessage(String pattern, String channel, String message) 监听到订阅频道接受到消息时的回调
  • public void onSubscribe(String channel, int subscribedChannels) 订阅频道时的回调
  • public void onPSubscribe(String pattern, int subscribedChannels) 订阅频道模式时的回调
  • public void onUnsubscribe(String channel, int subscribedChannels) 取消订阅频道时的回调
  • public void onPUnsubscribe(String pattern, int subscribedChannels) 取消订阅模式时的回调
示例3 :Jedis模拟-发送方不停地发送消息,接收方不断地接收消息。

准备工作:加入Maven依赖


    redis.clients
    jedis
    3.1.0

第一步:

public class MyJedisPubSub extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println(String.format(
                "线程'%s':收到来自频道的'%s'消息,具体消息为:'%s'",
                Thread.currentThread().getName(), channel, message
        ));
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format(
                "线程'%s':成功订阅频道'%s',目前该线程所订阅频道总数为:'%s'",
                Thread.currentThread().getName(), channel, subscribedChannels
        ));
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format(
                "线程'%s':取消频道订阅频道'%s',目前该线程所订阅频道总数为:'%s'",
                Thread.currentThread().getName(), channel, subscribedChannels
        ));
    }
}

第二步:测试代码

  1. 先启动消息接收方

    public class Subscriber {
        public static void main(String[] args) throws InterruptedException {
            MyJedisPubSub sub1 = new MyJedisPubSub();
            MyJedisPubSub sub2 = new MyJedisPubSub();
    
            //1号订阅频道线程
            new Thread(() -> {
                Jedis jedis = JedisPoolUtil.getJedis();
                jedis.subscribe(sub1, "ch");
                JedisPoolUtil.release(jedis);
            }).start();
    
            //2号订阅频道线程
            new Thread(() -> {
                Jedis jedis = JedisPoolUtil.getJedis();
                jedis.subscribe(sub2, "ch");
                JedisPoolUtil.release(jedis);
            }).start();
    
            Thread.sleep(10000);
        }
    }
    
  2. 再启动消息发送方

    public class Publisher {
        public static void main(String[] args) throws InterruptedException {
            int i = 0;
            Jedis jedis = JedisPoolUtil.getJedis();
            while (i             
关注
打赏
1665409997
查看更多评论
0.1890s