Kafaka

概念

传统定义

​ kafka是一个分布式基于发布订阅模式的消息队列,主要应用于大数据实时处理领域

发布订阅

​ 发布者不将消息直接发送给订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息

最新的定义

​ 开源的分布式事件流平台、用于高性能数据管道、流分析、数据集成、关键任务应用。

基础架构

  • 一个topic分为多个分区用来提高吞吐量
  • 配合分区,提出消费者组的概念,组内每个消费者消费一个分区的数据,并行消费
  • 提高可用性每个partition 可以存在若干副本
  • zk中记录partition中 谁是leader,kafka2.80之后也可以不采用zk

名词解释

  • Producer

    生产者,向kafka发送消息的客户端

  • Consumer

    消费者,向kafka取消息的客户端

  • Consumer Group (CG)

    消费者组,由多个consumer组成。组内每个消费者负责消费不同的分区,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某一个消费者组,消费者组就是逻辑上的一个订阅者。

  • Broker

    一台kafka服务器就是一个broker,一个集群由多个broker组成。一个broker可以容纳多个topic

  • Topic

    可以理解成一个队列,生产者和消费者都是面向一个topic

  • Partition

    为了实现拓展性,一个非常大的topic可以分不到多个broker上,一个topic可以分为多个partiton,每个partition都是一个有序队列

  • Replica

    副本。一个topic的每个分区都有若干个副本,一个leader和若干个follower

  • Leader

    每个分区多个副本的“主”,生产者发送数据的对象以及消费者消费数据的对象都是leader

  • Follower

    每个分区多个副本的“从”,实时从leader同步数据,保持和leader的数据同步。leader故障时,某个follower会成为新的leader。

生产者发送流程

发送原理

1个partition对应1个Deque,1个Deque内有多个Batch,1个Batch内可放N条消息。

参数列表

参数名称 描述
bootstrap.servers 生产者连接集群所需的 broker 地址清单。 例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker 信息。
key.serializer 和 value.serializer 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memory RecordAccumulator 缓冲区总大小,默认 32m。
batch.size 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
acks 0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的。
max.in.flight.requests.per.connection 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。
retries 当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms。
enable.idempotence 是否开启幂等性,默认 true,开启幂等性。
compression.type 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。

生产者分区

优点

  1. 使用分区可以合理存储资源,每个partition在一个broker上存储,可以将海量数据按照分区切割成一块一块的数据存储在多台broker上。合理控制分区任务可以实现负载均衡的效果
  2. 提高并行发送以及消费消息的能力。生产者可以以分区为单位发送数据,消费者可以以分区为单位消费数据。

218cd61e-e98b-422b-b377-4afa801c7a48

分区策略

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
*The default partitioning strategy:
*<ul>
*<li>If a partition is specified in the record, use it
*<li>If no partition is specified but a key is present choose a partition based on a hash of the key
*<li>If no partition or key is present choose the sticky partition that changes when the batch is full.
*
*See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {

… …
}
  1. 指明partiton的情况下,就写入指定的partition
  2. 没有指名partition的情况下,使用key的hash值与partition数量取余得到写入partition的值
  3. 既没有指明partition,又没有key值的情况下,采用sticky partition,随机选择一个分区,尽可能的使用这个分区,当这个分区的batch满了之后再随机选择另一个分区

自定义分区器

  1. 实现Partitioner接口
  2. 重写partition()方法

生产经验

提高生产者的吞吐量

修改相关参数:

  • batch.size 批次大小,默认16kb
  • linger.ms 等待时间,修改为5-100ms
  • compression.type: 压缩snappy
  • RecordAccumulator:缓冲区大小,修改为64m

数据可靠性

acks应答级别:

  • 0
    丢数
  • 1
    ​ 丢数
  • -1
    不会丢数,如果follow故障迟迟没有同步leader数据,就会被剔除ISR队列

数据完全可靠的条件:
acks应答级别=-1 + 分区副本 >= 2 + ISR应答的最小副本数量大于等于2
可靠性总结:

acks=0,生产者发送过来数据就不管了,可靠性差,效率高; acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;

acks=-1,生产者发送过来数据LeaderISR队列里面所有Follwer应答,可靠性高,效率低;

在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。

数据重复

数据传递语义

  • 至少一次

    ack级别设置为 -1 + 分区副本数量 >= 2 + ISR 中应答的最小数量大于等于2

  • 最多一次

    ack级别设置为0

至少一次可以保证数据不丢失,但是不保证数据不重复

