kafaka

Kafka 是什么?主要应⽤场景有哪些?

Kafka 是⼀个分布式流式处理平台。这到底是什么意思呢? 流平台具有三个关键功能:

  1. 消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。

  2. 容错的持久⽅式存储记录消息流: Kafka 会把消息持久化到磁盘,有效避免了消息丢失的⻛险·。

  3. 流式处理平台: 在消息发布的时候进⾏处理,Kafka 提供了⼀个完整的流式处理类库。

Kafka 主要有两⼤应⽤场景:

  1. 消息队列 :建⽴实时流数据管道,以可靠地在系统或应⽤程序之间获取数据。

  2. 数据处理: 构建实时的流数据处理程序来转换或处理数据流。

如何获取 topic 主题的列表

bin/kafka-topics.sh --list --zookeeper localhost:2181

生产者和消费者的命令行是什么?

生产者在主题上发布消息:

bin/kafka-console-producer.sh --broker-list 1968.43.49:9092 --topicHello-Kafka

注意这里的 IP 是 server.properties 中的 listeners 的配置。接下来每个新行就是输入一条新消息。

消费者接受消息:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topicHello-Kafka --from-beginning

队列模型了解吗?Kafka 的消息模型知道吗?

队列模型:早期的消息模型

img

使⽤队列(Queue)作为消息通信载体,满⾜⽣产者与消费者模式,⼀条消息只能被⼀个消费者使⽤, 未被消费的消息在队列中保留直到被消费或超时。 ⽐如:我们⽣产者发送 100 条消息的话,两个消费者来消费⼀般情况下两个消费者会按照消息发送的顺序各⾃消费⼀半(也就是你⼀个我⼀个的消费。)

队列模型存在的问题:

假如我们存在这样⼀种情况:我们需要将⽣产者产⽣的消息分发给多个消费者,并且每个消费者都能接 收到完成的消息内容。

这种情况,队列模型就不好解决了。很多⽐较杠精的⼈就说:我们可以为每个消费者创建⼀个单独的队 列,让⽣产者发送多份。这是⼀种⾮常愚蠢的做法,浪费资源不说,还违背了使⽤消息队列的⽬的。

发布-订阅模型:Kafka 消息模型

发布-订阅模型主要是为了解决队列模型存在的问题。

img

发布订阅模型(Pub-Sub) 使⽤主题(Topic) 作为消息通信载体,类似于⼴播模式;发布者发布⼀条消息,该消息通过主题传递给所有的订阅者,在⼀条消息⼴播之后才订阅的⽤户则是收不到该条消息的。

在发布 - 订阅模型中,如果只有⼀个订阅者,那它和队列模型就基本是⼀样的了。所以说,发布 - 订阅模型在功能层⾯上是可以兼容队列模型的。

Kafka 采⽤的就是发布 - 订阅模型。

RocketMQ 的消息模型和 Kafka 基本是完全⼀样的。唯⼀的区别是 Kafka 中没有队列这个概念,与之对应的是 Partition(分区)。

什么是Producer、Consumer、Broker、Topic、Partition?

Kafka 将⽣产者发布的消息发送到 Topic(主题) 中,需要这些消息的消费者可以订阅这些

Topic(主题),如下图所示:

image-20210505003610463

上⾯这张图也为我们引出了,Kafka ⽐较重要的⼏个概念:

  1. Producer(⽣产者) : 产⽣消息的⼀⽅。

  2. Consumer(消费者) : 消费消息的⼀⽅。

  3. Broker(代理) : 可以看作是⼀个独⽴的 Kafka 实例。多个 Kafka Broker 组成⼀个 Kafka Cluster。

同时,你⼀定也注意到每个 Broker 中⼜包含了 Topic 以及 Partition 这两个重要的概念:

  • Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题)来消费消息。
  • Partition(分区) : Partition 属于 Topic 的⼀部分。⼀个 Topic 可以有多个 Partition,并且同⼀ Topic 下的 Partition 可以分布在不同的 Broker 上,这也就表明⼀个 Topic 可以横跨多个 Broker 。这正如我上⾯所画的图⼀样。

Kafka 的多副本机制了解吗?带来了什么好处?

还有⼀点我觉得⽐较重要的是 Kafka 为分区(Partition)引⼊了多副本(Replica)机制。分区(Partition)中的多个副本之间会有⼀个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进⾏同步

