说明
- 本文基于 kafka 2.7 编写。
- @author blog.jellyfishmix.com / JellyfishMIX - github
- LICENSE GPL-2.0
消息语意
消息语义(诉求)有三种。分别是: 消息最多传递一次(消息不重复), 消息最少传递一次(消息不丢失), 消息有且仅有一次传递(消息不重复且不丢失)。
- 消息最多传递一次(消息不重复),: 消息最多传递一次,可能会丢,但不会重复。适用于高并发量、高吞吐,但是对于消息的丢失不是很敏感的场景。
- 消息最少传递一次(消息不丢失): 消息不丢失,但有可能重复。适用于并发量一般,对于消息重复传递不敏感的场景。
- 消息有且仅有一次传递(消息不重复且不丢失): 适用于对消息可靠性要求高,且对吞吐量要求不高的场景。
kafka 如何做到消息不丢失
producer 不少生产消息,borker 服务端不丢失消息,consumer 也不能少消费消息。
producer: 不少生产消息
以下是为了保证消息不丢失,生产端需要配置的参数和相关使用方法。
- 要使用带回调方法的 api:
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
使用带有回调方法的 API 时,我们可以根据回调函数得知消息是否发送成功,如果发送失败了,我们要进行异常处理,比如把失败消息存储到本地硬盘或远程数据库,等应用正常了再发送,这样才能保证消息不丢失。
- 设置参数 acks=-1。acks 这个参数是指分区 leader 收到多少分区副本收到消息后,producer 才认为消息发送成功了,可选的参数值有 0, 1 和 -1。
- acks=0,表示生产者不等待任何服务器节点的响应,只要发送消息就认为成功。
- acks=1(默认值),表示生产者收到 leader 分区的响应就认为发送成功。
- acks=-1,表示只有当 ISR(ISR 的含义后面我会详细介绍)中的副本全部收到消息时,生产者才会认为消息生产成功了。这种配置是最安全的,因为如果 leader 副本挂了,当 follower 副本被选为 leader 副本时,消息也不会丢失。但是系统吞吐量会降低,因为生产者要等待所有副本都收到消息后才能再次发送消息。
borker 服务端: 不丢失消息
- 设置 replication.factor >1。replication.factor 这个参数表示分区副本的个数,这里我们要将其设置为大于 1 的数,这样当 leader 副本挂了,follower 副本还能被选为 leader 副本继续接收消息。
- 设置 min.insync.replicas >1。min.insync.replicas 指的是 ISR 最少的副本数量,原理同上,也需要大于 1 的副本数量来保证消息不丢失。
- tips: ISR。ISR 是一个分区部分副本的集合,每个分区都有自己的一个 ISR 集合。不是所有的副本都会在这个集合里,首先 leader 自身是在 ISR 集合里的,ISR 集合里的 follower 和 leader 消息是保持一致的,落后于 leader 的 follower 会被从 ISR 集合里被淘汰出去。
- 设置 unclean.leader.election.enable=false。指是否能把非 ISR 集合中的副本选举为 leader 副本。true 表示允许非 ISR 集合中的 follower 副本成为 leader 副本。非 ISR 集合 follower 和 leader 消息不一致,如果获得选举资格被选举为新 leader 后会丢失消息。
consumer: 不能少消费消息
- 设置 enable.auto.commit=false。表示 consuemr 消费偏移量是否自动提交。
- 消费者消费消息是有两个步骤的,首先拉取消息,然后再处理消息。向服务端提交消费偏移量可以手动提交也可以自动提交。如果为 true 表示 offset 是由 consumer 自动提交,由异步线程去完成的,业务线程无法控制。如果刚拉取了消息之后,kafka 异步线程自动提交了消费偏移量,此时业务处理还没执行完 consumer 挂了,这就造成还没进行完业务处理的消息 offset 已经被提交了,下次再消费时消费不到此条消息,造成消息的丢失。因此防止消息丢失需要要设置 enable.auto.commit=false,由业务手动提交消息偏移量。
- enable.auto.commit=false, 业务消费完成后手动提交消费 offset 有重复消费的风险。风险触发时机是在业务消费完成后,手动提交 offset 前 consumer 挂了。由于 borker 上此 partition 的 consume offset 还没被提交,下次 consumer 拉取时还会拉取到此条消息。
kafka 如何做到消息不重复
producer: 不重复发送消息
- producer 如遇网络问题未获得响应,就无法判断该消息是否成功提交到了 broker,如果配置了重试次数,会引发生产端重新发送同一条消息,从而造成消息发送重复。
- producer 会获得 borker 分配的一个唯一 id,同一个 producer 中每条消息会获得一个唯一 id,如有需要可以用 redis senx 对唯一 id 做检查去重。或者业务线在消息内自己生成唯一 id 通过 redis setnx 来去重。
consumer: 关闭
- consumer 需要关闭 consume offset 自动提交。offset 自动提交是异步执行的,可能出现业务线程消费完成了,提交 offset 的线程挂了。
- 业务线在消息内自己生成唯一 id。
kafka 一个 partition 分区只能被一个 consumer 消费的原因
- offset 保存在 partition 处,每个 partition leader 仅有一个 conusme offset 属性,无法满足多个 consumer 共同访问。
- 如果允许一个 partition 被多个 consumer 同时消费,会有并发问题等。解决这些问题会让 partition 和 consume offset 设计变得复杂,开发维护成本变高,且可能引入新的问题。