Kafka
Kafaka
概念
传统定义
kafka是一个分布式基于发布订阅模式的消息队列,主要应用于大数据实时处理领域
发布订阅
发布者不将消息直接发送给订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息
最新的定义
开源的分布式事件流平台、用于高性能数据管道、流分析、数据集成、关键任务应用。
基础架构
- 一个topic分为多个分区用来提高吞吐量
- 配合分区,提出消费者组的概念,组内每个消费者消费一个分区的数据,并行消费
- 提高可用性每个partition 可以存在若干副本
- zk中记录partition中 谁是leader,kafka2.80之后也可以不采用zk
名词解释
Producer
生产者,向kafka发送消息的客户端
Consumer
消费者,向kafka取消息的客户端
Consumer Group (CG)
消费者组,由多个consumer组成。组内每个消费者负责消费不同的分区,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某一个消费者组,消费者组就是逻辑上的一个订阅者。
Broker
一台kafka服务器就是一个broker,一个集群由多个broker组成。一个broker可以容纳多个topic
Topic
可以理解成一个队列,生产者和消费者都是面向一个topic
Partition
为了实现拓展性,一个非常大的topic可以分不到多个broker上,一个topic可以分为多个partiton,每个partition都是一个有序队列
Replica
副本。一个topic的每个分区都有若干个副本,一个leader和若干个follower
Leader
每个分区多个副本的“主”,生产者发送数据的对象以及消费者消费数据的对象都是leader
Follower
每个分区多个副本的“从”,实时从leader同步数据,保持和leader的数据同步。leader故障时,某个follower会成为新的leader。
生产者发送流程
发送原理
1个partition对应1个Deque,1个Deque内有多个Batch,1个Batch内可放N条消息。
参数列表
参数名称 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需的 broker 地址清单。 例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker 信息。 |
key.serializer 和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。 |
buffer.memory | RecordAccumulator 缓冲区总大小,默认 32m。 |
batch.size | 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader 收到数据后应答。 -1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的。 |
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms。 |
enable.idempotence | 是否开启幂等性,默认 true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。 |
生产者分区
优点
- 使用分区可以合理存储资源,每个partition在一个broker上存储,可以将海量数据按照分区切割成一块一块的数据存储在多台broker上。合理控制分区任务可以实现负载均衡的效果
- 提高并行发送以及消费消息的能力。生产者可以以分区为单位发送数据,消费者可以以分区为单位消费数据。
分区策略
1 | /** |
- 指明partiton的情况下,就写入指定的partition
- 没有指名partition的情况下,使用key的hash值与partition数量取余得到写入partition的值
- 既没有指明partition,又没有key值的情况下,采用sticky partition,随机选择一个分区,尽可能的使用这个分区,当这个分区的batch满了之后再随机选择另一个分区
自定义分区器
- 实现Partitioner接口
- 重写partition()方法
生产经验
提高生产者的吞吐量
修改相关参数:
- batch.size 批次大小,默认16kb
- linger.ms 等待时间,修改为5-100ms
- compression.type: 压缩snappy
- RecordAccumulator:缓冲区大小,修改为64m
数据可靠性
acks应答级别:
- 0
丢数 - 1
丢数 - -1
不会丢数,如果follow故障迟迟没有同步leader数据,就会被剔除ISR队列
数据完全可靠的条件:
acks应答级别=-1 + 分区副本 >= 2 + ISR应答的最小副本数量大于等于2
可靠性总结:
acks=0,生产者发送过来数据就不管了,可靠性差,效率高; acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。
数据重复
数据传递语义
至少一次
ack级别设置为 -1 + 分区副本数量 >= 2 + ISR 中应答的最小数量大于等于2
最多一次
ack级别设置为0
至少一次可以保证数据不丢失,但是不保证数据不重复
最多一次可以保证数据不重复,但是不能保证数据不丢失
- 精确一次:重要信息要求数据不重复也不丢失,可以使用kafka0.11版本之后引入的幂等性和事务
幂等性
以http请求为例,幂等性就是指多次http请求,相当于一次请求,不会由于重复请求而造成不良影响。
kafka的幂等性就是需要保证producer发送多条数据,kafka只持久化一条。当生产者生产消息出现retry时就可能造成一条消息发送多次。
- 幂等性配置
开启参数 enable.idempotence 默认为 true,false 关闭。 - 原理
kafka引入了 PID和 sequence number的概念与partion形成一个主键(PID, partition, sequence number)- producer初始化的时候会分配一个pid
- sequence number 每个生产者发送到分区的消息都对应一个从0开始递增的sequence number,如果发生重试,发现相同的producerid 和sequence number 后,broker就不会落盘
- 幂等性只能保证单分区的数据不重复
事务
Kafka 可以实现对多个 Topic 、多个 Partition 的原子性的写入,即处于同一个事务内的所有消息,最终结果是要么全部写成功,要么全部写失败
Kafka 事务分为生产者事务和消费者事务,但它们并不是强绑定的关系,消费者主要依赖自身对事务进行控制,因此这里我们主要讨论的是生产者事务。
Producer中事务相关的API
1 | 1.initTransactions(初始化事务):要使用Kafka事务,必须先进行初始化操作 |
数据乱序
保证数据有序的条件
- 未开启幂等性
max.in.flight.requests.per.connection
需要设置为1。
- 开启幂等性
max.in.flight.requests.per.connection
需要设置小于等于5。
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据, 故无论如何,都可以保证最近5个request的数据都是有序的。在kafka集群会对缓存5个以下的请求重新排序
kafka broker工作流程
broker启动后会在zookeeper上存储相关信息。
/kafka/brokers/ids 记录有哪些服务器
/kafka/brokers/topics/first/partitions/0/state
{“leader”:1, “isr”:[0,1,2]} 记录leader 和可用服务器
/kafka/controller 辅助leader选举
{“brokerid”:0}
参数 | 描述 |
---|---|
replica.lag.time.max.ms | ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s。 |
auto.leader.rebalance.enable | 默认是 true。 自动Leader Partition 平衡。 |
eader.imbalance.per.broker.percentage | 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。 |
leader.imbalance.check.interval.seconds | 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。 |
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。 |
log.index.interval.bytes | 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 |
log.retention.hours | Kafka 中数据保存的时间,默认 7 天。 |
log.retention.minutes | Kafka 中数据保存的时间,分钟级别,默认关闭。 |
log.retention.ms | Kafka 中数据保存的时间,毫秒级别,默认关闭。 |
log.retention.check.interval.ms | 检查数据是否保存超时的间隔,默认是 5 分钟。 |
log.retention.bytes | 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。 |
log.cleanup.policy | 默认是 delete,表示所有数据启用删除策略;如果设置值为 compact,表示所有数据启用压缩策略。 |
num.io.threads | 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。 |
num.replica.fetchers | 副本拉取线程数,这个参数占总核数的 50%的 1/3 |
num.network.threads | 默认是 3。数据传输线程数,这个参数占总核数的50%的 2/3 。 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。 |
kafka副本
副本的基本信息
Kafka副本作用:提高数据可靠性
默认1个副本,为了保证数据可靠性,生产环境一般配置2个副本,过多的副本会增加磁盘的存储空间,影响网络数据传输效率。
副本分为Leader和follower。Producer只会将数据发送给Leader。follower负责同步Leader的数据。
所有的副本统称 AR (assigned repllicas)
AR = ISR + OSR
OSR 表示 follower 与leader同步时延迟过大的副本
leader & follower 故障处理细节
LEO (log end offset): 每个副本最后一个offset,等价于最新的offset + 1
HW (HIgh Watermark): 所有副本中最小的LEO
Follower 故障切换
- follower 踢出 ISR队列
- Leader和其他follower继续同步数据
- follower恢复后 读取 本地记录的HW值 将 log 中高于HW部分的数据剔除掉,然后与leader进行数据同步
- 当该follower的LEO大于等于最新的HW后就可以重新加入ISR队列
Leader故障
从ISR队列当中重新选举Leader
保证副本的一致性 其余Follower将高于HW部分截掉,然后从新的Leader同步数据
只能保证副本间的数据同步,不保证数据不丢失或者不重复
分区副本的分配
副本尽可能交叉分配,提高数据可用性,防止集中在某一台broker上,当宕机时无副本数据可用。
手动调整分区副本存储
实际环境中,每一台服务器的性能和配置是不一样的,如果默认按照kafka的副本分配方式可能会导致部分服务器数据存储压力大。所以需要手动分配副本的存储。1
2
3
4
5
6{
"version":1, "partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}]
}1
2
3bin/kafka-reassign-partitions.sh -- bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
bin/kafka-reassign-partitions.sh -- bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
hadoop102:9092 --describe --topic three
Leader partiton 负载平衡
由于leader故障重新选举的原因,leader可能会大量分布在某个broker上,因而给服务器造成很大的压力,所以需要对leader进行再平衡
参数名称 | 描述 |
---|---|
auto.leader.rebalance.enable | 默认是 true。 自动Leader Partition 平衡。生产环境中,leader 重选举的代价比较大,可能会带来性能影响,建议设置为 false 关闭。 |
leader.imbalance.per.broker.percentage | 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。 |
leader.imbalance.check.interval.seconds | 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。 |
增加副本
由于某个主题的重要性提升,需要提高该主题的可靠性,所以需要增加副本
文件存储
文件存储机制
Kafka 有 Topic 和 Partition 两个概念,一个 Topic 可以有多个 Partition。在实际存储的时候,Topic + Partition 对应一个文件夹,这个文件夹对应的是这个 Partition 的数据。
在 Kafka 的数据文件目录下,一个 Partition 对应一个唯一的文件夹。如果有 4 个 Topic,每个 Topic 有 5 个 Partition,那么一共会有 4 * 5 = 20 个文件夹。而在文件夹下,Kafka 消息是采用 Segment File 的存储方式进行存储的。
Segment File 的大概意思是:将大文件拆分成小文件来存储,这样一个大文件就变成了一段一段(Segment 段)。这样的好处是 IO 加载速度快,不会有很长的 IO 加载时间。Kafka 的消息存储就采用了这种方式。
在 Kafka 的数据文件夹下,分为两种类型的文件:索引文件(Index File)和数据文件(Data File)。索引文件存的是消息的索引信息,帮助快速定位到某条消息。数据文件存储的是具体的消息
索引文件
partition的第一个segment从0开始,后续每个segment文件名,为上一个segment文件最后一条消息的offset+1
索引文件存储的是简单地索引数据,其格式为:「N,Position」。其中 N 表示索引文件里的第几条消息,而 Position 则表示该条消息在数据文件(Log File)中的物理偏移地址。例如下图中的「3,497」表示:索引文件里的第 3 条消息(即 offset 368772 的消息,368772 = 368769+3),其在数据文件中的物理偏移地址为 497。
index为稀疏索引,每往log写4kb的数据就会往index文件写入一条索引。
log.index.internal.bytes
默认4kb的数据index文件保存的是相对offset,这样可以保证offset的值占的空间不会过大,能将offset控制在固定大小。
参数 | 描述 |
---|---|
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。 |
log.index.interval.bytes | 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引。 |
数据文件
实战消息查找
例如我们要读取 Topic 为 Order、Partition 为 1,并且 offset 为 368775 的 Kafka 消息内容,我们应该怎么做呢?假设文件索引图如下:
- 定位到具体的文件夹,以Topic+Partition命名
- 定位索引文件范围
000000.index -> 000000-368769
368769.index -> 368770-737337
737337.index -> 737338-1105814
寻找的是 offset 为 368775 的消息,那其索引数据就存储在 368769.index 这个索引文件中。我们要读取的消息为 368775,那么这条消息在索引文件中是第 6 条消息。第 6 条在数据文件的物理位置为 1407。 - 读取消息内容
结合偏移量以及消息的物理结构,直接读取到 offset 为 368775 的消息内容了
文件清理
默认日志保存的时间是7天,可以通过保存如下参数修改保存时间。
- log.retention.hours 最低优先级小时,默认7天
- log.retention.minutes 分钟
- log.retention.ms 最高优先级毫秒
- log.retention.check.interval.ms 检查周期,默认 5分钟
清理策略
- delete
log.cleanup.policy = delete 所有数据启用删除策略
1. 基于时间:默认打开。以 segment 中所有记录中的**最大时间戳作为该文件时间戳**(根据该时间戳判断文件是不是新的)。
基于大小:默认关闭。超过设置的所有日志的总大小时,删除最老的segment
log.retention.bytes
默认=-1 表示无穷大
- compact
将相同的key不同的value保留最小版本,压缩完成后如果offset不连续取不到,就会使用后一个offset对应的消息。这种策略适用于例如key是用户id,value是用户资料,通过压缩可以保证整个消息集合中保存的是最新的用户资料。
高效读取数据
kafka是分布式集群,采用了分区技术,多个分区之间可以并行读取,并行度高
数据存储采用了segment分段以及稀疏索引,可以快速定位需要读取的记录
采用顺序写盘的策略,producer生产的信息都是在log文件尾部追加的。避免了机械硬盘的磁头寻址。
页缓存 + 零拷贝技术
依赖操作系统内核的 pagecache 技术,存数据时会将数据存入页缓存中,度数据时先去页缓存中查询,查不到时 再去磁盘查然后缓存到页缓存,返回给消费者。
零拷贝技术就是 kafka集群只负责数据的存储,不会对数据进行加工、序列化、反序列化等操作,这些操作全部由生产者和消费者自己实现,消费者来取数据的时候查到了直接从网卡拿数据,效率更高。
消费者
消费方式
推模式
消息队列主动推送消息给消费者
拉模式
消费者主动拉取消息
kafka采用的是拉模式,因为每个消费者的消费速率不同,如果过采用推模式会导致一些消费者跟不上消费速率。采用拉模式,消费者可以按照自己的消费速率来消费消息。缺点就是消息队列为空时会空转。
Kafka消费工作流程
- 每个分区只能由消费者组当中的一个消费者消费
- 一个消费者可以消费多个分区
消费者组原理
在同一消费者组中的消费者共用一个groupId
- 消费组中不同的消费者消费不同的分区,一个分区只能被一个消费者消费
- 消费者组之间不受影响,消费者组就是逻辑上的一个订阅者。一个消费者可以看成只有一个消费者的消费者组。所以所有的消费者都有所属的消费者组。
- 如果消费者组中的消费者数量大于分区数,分区分配完成后就会有闲置的消费者,不会接收任何消息。
消费者组的初始化流程:
消费者组详细消费流程:
消费者相关参数
参数名称 | 描述 |
---|---|
bootstrap.servers | 向 Kafka 集群建立初始连接用到的 host/port 列表。 |
key.deserializer 和value.deserializer | 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。 |
group.id | 标记消费者所属的消费者组。 |
enable.auto.commit | 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向Kafka 提交的频率,默认 5s。 |
auto.offset.reset | 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions | consumer_offsets 的分区数,默认是 50 个分区。 |
heartbeat.interval.ms | Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms , 也不应该高于session.timeout.ms 的 1/3。 |
session.timeout.ms | Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
fetch.min.bytes | 默认 1 个字节。消费者获取服务器端一批消息最小的字节数。 |
fetch.max.wait.ms | 默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。 |
fetch.max.bytes | 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes ( brokerconfig)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次 poll 拉取数据返回消息的最大条数,默认是 500 条。 |
分区分配的策略
Range策略
range策略是针对每一个topic的, 首先对topic中的分区进行排序,然后对消费者按照字典序排序,通过partition数/consumer数来计算每个consumer消费几个partition,有余数则将余数平均的分配给开头几个消费者
对于一个topic而言 c0多消费一个分区影响不大,但是如果有N个topic,会导致c0多消费N个分区,容器产生数据倾斜
RoundRobin策略
针对所有topic,分区依次进行分配
Sticky策略
针对所有topic的分区,在执行分配之前尽可能的考虑上一次的分配结果。首先尽可能将分区均匀分配给每一个消费者,尽量保持原有分配的分区没有变化
重复消费和漏消费
重复消费
kafka自动提交默认5s一次,假设5s第一次提交的offset 为 2 ,在第二个5s提交之前Consumer在继续消费,在第二次自动提交之前consumer宕机了,这时候offset就没有提交成功,consumer恢复的时候还是从offset = 2的位置消费,这样就会导致重复消费的问题。
漏消费
offset为手动提交,如果提交offset之后,消费者的数据还没有落盘就挂了,实际上就是漏消费了
消费者事务
如果需要consumer精确一次消费就需要将kafka消费过程和提交offset过程做原子绑定。
数据积压
如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数= 分区数。(两者缺一不可)
如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。
参数名称 | 描述 |
---|---|
fetch.max.bytes | 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (brokerconfig)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次 poll 拉取数据返回消息的最大条数,默认是 500 条 |
相关文档
Kafka与RocketMq的对比 https://www.cnblogs.com/BYRans/p/6100653.html