最多一次可以保证数据不重复,但是不能保证数据不丢失

  • 精确一次:重要信息要求数据不重复也不丢失,可以使用kafka0.11版本之后引入的幂等性和事务

幂等性

以http请求为例,幂等性就是指多次http请求,相当于一次请求,不会由于重复请求而造成不良影响。

kafka的幂等性就是需要保证producer发送多条数据,kafka只持久化一条。当生产者生产消息出现retry时就可能造成一条消息发送多次。

  • 幂等性配置
    ​ 开启参数 enable.idempotence 默认为 true,false 关闭。
  • 原理
    kafka引入了 PID和 sequence number的概念与partion形成一个主键(PID, partition, sequence number)
    • producer初始化的时候会分配一个pid
    • sequence number 每个生产者发送到分区的消息都对应一个从0开始递增的sequence number,如果发生重试,发现相同的producerid 和sequence number 后,broker就不会落盘
    • 幂等性只能保证单分区的数据不重复

事务

Kafka 可以实现对多个 Topic 、多个 Partition 的原子性的写入,即处于同一个事务内的所有消息,最终结果是要么全部写成功,要么全部写失败

Kafka 事务分为生产者事务和消费者事务,但它们并不是强绑定的关系,消费者主要依赖自身对事务进行控制,因此这里我们主要讨论的是生产者事务。

Producer中事务相关的API

1
2
3
4
5
1.initTransactions(初始化事务):要使用Kafka事务,必须先进行初始化操作
2.beginTransaction(开始事务):启动一个Kafka事务
3.sendOffsetsToTransaction(提交偏移量):批量地将分区对应的offset发送到事务中,方便后续一块提交
4.commitTransaction(提交事务):提交事务
5.abortTransaction(放弃事务):取消事务

数据乱序

保证数据有序的条件

  1. 未开启幂等性

max.in.flight.requests.per.connection需要设置为1。

  1. 开启幂等性

max.in.flight.requests.per.connection需要设置小于等于5。

原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据, 故无论如何,都可以保证最近5个request的数据都是有序的。在kafka集群会对缓存5个以下的请求重新排序

kafka broker工作流程

broker启动后会在zookeeper上存储相关信息。

  • /kafka/brokers/ids 记录有哪些服务器

  • /kafka/brokers/topics/first/partitions/0/state

    {“leader”:1, “isr”:[0,1,2]} 记录leader 和可用服务器

  • /kafka/controller 辅助leader选举

​ {“brokerid”:0}

参数 描述
replica.lag.time.max.ms ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s。
auto.leader.rebalance.enable 默认是 true。 自动Leader Partition 平衡。
eader.imbalance.per.broker.percentage 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。
log.retention.hours Kafka 中数据保存的时间,默认 7 天。
log.retention.minutes Kafka 中数据保存的时间,分钟级别,默认关闭。
log.retention.ms Kafka 中数据保存的时间,毫秒级别,默认关闭。
log.retention.check.interval.ms 检查数据是否保存超时的间隔,默认是 5 分钟。
log.retention.bytes 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。
log.cleanup.policy 默认是 delete,表示所有数据启用删除策略;如果设置值为 compact,表示所有数据启用压缩策略。
num.io.threads 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。
num.replica.fetchers 副本拉取线程数,这个参数占总核数的 50%的 1/3
num.network.threads 默认是 3。数据传输线程数,这个参数占总核数的50%的 2/3 。
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。

kafka副本

副本的基本信息

  1. Kafka副本作用:提高数据可靠性

  2. 默认1个副本,为了保证数据可靠性,生产环境一般配置2个副本,过多的副本会增加磁盘的存储空间,影响网络数据传输效率。

  3. 副本分为Leader和follower。Producer只会将数据发送给Leader。follower负责同步Leader的数据。

  4. 所有的副本统称 AR (assigned repllicas)

    AR = ISR + OSR

​ OSR 表示 follower 与leader同步时延迟过大的副本

leader & follower 故障处理细节

36f65477-e9db-4be5-80c8-33399f6a428c

LEO (log end offset): 每个副本最后一个offset,等价于最新的offset + 1

HW (HIgh Watermark): 所有副本中最小的LEO

Follower 故障切换

  1. follower 踢出 ISR队列
  2. Leader和其他follower继续同步数据
  3. follower恢复后 读取 本地记录的HW值 将 log 中高于HW部分的数据剔除掉,然后与leader进行数据同步
  4. 当该follower的LEO大于等于最新的HW后就可以重新加入ISR队列