Kafka 的多分区(Partition)以及多副本(Replica)机制有什么好处呢?
  1. Kafka 通过给特定 Topic 指定多个 Partition, ⽽各个 Partition 可以分布在不同的 Broker上, 这样便能提供⽐较好的并发能⼒(负载均衡)。

  2. Partition 可以指定对应的 Replica 数, 这也极⼤地提⾼了消息存储的安全性, 提⾼了容灾能⼒,不过也相应的增加了所需要的存储空间。

consumer 是推还是拉?

Kafka 最初考虑的问题是,customer 应该从 brokes 拉取消息还是 brokers 将消息推送到 consumer, 也就是 pull 还 push。在这方面,Kafka 遵循了一种大部分消息系统共同的传统的设计:producer 将消息推送到 broker,consumer 从broker 拉取消息。

一些消息系统比如 Scribe 和 Apache Flume 采用了 push 模式,将消息推送到下游的 consumer。这样做有好处也有坏处:由 broker 决定消息推送的速率,对于不同消费速率的 consumer 就不太好处理了。消息系统都致力于让 consumer 以最大的速率最快速的消费消息,但不幸的是,push 模式下,当broker 推送的速率远大于 consumer 消费的速率时,consumer 恐怕就要崩溃了。最终 Kafka 还是选取了传统的 pull 模式。

Pull 模式的另外一个好处是 consumer 可以自主决定是否批量的从 broker 拉取数据 。Push 模式必须在不知道下游 consumer 消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免 consumer 崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。Pull 模式下,consumer 就可以根据自己的消费能力去决定这些策略。

Pull 有个缺点是,如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到 t 达。为了避免这点,Kafka 有个参数可以让 consumer阻塞知道新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发送)。

讲讲 kafka 维护消费状态跟踪的方法

大部分消息系统在 broker 端的维护消息被消费的记录:一个消息被分发到consumer 后 broker 就马上进行标记或者等待 customer 的通知后进行标记。这样也可以在消息在消费后立马就删除以减少空间占用。

但是这样会不会有什么问题呢?如果一条消息发送出去之后就立即被标记为消费过的,旦 consumer 处理消息时失败了(比如程序崩溃)消息就丢失了。为了解决这个问题,很多消息系统提供了另外一个个 功能:当消息被发送出去之后仅仅被标记为已发送状态,当接到 consumer 已经消费成功的通知后才标记为已被消费的状态。这虽然解决了消息丢失的问题,但产生了新问题,首先如果 consumer处理消息成功了但是向 broker 发送响应时失败了,这条消息将被消费两次。第二个问题时,broker 必须维护每条消息的状态,并且每次都要先锁住消息然后更改状态然后释放锁。这样麻烦又来了,且不说要维护大 量的状态数据,比如如果消息发送出去但没有收到消费成功的通知,这条消息将一直处于被锁定的状 态,Kafka 采用了不同的策略。Topic 被分成了若干分区,每个分区在同一时间只被一个 consumer 消费。这意味着每个分区被消费的消息在日志中的位置仅仅是一个简单的整数:offset。这样就很容易标 记每个分区消费状态就很容易了,仅仅需要一个整数而已。这样消费状态的跟踪就很简单了。

这带来了另外一个好处:consumer 可以把 offset 调成一个较老的值,去重新消费老的消息。这对传统的消息系统来说看起来有些不可思议,但确实是非常有用的,谁规定了一条消息只能被消费一次呢?

讲一下主从同步

Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topci配置副本的数量。Kafka会自动在每个个副本上备份数据,所以当一个节点down掉时数据依然是可用的。

Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。

为什么需要消息系统,mysql 不能满足需求吗?

(1) 解耦:

允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

(2) 冗余:

消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队 列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出 该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

(3) 扩展性:

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过 程即可。

(4) 灵活性 & 峰值处理能力:

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理 这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的 访问压力,而不会因为突发的超负荷的请求而完全崩溃。

(5) 可恢复性:

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理 消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

(6) 顺序保证:

在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按 照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)

(7) 缓冲:

有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

(8) 异步通信:

很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入 队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

Zookeeper 对于 Kafka 的作用是什么?

image-20210505003739986

