- 1. consume的各种subscription mode订阅模式
- 1.1 介绍
- 1.2 topic数据准备
- 1.3 subscription mode
- 1.3.1 Exclusive(默认)
- 1.3.2 Failover
- 1.3.3 Shared
- 1.3.4 Key_Shared
- 2. consume消费者的消费规则
- 一个subscription name有时可以有多个consume客户端
- 一个consume只能有一个subscription name
向my-topic以单条方式发送如下10条数据
producer.newMessage().key("key-1").value("message-1-1").send();
producer.newMessage().key("key-1").value("message-1-2").send();
producer.newMessage().key("key-1").value("message-1-3").send();
producer.newMessage().key("key-2").value("message-2-1").send();
producer.newMessage().key("key-2").value("message-2-2").send();
producer.newMessage().key("key-2").value("message-2-3").send();
producer.newMessage().key("key-3").value("message-3-1").send();
producer.newMessage().key("key-3").value("message-3-2").send();
producer.newMessage().key("key-4").value("message-4-1").send();
producer.newMessage().key("key-4").value("message-4-2").send();
1.3 subscription mode
1.3.1 Exclusive(默认)
subscription name只能被第一个consume订阅。其它consume订阅会报错
consume的示例代码
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.SubscriptionType
val pulsarClient:PulsarClient = _
val consumer:Consumer[String] = pulsarClient.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("subscription-exclusive")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe()
1.3.2 Failover
consume1和consume2的示例代码一样,如下所示:
val pulsarClient:PulsarClient = _
val consumer:Consumer[String] = pulsarClient.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("subscription-failover")
.subscriptionType(SubscriptionType.Failover)
.subscribe()
先启动consume1,consume1是active consume。再启动consume2,consume2是standby consume。如果consume1消费了5条数据之后挂掉了,consume2变成active consume消费剩余的5条数据
注意:如果topic是分区的,假如my-topic有2个partition,可能consume1对于partition1是active consume,consume2对于partition1是standby consume;consume2对于partition2是active consume,consume1对于partition2是standby consume
1.3.3 Sharedconsume1和consume2的示例代码一样,如下所示:
val pulsarClient:PulsarClient = _
val consumer:Consumer[String] = pulsarClient.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("subscription-shared")
.subscriptionType(SubscriptionType.Shared)
.subscribe()
先启动consume1,再启动consume2。consume1和consume2都是active consume,消息以round-robin的方式被consume1和consume2消费。此消费方式不能保证消息的顺序性
1.3.4 Key_Sharedconsume1和consume2的示例代码一样,如下所示:
val pulsarClient:PulsarClient = _
val consumer:Consumer[String] = pulsarClient.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("subscription-key-shared")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe()
先启动consume1,再启动consume2。consume1和consume2都是active consume。相同key的消息只能被一个consume消费,但一个key被哪个consume消费是随机的。例如key-1和key-3被consume1消费;key-2和key-4被consume2消费
注意:默认produce以batch的方式发送消息,一个batch可能包含不同的key。consume也是以batch的方式进行消费。这和Key_Shared的订阅模式是相冲突的。可以通过下面两种方式进行解决:
方式一:produce使用KeyBasedBatcher
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.BatcherBuilder
val produce:Producer[String] = pulsarClient.newProducer(Schema.STRING)
.topic("my-topic")
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create()
方式二:produce关闭batch发送消息
val produce:Producer[String] = pulsarClient.newProducer(Schema.STRING)
.topic("my-topic")
.enableBatching(false)
.create()
注意:如果消息没有指定key,则消息只能被一个consume消费
2. consume消费者的消费规则- 默认是从Latest位置消费,也可以从Earliest位置消费
- 一个subscription消费过了一条消息,就算开启从Earliest位置消费,也不会重复消费该消息,而是从subscription保持的位置消费
- 被一个subscription消费的一条消息,不会立刻删除。其它subscription开启从Earliest位置消费,也能消费该消息