Leader故障

  1. 从ISR队列当中重新选举Leader

  2. 保证副本的一致性 其余Follower将高于HW部分截掉,然后从新的Leader同步数据

    只能保证副本间的数据同步,不保证数据不丢失或者不重复

分区副本的分配

副本尽可能交叉分配,提高数据可用性,防止集中在某一台broker上,当宕机时无副本数据可用。

b98a786b-0b8c-4767-8f5b-b61f19f567f8

手动调整分区副本存储

实际环境中,每一台服务器的性能和配置是不一样的,如果默认按照kafka的副本分配方式可能会导致部分服务器数据存储压力大。所以需要手动分配副本的存储。

1
2
3
4
5
6
{
"version":1, "partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}]
}

1
2
3
bin/kafka-reassign-partitions.sh -- bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
bin/kafka-reassign-partitions.sh -- bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
hadoop102:9092 --describe --topic three

Leader partiton 负载平衡

由于leader故障重新选举的原因,leader可能会大量分布在某个broker上,因而给服务器造成很大的压力,所以需要对leader进行再平衡

参数名称 描述
auto.leader.rebalance.enable 默认是 true。 自动Leader Partition 平衡。生产环境中,leader 重选举的代价比较大,可能会带来性能影响,建议设置为 false 关闭。
leader.imbalance.per.broker.percentage 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。

增加副本

由于某个主题的重要性提升,需要提高该主题的可靠性,所以需要增加副本

文件存储

文件存储机制

Kafka 有 Topic 和 Partition 两个概念,一个 Topic 可以有多个 Partition。在实际存储的时候,Topic + Partition 对应一个文件夹,这个文件夹对应的是这个 Partition 的数据。

在 Kafka 的数据文件目录下,一个 Partition 对应一个唯一的文件夹。如果有 4 个 Topic,每个 Topic 有 5 个 Partition,那么一共会有 4 * 5 = 20 个文件夹。而在文件夹下,Kafka 消息是采用 Segment File 的存储方式进行存储的。

Segment File 的大概意思是:将大文件拆分成小文件来存储,这样一个大文件就变成了一段一段(Segment 段)。这样的好处是 IO 加载速度快,不会有很长的 IO 加载时间。Kafka 的消息存储就采用了这种方式。

14e09808-5d1d-4d69-b94a-d5b5a6820402
在 Kafka 的数据文件夹下,分为两种类型的文件:索引文件(Index File)和数据文件(Data File)。索引文件存的是消息的索引信息,帮助快速定位到某条消息。数据文件存储的是具体的消息

索引文件

partition的第一个segment从0开始,后续每个segment文件名,为上一个segment文件最后一条消息的offset+1

upload successful

索引文件存储的是简单地索引数据,其格式为:「N,Position」。其中 N 表示索引文件里的第几条消息,而 Position 则表示该条消息在数据文件(Log File)中的物理偏移地址。例如下图中的「3,497」表示:索引文件里的第 3 条消息(即 offset 368772 的消息,368772 = 368769+3),其在数据文件中的物理偏移地址为 497。

upload successful

  1. index为稀疏索引,每往log写4kb的数据就会往index文件写入一条索引。 log.index.internal.bytes 默认4kb的数据

  2. index文件保存的是相对offset,这样可以保证offset的值占的空间不会过大,能将offset控制在固定大小。

参数 描述
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引。

数据文件

upload successful

实战消息查找

例如我们要读取 Topic 为 Order、Partition 为 1,并且 offset 为 368775 的 Kafka 消息内容,我们应该怎么做呢?假设文件索引图如下:
upload successful
upload successful

  1. 定位到具体的文件夹,以Topic+Partition命名
  2. 定位索引文件范围
    000000.index -> 000000-368769
    368769.index -> 368770-737337
    737337.index -> 737338-1105814
    寻找的是 offset 为 368775 的消息,那其索引数据就存储在 368769.index 这个索引文件中。我们要读取的消息为 368775,那么这条消息在索引文件中是第 6 条消息。第 6 条在数据文件的物理位置为 1407。
  3. 读取消息内容
    结合偏移量以及消息的物理结构,直接读取到 offset 为 368775 的消息内容了

文件清理

默认日志保存的时间是7天,可以通过保存如下参数修改保存时间。

  • log.retention.hours 最低优先级小时,默认7天
  • log.retention.minutes 分钟
  • log.retention.ms 最高优先级毫秒
  • log.retention.check.interval.ms 检查周期,默认 5分钟

清理策略

  • delete