ZooKeeper 主要为 Kafka 提供元数据的管理的功能。 从图中我们可以看出,Zookeeper 主要为 Kafka 做了下⾯这些事情:

  1. Broker 注册 :在 Zookeeper 上会有⼀个专⻔⽤来进⾏ Broker 服务器列表记录的节点。每个Broker 在启动时,都会到 Zookeeper 上进⾏注册,即到/brokers/ids 下创建属于⾃⼰的节点。每个 Broker 就会将⾃⼰的 IP 地址和端⼝等信息记录到该节点中去
  2. Topic 注册 : 在 Kafka 中,同⼀个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。⽐如我创建了⼀个名字为 my-topic 的主题并且它有两个分区,对应到 zookeeper 中会创建这些⽂件夹: /brokers/topics/my-topic/Partitions/0 、 /brokers/topics/my-topic/Partitions/1
  3. 负载均衡 :上⾯也说过了 Kafka 通过给特定 Topic 指定多个 Partition, ⽽各个 Partition 可以分布在不同的 Broker 上, 这样便能提供⽐较好的并发能⼒。 对于同⼀个 Topic 的不同Partition,Kafka 会尽⼒将这些 Partition 分布到不同的 Broker 服务器上。当⽣产者产⽣消息后也会尽量投递到不同 Broker 的 Partition ⾥⾯。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。

Zookeeper 是一个开放源码的、高性能的协调服务,它用于 Kafka 的分布式应用。

Zookeeper 主要用于在集群中不同节点之间进行通信

在 Kafka 中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取除此之外,它还执行其他活动,如: leader 检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。

数据传输的事务定义有哪三种? 和 MQTT 的事务定义一样都是 3 种。

(1) 最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输

(2) 最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.

(3) 精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的

Kafka 判断一个节点是否还活着有那两个条件?

(1) 节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接

(2) 如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久

Kafka 与传统 MQ 消息系统之间有三个关键区别

(1).Kafka 持久化日志,这些日志可以被重复读取和无限期保留

(2).Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性

(3).Kafka 支持实时的流式处理

和其他消息队列相⽐,Kafka的优势在哪⾥?

我们现在经常提到 Kafka 的时候就已经默认它是⼀个⾮常优秀的消息队列了,我们也会经常拿它给

RocketMQ、RabbitMQ 对⽐。我觉得 Kafka 相⽐其他消息队列主要的优势如下:

  1. 极致的性能 :基于 Scala 和 Java 语⾔开发,设计中⼤量使⽤了批量处理和异步的思想,最⾼可以每秒处理千万级别的消息。

  2. ⽣态系统兼容性⽆可匹敌 :Kafka 与周边⽣态系统的兼容性是最好的没有之⼀,尤其在⼤数据和流计算领域。

实际上在早期的时候 Kafka 并不是⼀个合格的消息队列,早期的 Kafka 在消息队列领域就像是⼀个⾐衫褴褛的孩⼦⼀样,功能不完备并且有⼀些⼩问题⽐如丢失消息、不保证消息可靠性等等。当然,这也 和 LinkedIn 最早开发 Kafka ⽤于处理海量的⽇志有很⼤关系,哈哈哈,⼈家本来最开始就不是为了作为消息队列滴,谁知道后⾯误打误撞在消息队列领域占据了⼀席之地。

随着后续的发展,这些短板都被 Kafka 逐步修复完善。所以,Kafka 作为消息队列不可靠这个说法已经过时!

讲一讲 kafka 的 ack 的三种机制

request.required.acks 有三个值 0 1 -1(all)

0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候就会丢数据。

1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader挂掉后他不确保是否复制完成新 leader 也会导致数据丢失。

-1(all):服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出的ack,这样数据不会丢失

消费者如何不自动提交偏移量,由应用提交?

将 auto.commit.offset 设为 false,然后在处理一批消息后 commitSync() 或者异步提交commitAsync()

即:

