凌云时刻 · 技术
导读:这一节来看看使用命令行启动Consumer接收消息。
作者 | 计缘
来源 | 凌云时刻(微信号:linuxpk)
Consumer CLI
通过如下的命令启动Consumer:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic
kafka-console-consumer.sh
是启动Consumer的命令。--bootstrap-server
指定要连接的Broker地址。--topic
指定Topic名称,既要从哪个Topic里读数据。
如上图所示,左边启动的是Consumer,右边启动的是Producer。Producer发送的消息可以实时的被Consumer接收到。但是有一个问题,那就是在上一节中,我们已经给first_topic
这个Topic发送了一些数据。但是现在Consumer启动后并没有收到。这是因为通过上面的命令启动的Consumer接收的是最新的消息,如果想接收所有的消息,还需要带一个参数:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning
--from-beginning
表示启动的Consumer要接收所有的消息。
前文中说过,Consumer一般都是以组的形式存在,所以可以再加一个参数来创建一个Consumer Group:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group consumer_group_1
--group
可以指定Consumer Group的名称。
如上图所示,左边启动了三个Consumer,这三个Consumer都在同一个名为consumer_group_1
的组里。因为first_topic
这个Topic有三个Partitions,所以当一个Consumer Group中有三个Consumer时,他们的收到的信息不会重复。
又如上图所示,左边启动了三个Consumer,但是前两个在consumer_group_1
的组里,最后一个在consumer_group_2
的组里,所以前两个Consumer是以轮询的方式收到消息的,而最后一个Consumer可以收到全部的消息。
上面两个示例也充分证明了前文中所说的,不同的Consumer Group可以消费同一个Topic中相同的Partition的消息,但是Consumer Group内的Consumer不能消费同一个Topic中相同的Partition的消息。
上面的命令是显示的创建Consumer Group。上文中说到过,Kafka中,Consumer都是以组的形式连接Broker消费数据的。那么如果只有一个Consumer的情况下,是否有Consumer Group呢?其实,当只有一个Consumer时,也会自动创建一个Consumer Group。我们可以通过另外一组Consumer Group CLI来看一下:
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
console-consumer-40439
console-consumer-81216
consumer_group_1c
console-consumer-14387
consumer_group_2
consumer_group_1
console-consumer-40563
可以看到,已经存在的Consumer Group中,除了我们之前创建的,还有以console-consumer-xxxxx
这种命名格式存在的Consumer Group。这就是当我们只启动一个Consumer时Kafka自动为这个Consumer创建的Consumer Group。这里可以做个实验,先启动一个Consumer:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic
然后再来看看Consumer Group是否有增加:
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
console-consumer-96752
console-consumer-40439
console-consumer-81216
consumer_group_1c
console-consumer-14387
consumer_group_2
consumer_group_1
console-consumer-40563
我们看到增加了一个Consumer Groupconsole-consumer-96752
。
Consumer Group列表看完了,再来看看某一个Consumer Group的详细信息,比如查看consumer_group_1
的详细信息。可以使用如下命令:
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group consumer_group_1
Consumer group 'consumer_group_1' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
first_topic 0 17 17 0 - - -
first_topic 2 17 17 0 - - -
first_topic 1 18 18 0 - -
首先会告诉我们该Consumer Group中是否有正在活跃的Consumer,目前没有启动任何Consumer,所以提示我们Consumer group 'consumer_group_1' has no active members.
然后会列出该Consumer Group消费的Topic、Partition情况、Offset情况、延迟(LAG)情况、处于活跃状态的Consumer信息。
可以看到consumer_group_1
这个Consumer Group正在消费first_topic
这个Topic中的Message,一共从三个Partition中消费了52条Messages,并且目前已经消费了全部的数据,因为每个Partition的延迟都是0,说明没有还未接收的Message。
现在我们再往first_topic
中发送一条Message,再来看看情况如何:
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=1
>this is another message.
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group consumer_group_1
Consumer group 'consumer_group_1' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
first_topic 0 17 17 0 - - -
first_topic 2 17 17 0 - - -
first_topic 1 18 19 1 - - -
可以看到Partition 1 的LOG-END-OFFSET
是19,而CURRENT-OFFSET
是18,并且Partition 1 的LAG
是1,说明现在first-topic
一共接收到了19条Message,而consumer-group-1
只消费了18条,有1条延迟。
我们再启动consumer_group_1
中的Consumer,然后再看看数据:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --group consumer_group_1
this is another message.
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group consumer_group_1
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
first_topic 0 17 17 0 consumer-1-4ec288ac-e202-40b1-a2ec-d43abc49b38d /172.17.222.157 consumer-1
first_topic 1 19 19 0 consumer-1-4ec288ac-e202-40b1-a2ec-d43abc49b38d /172.17.222.157 consumer-1
first_topic 2 17 17 0 consumer-1-4ec288ac-e202-40b1-a2ec-d43abc49b38d /172.17.222.157 consumer-1
可以看到,目前有一个处于活跃的Consumer,并且Messages全部被消费。
总结
这一章节主要介绍了如何使用Kafka的Consumer CLI接收Producer生产的Message。同时能更直观的印证之前介绍概念时提到的内容,比如Consumer Group的机制、Offset机制等。下一章节进一步介绍Offset的操作以及Config CLI。希望能给小伙伴们带来帮助。
END
往期精彩文章回顾
Kafka CLI:Topic CLI & Producer CLI
Kafka从上手到实践 - 实践真知:搭建单机Kafka
Kafka从上手到实践 - 庖丁解牛:Consumer
Kafka从上手到实践 - 庖丁解牛:Producer
Kafka从上手到实践 - 庖丁解牛:Partition
Kafka从上手到实践 - 庖丁解牛:Topic & Broker | 凌云时刻
Kafka从上手到实践 - 初步认知:MQ系统
进阶之路:深入解读 Java 堆外内存
干货:一文看懂Apache Ranger
吴翰清:有变革的需求,才有技术的诞生
长按扫描二维码关注凌云时刻
每日收获前沿技术与科技洞见