​ log.cleanup.policy = delete 所有数据启用删除策略

    1. 基于时间:默认打开。以 segment 中所有记录中的**最大时间戳作为该文件时间戳**(根据该时间戳判断文件是不是新的)。
  1. 基于大小:默认关闭。超过设置的所有日志的总大小时,删除最老的segment log.retention.bytes默认=-1 表示无穷大

  • compact

将相同的key不同的value保留最小版本,压缩完成后如果offset不连续取不到,就会使用后一个offset对应的消息。这种策略适用于例如key是用户id,value是用户资料,通过压缩可以保证整个消息集合中保存的是最新的用户资料。

高效读取数据

  1. kafka是分布式集群,采用了分区技术,多个分区之间可以并行读取,并行度高

  2. 数据存储采用了segment分段以及稀疏索引,可以快速定位需要读取的记录

  3. 采用顺序写盘的策略,producer生产的信息都是在log文件尾部追加的。避免了机械硬盘的磁头寻址。

  4. 页缓存 + 零拷贝技术

    依赖操作系统内核的 pagecache 技术,存数据时会将数据存入页缓存中,度数据时先去页缓存中查询,查不到时 再去磁盘查然后缓存到页缓存,返回给消费者。

    零拷贝技术就是 kafka集群只负责数据的存储,不会对数据进行加工、序列化、反序列化等操作,这些操作全部由生产者和消费者自己实现,消费者来取数据的时候查到了直接从网卡拿数据,效率更高。

5e6d5e62-f43c-45ea-9961-2ddaa3140fcf

消费者

消费方式

  • 推模式

    消息队列主动推送消息给消费者

  • 拉模式

    消费者主动拉取消息

kafka采用的是拉模式,因为每个消费者的消费速率不同,如果过采用推模式会导致一些消费者跟不上消费速率。采用拉模式,消费者可以按照自己的消费速率来消费消息。缺点就是消息队列为空时会空转。

Kafka消费工作流程

  • 每个分区只能由消费者组当中的一个消费者消费
  • 一个消费者可以消费多个分区

消费者组原理

在同一消费者组中的消费者共用一个groupId

  • 消费组中不同的消费者消费不同的分区,一个分区只能被一个消费者消费
  • 消费者组之间不受影响,消费者组就是逻辑上的一个订阅者。一个消费者可以看成只有一个消费者的消费者组。所以所有的消费者都有所属的消费者组。
  • 如果消费者组中的消费者数量大于分区数,分区分配完成后就会有闲置的消费者,不会接收任何消息。

消费者组的初始化流程:

消费者组详细消费流程:

消费者相关参数

参数名称 描述
bootstrap.servers 向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer 和value.deserializer 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。
group.id 标记消费者所属的消费者组。
enable.auto.commit 默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向Kafka 提交的频率,默认 5s。
auto.offset.reset 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions consumer_offsets 的分区数,默认是 50 个分区。
heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms , 也不应该高于session.timeout.ms 的 1/3。
session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes 默认 1 个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms 默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes ( brokerconfig)or max.message.bytes (topic config)影响。
max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条。

分区分配的策略

Range策略

range策略是针对每一个topic的, 首先对topic中的分区进行排序,然后对消费者按照字典序排序,通过partition数/consumer数来计算每个consumer消费几个partition,有余数则将余数平均的分配给开头几个消费者

对于一个topic而言 c0多消费一个分区影响不大,但是如果有N个topic,会导致c0多消费N个分区,容器产生数据倾斜

RoundRobin策略

针对所有topic,分区依次进行分配

Sticky策略

针对所有topic的分区,在执行分配之前尽可能的考虑上一次的分配结果。首先尽可能将分区均匀分配给每一个消费者,尽量保持原有分配的分区没有变化

重复消费和漏消费

重复消费

kafka自动提交默认5s一次,假设5s第一次提交的offset 为 2 ,在第二个5s提交之前Consumer在继续消费,在第二次自动提交之前consumer宕机了,这时候offset就没有提交成功,consumer恢复的时候还是从offset = 2的位置消费,这样就会导致重复消费的问题。

漏消费

offset为手动提交,如果提交offset之后,消费者的数据还没有落盘就挂了,实际上就是漏消费了

消费者事务

如果需要consumer精确一次消费就需要将kafka消费过程和提交offset过程做原子绑定。

数据积压

  1. 如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数= 分区数。(两者缺一不可)

  2. 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。

参数名称 描述
fetch.max.bytes 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (brokerconfig)or max.message.bytes (topic config)影响。
max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条

相关文档

Kafka与RocketMq的对比 https://www.cnblogs.com/BYRans/p/6100653.html