ConsumerRecords<> records = consumer.poll(); for (ConsumerRecord<> record : records){
。。。
tyr{
consumer.commitSync()
}
。。。

消费者故障,出现活锁问题如何解决?

出现“活锁”的情况,是它持续的发送心跳,但是没有处理。为了预防消费者在这种情况下一直持有分 区,我们使用 max.poll.interval.ms 活跃检测机制。 在此基础上,如果你调用的 poll 的频率大于最大间隔,则客户端将主动地离开组,以便其他消费者接管该分区。 发生这种情况时,你会看到 offset 提交失败(调用commitSync()引发的 CommitFailedException)。这是一种安全机制,保障只有活动成员能够提交 offset。所以要留在组中,你必须持续调用 poll。

消费者提供两个配置设置来控制 poll 循环:

max.poll.interval.ms:增大 poll 的间隔,可以为消费者提供更多的时间去处理返回的消息(调用

poll(long)返回的消息,通常返回的消息都是一批)。缺点是此值越大将会延迟组重新平衡。

max.poll.records:此设置限制每次调用 poll 返回的消息数,这样可以更容易的预测每次 poll 间隔要处理的最大值。通过调整此值,可以减少 poll 间隔,减少重新平衡分组的

对于消息处理时间不可预测地的情况,这些选项是不够的。 处理这种情况的推荐方法是将消息处理移到另一个线程中,让消费者继续调用 poll。 但是必须注意确保已提交的 offset 不超过实际位置。另外, 你必须禁用自动提交,并只有在线程完成处理后才为记录手动提交偏移量(取决于你)。 还要注意,你需要 pause 暂停分区,不会从 poll 接收到新消息,让线程处理完之前返回的消息(如果你的处理能力比拉取消息的慢,那创建新线程将导致你机器内存溢出)。

如何控制消费的位置

kafka 使用 seek(TopicPartition, long)指定新的消费位置。用于查找服务器保留的最早和最新的 offset

的特殊的方法也可用(seekToBeginning(Collection) 和seekToEnd(Collection))

kafka 分布式(不是单机)的情况下,如何保证消息的顺序消费?

Kafka 分布式的单位是 partition,同一个 partition 用一个 write ahead log 组织,所以可以保证 FIFO 的顺序。不同 partition 之间不能保证顺序。但是绝大多数用户都可以通过 message key 来定义,因为同一个 key 的 message 可以保证只发送到同一个 partition。

Kafka 中发送 1 条消息的时候,可以指定(topic, partition, key) 3 个参数。partiton 和 key 是可选的。如果你指定了 partition,那就是所有消息发往同 1个 partition,就是有序的。并且在消费端,Kafka 保证,1 个 partition 只能被1 个 consumer 消费。或者你指定 key( 比如 order id),具有同 1 个 key 的所有消息,会发往同 1 个 partition。

我们在使⽤消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,⽐如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是:更改⽤户会员等级、根据会员等级计算订单价格。假如这两条消息的消费顺序不⼀样造成的最终结果就会截然不同。

我们知道 Kafka 中 Partition(分区)是真正保存消息的地⽅,我们发送的消息都被放在了这⾥。⽽我们的 Partition(分区) ⼜存在于 Topic(主题) 这个概念中,并且我们可以给特定 Topic 指定多个Partition。

image-20210505003903948

每次添加消息到 Partition(分区) 的时候都会采⽤尾加法,如上图所示。Kafka 只能为我们保证

Partition(分区) 中的消息有序,⽽不能保证 Topic(主题) 中的 Partition(分区) 的有序。

所以,我们就有⼀种很简单的保证消息消费顺序的⽅法:1 个 Topic 只对应⼀个 Partition。这样当然可以解决问题,但是破坏了 Kafka 的设计初衷。

Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同⼀个key 的消息可以保证只发送到同⼀个 partition,这个我们可以采⽤表/对象的 id 来作为 key 。

总结⼀下,对于如何保证 Kafka 中消息消费的顺序,有了下⾯两种⽅法:

  1. 1 个 Topic 只对应⼀个 Partition。

  2. (推荐)发送消息的时候指定 key/Partition。

当然不仅仅只有上⾯两种⽅法,上⾯两种⽅法是我觉得⽐较好理解的,

kafka 的高可用机制是什么?

这个问题比较系统,回答出 kafka 的系统特点,leader 和 follower 的关系,消息读写的顺序即可。

Kafka 如何保证消息不丢失

⽣产者丢失消息的情况

⽣产者(Producer) 调⽤ send ⽅法发送消息之后,消息可能因为⽹络问题并没有发送过去。 所以,我们不能默认在调⽤ send ⽅法发送消息之后消息消息发送成功了。为了确定消息是发送成功, 我们要判断消息发送的结果。但是要注意的是 Kafka ⽣产者(Producer) 使⽤ ⽅法发送消息实 际上是异步的操作,我们可以通过 get() ⽅法获取调⽤结果,但是这样也让它变为了同步操作,示例代码如下:

ListenableFuture<SendResult<String, Objectjk future = kafkaTemplate.send(topic, o);
future.addCallback(result i> logger.info("⽣产者成功发送消息到topic:{} partition:{}的消息",result.getRecordMetadata().topic(),result.getRecordMetadata().partition()),ex  i>  logger.error("⽣产者发送消失败,原因:{}", ex.getMessage()));

如果消息发送失败的话,我们检查失败的原因之后重新发送即可!

另外这⾥推荐为 Producer 的 retries (重试次数)设置⼀个⽐较合理的值,⼀般是 3 ,但是为了保证消息不丢失的话⼀般会设置⽐较⼤⼀点。设置完成之后,当出现⽹络问题之后能够⾃动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太⼩的话重试的效果就不明显了,⽹络波 动⼀次你3次⼀下⼦就重试完了

消费者丢失消息的情况

我们知道消息在被追加到 Partition(分区)的时候都会分配⼀个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。

image-20210505004222772

当消费者拉取到了分区的某个消息之后,消费者会⾃动提交了 offset。⾃动提交的话会有⼀个问题, 试想⼀下,当消费者刚拿到这个消息准备进⾏真正消费的时候,突然挂掉了,消息实际上并没有被消 费,但是 offset 却被⾃动提交了。

解决办法也⽐较粗暴,我们⼿动关闭闭⾃动提交 offset,每次在真正消费完消息之后之后再⾃⼰⼿动提交 offset 。 但是,细⼼的朋友⼀定会发现,这样会带来消息被重新消费的问题。⽐如你刚刚消费完消息之后,还没提交 offset,结果⾃⼰挂掉了,那么这个消息理论上就会被消费两次。

Kafka 弄丢了消息

我们知道 Kafka 为分区(Partition)引⼊了多副本(Replica)机制。分区(Partition)中的多个副本之间会有⼀个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进⾏同步。⽣产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷⻉,它们的存在只是为了保证消息存储的安全性。

试想⼀种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出⼀个leader ,但是 leader 的数据还有⼀些没有被 follower 副本的同步的话,就会造成消息丢失。
设置 acks = all

解决办法就是我们设置 acks = all。acks 是 Kafka ⽣产者(Producer) 很重要的⼀个参数。acks 的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。当我们配置 acks = all 代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。

设置 replication.factor ÄÅ 3

为了保证 leader 副本能有 follower 副本能同步消息,我们⼀般会为 topic 设置replication.factor ÄÅ 3。这样就可以保证每个 分区(partition) ⾄少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。

设置 min.insync.replicas > 1

⼀般情况下我们还需要设置 min.insync.replicas> 1 ,这样配置代表消息⾄少要被写⼊到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际⽣产中应尽量避免默认值 1。但是,为了保证整个 Kafka 服务的⾼可⽤性,你需要确保 replication.factor > min.insync.replicas 。为什么呢?设想⼀下加⼊两者相等的话,只要是有⼀个副本挂掉,整个分区就⽆法正常⼯作了。这明显违反⾼可⽤性!⼀般推荐设置成 replication.factor = min.insync.replicas + 1。

设置 unclean.leader.election.enable = false

Kafka 0.11.0.0版本开始 unclean.leader.election.enable 参数的默认值由原来的true 改为false

我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进⾏同步。多个 follower 副本之间的消息同步情况不⼀样,当我们配置了unclean.leader.election.enable = false 的话,当 leader 副本发⽣故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。

kafka 如何减少数据丢失

Kafka到底会不会丢数据(data loss)? 通常不会,但有些情况下的确有可能会发生。下面的参数配置及

Best practice列表可以较好地保证数据的持久性(当然是trade-off,牺牲了吞吐量)。

  • block.on.buffer.full = true
  • acks = all

  • retries = MAX_VALUE

  • max.in.flight.requests.per.connection = 1

  • 使用KafkaProducer.send(record, callback)

  • callback逻辑中显式关闭producer:close(0)
  • unclean.leader.election.enable=false
  • replication.factor = 3

  • eplication.factor > min.insync.replicas

  • enable.auto.commit=false

  • 消息处理完成之后再提交位移

kafka 如何不消费重复数据?比如扣款,我们不能重复的扣。

其实还是得结合业务来思考,我这里给几个思路:

比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。

比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。

比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全 局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会 报错,不会导致数据库中出现脏数据。

results matching ""

    No results matching ""