您当前的位置: 首页 >  kafka

墨家巨子@俏如来

暂无认证

  • 0浏览

    0关注

    188博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

五.Kafka入门到精通-深度剖析Kafka执行原理

墨家巨子@俏如来 发布时间:2022-07-18 15:02:15 ,浏览量:0

前言

前面我们学习了SpringBoot整合Kafka进行编码实战,本篇文章我们来研究一下Producer 发送消息 和 Consumer 消费消息的底层执行原理,让大家对Kafka理解得更加深入。

Kafka 整体工作流程

kafka依赖ZooKeeper负责维护整个Kafka集群的状态,存储Kafka各个节点的信息及状态,实现Kafka集群的高可用,协调Kafka的工作内容。工作流程如下: 在这里插入图片描述

这里对图中的概念再做一下解释,虽然前面章节已经有解释过了

  • Producer:消息和数据的生产者,主要负责生产消息,消息发送到指定Broker的Topic/partition中
  • Broker:Kafka节点就是被称为Broker,Broker主要负责创建Topic,存储Producer所发布的消息,记录消息处理的过程,先是将消息保存到内存中,然后持久化到磁盘。
  • Topic:同一个Topic的消息可以分布在一个或多个Broker上,一个Topic包含一个或者多个Partition分区,数据被存储在多个Partition中。
  • Partition:分区;在这里被称为Topic物理上的分组,一个Topic在Broker中被分为1个或者多个Partition,也可以说为每个Topic包含一个或多个Partition,(一般为kafka节. 点数CPU的总核心数量)分区在创建Topic的时候可以指定。分区才是真正存储数据的单元。
  • leader 和 follower : 为了防止brocker挂掉导致数据丢失以及服务不可用,kafka使用冗余机制把数据备份多个 replica副本,replica副本分为leader 和 follower , 生产者把数据写入leader,follower负责从leader中同步数据,这样起到数据备份的作用,如果leader所在的brocker挂掉,Kafka 会从剩余的 replica 中选举出新的 leader 续提供服务
  • Consumer:消息和数据的消费者,主要负责主动到已订阅的Topic中拉取消息并消费,为什么Consumer不能像Producer一样的由Broker去push数据呢?因为Broker不知道Consumer能够消费多少,如果push消息数据量过多,会造成消息阻塞,而由Consumer去主动pull数据的话,Consumer可以根据自己的处理情况去pull消息数据,消费完多少消息再次去取。这样就不会造成Consumer本身已经拿到的数据成为阻塞状态。
  • ZooKeeper:ZooKeeper负责维护整个Kafka集群的状态,存储Kafka各个节点的信息及状态,实现Kafka集群的高可用,协调Kafka的工作内容。

整体工作流程如下:

  1. kafka的brocker启动主动向zk注册,把自己的主机端口等提交给zk,brocker集群信息会保存在zk的/brokers 路径下
  2. 生产者选择指定的brocker发送消息,并指定消息发送到哪个topic下的那个pritition中。并把消息持久化到磁盘
  3. 消费者从zk中获取 topic 和 partition 相关信息 , 其中维护得有 partition和brocker的对应关系
  4. 根据读取到的topic/partition和brocker对应关系,让consumer链接上brocker,从分配的parititon消费消息

注意:这里的Producer并不需要根据ZooKeeper来获取集群状态,而是在配置中指定多个Broker节点进行发送消息,同时跟指定的Broker建立连接,来从该Broker中获取集群的状态信息,这时Producer可以知道集群中有多少个Broker是否在存活状态,每个Broker上的Topic有多少个Partition。然后把消息发送给分配的partition中

Kafka Producer原理 Producer执行流程

我们要探讨的是消息生产者也就是KafkaProducer是如何把消息发送到Kafka Brocker中的,要知道KafkaProducer就是把用户待发送的消息封装成ProducerRecord对象,它由五个字段组成(topic,partition分区, key消息的key,value消息体,timestamp 消息时间戳),然后使用 KafkaProducer#send 方法进行发送,详细流程如下 在这里插入图片描述

消息的发送过程被分为两个不同的线程:主线程和Sender I/0线程

  1. KafkaProducer拿到消息首先会进入拦截器,通过拦截器可以实现对消 息的扩展

  2. 随后对消息进行序列化,Kafka有自己的虚拟化器,不使用Java自带的序列化是因为Java的序列化后数据太过臃肿,性能较差。

  3. 接着通过Partitioner分区器会根据一定的路由策略确定Topic下的某个分区。

  4. 然后把消息写入消息缓冲区(RecordAccumuator)。producer 创建时会创建 个默认 32MB (由 buffer.memory 参数指定〉的 accumulator 缓冲区,专门保存待发送的消息。消息是分批次发送的,发往不同分区的消息保存在对应分区下的 batch 队列中

  5. KafkaProducer中还有 个专门的 Sender 线程负责将缓冲区中的消息分批次(batch)发送给 Kafka broker,消息是分批(batch)发送的性能更好。Sender发送数据有几个条件 batch.size : 批次大小,消息累计到batch.size(默认16k)大小之后sender才会发送 linger.ms :延迟发送时间,如果消息并没有达到batch.size 但是等待时间达到了linger.ms,数据也会被发送。linger.ms默认 0 毫秒。

    需要特别说明的是:kafka 在设计底层网络库时采用了 Java Selector 机制(NIO),实现高效的网络传输。

  6. Brocker收到消息之后做出应答(acks),应答模式有: 0 ,1 ,-1/all 0 :不用等到KafkaBrocker的leader和follower副本是否成功存储消息到磁盘 1 :Kafka Brocker 的Leader 副本收到producer 发送的消息写入本地日志,然后便发送响应结果给producer ,而无须等待其他lollowrer副本写入该消息 -1(或者all) : Leader broker 不仅会将消息写入本地日志,同时还会等待所有其他followrer副本都成功写入它们各自的本地日志后,才发送响应结果给,消息安全但是吞吐量会比较低

  7. 如果发送成功,这正常返回结果,否则会进行重试,重试次数是Interger的最大值,可以修改 spring.kafka.producer.retries

    Kafka服务器会返回一个RecordMetadata(元数据)给客户端,内容包含:offset 消息的位置 ;timestamp消息的时间戳 ;topic/partition 消息分区 ;serializedKeySize :序列化后的消息 key字节数 ;serializedValueSize :序列化后的消息 value 字节数

拦截器

producer 拦截器 interceptor是 个相当新的功能,它和 consumer interceptor 是在 Kafka 0.10.0.0 版本中被引入的,主要用于实现 clients 端的定制化控制逻辑,对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做些定制化需求,比如修改消息等,同时 producer 允许用户指定多个 interceptor 按序作用于同条消息从而形成 个拦截链( interceptor chain ),拦截器的接口为.Producerlnterceptor,实现该接口即可定义自己的拦截器。

序列化器

在网络中发送数据都是以字节的方式, Kafka 也不例外。 Kafka支持用户给 broker发送各种类型的消息。它可以是一个字符串、一个整数、一个数组或是其他任意的对象类型。序列化器( serializer )负责在 producer 发送前将消息转换成字节数组:而与之相反,解序列化器( deserializer )则用于将 consumer 接收到的字节数组转换成相应的对象。 Kafka 1.0.0 默认提供了十几种序列化器,其中常用的 serializer 如下

  • ByteArraySerializer :本质上什么都不用做,因为己经是字节数组了。
  • ByteBufferSerializer :序列化 ByteBuffer
  • BytesSerializer :序列化 Kafka 自定义的 Bytes 类
  • DoubleSerializer :序列化 Double 类型
  • IntegerSerializer :序列化 Integer 类型
  • LongSerializer :序列化 Long 类型
  • StringSerializer :序列化 String 类型

当然用户也可以自定义序列化器。

生产者分区

Kafka的一个topic由多个partition组成, partition就是分区,这些partition分散在不同brocker中,以实现请求负载的目的提高吞吐量。producer 提供了分区策略以及对应的分区器 partitioner )供用户使用 Kafka 发布的默认 partitioner 会尽力确保具有相同 key 的所有消息都会被发送到相同的分区上

  • 若没有为消息指定 key:则该 partitioner 会选择轮询的方式来确保消息在 topic 的所有分区上均匀分配
  • 若有为消息指定 key :producer 自带的 partitioner 会根据 murmur2 算法计算消息key 的哈希值,然后对总分区数求模得到消息要被发送到的目标分区号 但是有的时候用户可能想实现自己的分区策略就可以使用 producer 提供的自定义分区策略了。
Kafka Brocker 设计原理 Kafka的架构

第一篇文章就介绍过, Kafka 是分布式的消息引擎集群环境,它支持自动化的服务发现与成员管理。那么它是如何做到的呢? 学过SpringCloud的同学应该很容易理解 ,Kafka 是依赖 Apache ZooKeeper 实现的 。每当一个broker 启动时(一个brocker就是一个kafka服务器),它会将自己注册到 ZooKeeper(注册:主机,端口,启动时间等,以JSON格式保存),Zookeeper注册中心会形成一个Brocker的注册信息表:

在这里插入图片描述

我这里用来一个ZooView工具查看了一下zookeeper中的注册信息

在这里插入图片描述

  • /brokers :里面保存了 Kafka 集群的所有信息,包括每台 broker 注册信息,集群上所有topic 的信息等。
  • /controller :保存了 Kafka controller 组件的注册信息( controller 责集群的领导者选举,同时也负责 controller 的动态选举。)
  • /admin :保存管理脚本的输出结果,比如删除 topic ,对分区进行重分配等操作。
  • /isr_change_ notification :保存 ISR 列表发生变化的分区列表。 controller 会注册一个监听器实时监控该节点下子节点的变更。
  • config :保存了 Kafka 集群下各种资源的定制化配置信息,比如每个 topic 可能有自己专属的 组配置,那么就保存在/config/topics/
关注
打赏
1651329177
查看更多评论
立即登录/注册

微信扫码登录

0.0366s