0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

怎样去解决Kafka消息重复的问题呢?

jf_ro2CN3Fa 来源:稀土掘金 2023-02-12 14:18 次阅读

一、前言

数据重复这个问题其实也是挺正常,全链路都有可能会导致数据重复。

00fc324a-a28b-11ed-bfe3-dac502259ad0.jpg

通常,消息消费时候都会设置一定重试次数来避免网络波动造成的影响,同时带来副作用是可能出现消息重复。

整理下消息重复的几个场景:

生产端: 遇到异常,基本解决措施都是 重试

场景一:leader分区不可用了,抛 LeaderNotAvailableException 异常,等待选出新 leader 分区。

场景二:Controller 所在 Broker 挂了,抛 NotControllerException 异常,等待 Controller 重新选举。

场景三:网络异常、断网、网络分区、丢包等,抛 NetworkException 异常,等待网络恢复。

消费端: poll 一批数据,处理完毕还没提交 offset ,机子宕机重启了,又会 poll 上批数据,再度消费就造成了消息重复。

怎么解决?

先来了解下消息的三种投递语义:

最多一次( at most once): 消息只发一次,消息可能会丢失,但绝不会被重复发送。例如:mqtt 中 QoS = 0。

至少一次( at least once): 消息至少发一次,消息不会丢失,但有可能被重复发送。例如:mqtt 中 QoS = 1

精确一次( exactly once): 消息精确发一次,消息不会丢失,也不会被重复发送。例如:mqtt 中 QoS = 2。

了解了这三种语义,再来看如何解决消息重复,即如何实现精准一次,可分为三种方法:

Kafka 幂等性 Producer: 保证生产端发送消息幂等。局限性,是只能保证单分区且单会话(重启后就算新会话)

Kafka 事务: 保证生产端发送消息幂等。解决幂等 Producer 的局限性。

消费端幂等:保证消费端接收消息幂等。蔸底方案。

1)Kafka 幂等性 Producer

幂等性指 :无论执行多少次同样的运算,结果都是相同的。即一条命令,任意多次执行所产生的影响均与一次执行的影响相同。

幂等性使用示例:在生产端添加对应配置即可

Propertiesprops=newProperties();
props.put("enable.idempotence",ture);//1.设置幂等
props.put("acks","all");//2.当enable.idempotence为true,这里默认为all
props.put("max.in.flight.requests.per.connection",5);//3.注意

设置幂等,启动幂等。

配置 acks,注意:一定要设置 acks=all,否则会抛异常。

配置 max.in.flight.requests.per.connection 需要 <= 5 ,否则会抛异常 OutOfOrderSequenceException。

0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1

Kafka >= 1.1, max.in.flight.request.per.connection <= 5

[**为了更好理解,需要了解下 Kafka 幂等机制:]

010c3212-a28b-11ed-bfe3-dac502259ad0.jpg

Producer 每次启动后,会向 Broker 申请一个全局唯一的 pid。(重启后 pid 会变化,这也是弊端之一)

Sequence Numbe:针对每个 都对应一个从0开始单调递增的 Sequence,同时 Broker端会缓存这个 seq num

判断是否重复: 去 Broker 里对应的队列 ProducerStateEntry.Queue(默认队列长度为 5)查询是否存在

如果 nextSeq == lastSeq + 1,即 服务端seq + 1 == 生产传入seq,则接收。

如果 nextSeq == 0 && lastSeq == Int.MaxValue,即刚初始化,也接收。

反之,要么重复,要么丢消息,均拒绝。

011a794e-a28b-11ed-bfe3-dac502259ad0.jpg

这种设计针对解决了两个问题:

消息重复: 场景 Broker 保存消息后还没发送 ack 就宕机了,这时候 Producer 就会重试,这就造成消息重复。

消息乱序: 避免场景,前一条消息发送失败而其后一条发送成功,前一条消息重试后成功,造成的消息乱序。

那什么时候该使用幂等:

如果已经使用 acks=all,使用幂等也可以。

如果已经使用 acks=0 或者 acks=1,说明你的系统追求高性能,对数据一致性要求不高。不要使用幂等。

2)Kafka 事务

使用 Kafka 事务解决幂等的弊端:单会话且单分区幂等。

Tips: 这块篇幅较长,这先稍微提及下使用,之后另起一篇。

事务使用示例:分为生产端 和 消费端

Propertiesprops=newProperties();
props.put("enable.idempotence",ture);//1.设置幂等
props.put("acks","all");//2.当enable.idempotence为true,这里默认为all
props.put("max.in.flight.requests.per.connection",5);//3.最大等待数
props.put("transactional.id","my-transactional-id");//4.设定事务id

Producerproducer=newKafkaProducer(props);

//初始化事务
producer.initTransactions();

try{
//开始事务
producer.beginTransaction();

//发送数据
producer.send(newProducerRecord("Topic","Key","Value"));

//数据发送及Offset发送均成功的情况下,提交事务
producer.commitTransaction();
}catch(ProducerFencedException|OutOfOrderSequenceException|AuthorizationExceptione){
//数据发送或者Offset发送出现异常时,终止事务
producer.abortTransaction();
}finally{
//关闭Producer和Consumer
producer.close();
consumer.close();
}

这里消费端 Consumer 需要设置下配置:isolation.level 参数

read_uncommitted: 这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。

如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。

read_committed: 表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

3)消费端幂等

“如何解决消息重复?” 这个问题,其实换一种说法:就是如何解决消费端幂等性问题。

只要消费端具备了幂等性,那么重复消费消息的问题也就解决了。

典型的方案是使用:消息表,来去重:

0129106c-a28b-11ed-bfe3-dac502259ad0.jpg

上述栗子中,消费端拉取到一条消息后,开启事务,将消息Id 新增到本地消息表中,同时更新订单信息

如果消息重复,则新增操作 insert 会异常,同时触发事务回滚。

二、案例:Kafka 幂等性 Producer 使用

准备工作如下:

1、Zookeeper:本地使用 Docker 启动

$dockerrun-d--namezookeeper-p2181:2181zookeeper
a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4

2、Kafka:版本 2.7.1,源码编译启动(看上文源码搭建启动)

3、启动生产者:Kafka 源码中 exmaple 中

4、启动消息者:可以用 Kafka 提供的脚本

>基于SpringCloudAlibaba+Gateway+Nacos+RocketMQ+Vue&Element实现的后台管理系统+用户小程序,支持RBAC动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
>
>*项目地址:
>*视频教程

#举个栗子:topic 需要自己去修改
$cd./kafka-2.7.1-src/bin
$./kafka-console-producer.sh--broker-listlocalhost:9092--topictest_topic

创建 topic : 1副本,2 分区

$./kafka-topics.sh--bootstrap-serverlocalhost:9092--topicmyTopic--create--replication-factor1--partitions2

#查看
$./kafka-topics.sh--bootstrap-serverbroker:9092--topicmyTopic--describe

生产者代码:

013daf5e-a28b-11ed-bfe3-dac502259ad0.jpg

publicclassKafkaProducerApplication{

privatefinalProducerproducer;
finalStringoutTopic;

publicKafkaProducerApplication(finalProducerproducer,
finalStringtopic){
this.producer=producer;
outTopic=topic;
}

publicvoidproduce(finalStringmessage){
finalString[]parts=message.split("-");
finalStringkey,value;
if(parts.length>1){
key=parts[0];
value=parts[1];
}else{
key=null;
value=parts[0];
}
finalProducerRecordproducerRecord
=newProducerRecord<>(outTopic,key,value);
producer.send(producerRecord,
(recordMetadata,e)->{
if(e!=null){
e.printStackTrace();
}else{
System.out.println("key/value"+key+"/"+value+"	writtentotopic[partition]"+recordMetadata.topic()+"["+recordMetadata.partition()+"]atoffset"+recordMetadata.offset());
}
}
);
}

publicvoidshutdown(){
producer.close();
}

publicstaticvoidmain(String[]args){

finalPropertiesprops=newProperties();

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
props.put(ProducerConfig.ACKS_CONFIG,"all");

props.put(ProducerConfig.CLIENT_ID_CONFIG,"myApp");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);

finalStringtopic="myTopic";
finalProducerproducer=newKafkaProducer<>(props);
finalKafkaProducerApplicationproducerApp=newKafkaProducerApplication(producer,topic);

StringfilePath="/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
try{
ListlinesToProduce=Files.readAllLines(Paths.get(filePath));
linesToProduce.stream().filter(l->!l.trim().isEmpty())
.forEach(producerApp::produce);
System.out.println("Offsetsandtimestampscommittedinbatchfrom"+filePath);
}catch(IOExceptione){
System.err.printf("Errorreadingfile%sdueto%s%n",filePath,e);
}finally{
producerApp.shutdown();
}
}
}

启动生产者后,控制台输出如下:

014ce7da-a28b-11ed-bfe3-dac502259ad0.jpg

启动消费者:

$./kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topicmyTopic
015c4680-a28b-11ed-bfe3-dac502259ad0.jpg

修改配置 acks

``

启用幂等的情况下,调整 acks 配置,生产者启动后结果是怎样的:

修改配置 acks = 1

修改配置 acks = 0

会直接报错:

Exceptioninthread"main"org.apache.kafka.common.config.ConfigException:Mustsetackstoallinordertousetheidempotentproducer.
Otherwisewecannotguaranteeidempotence.
0173c8b4-a28b-11ed-bfe3-dac502259ad0.jpg

修改配置 max.in.flight.requests.per.connection

``

启用幂等的情况下,调整此配置,结果是怎样的:

将 max.in.flight.requests.per.connection > 5 会怎样?

0182e556-a28b-11ed-bfe3-dac502259ad0.jpg

当然会报错:

Causedby:org.apache.kafka.common.config.ConfigException:Mustsetmax.in.flight.requests.per.connectiontoatmost5tousetheidempotentproducer.
01987f24-a28b-11ed-bfe3-dac502259ad0.jpg






审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • MQTT协议
    +关注

    关注

    0

    文章

    91

    浏览量

    5254
  • kafka
    +关注

    关注

    0

    文章

    49

    浏览量

    5169

原文标题:一碰就头疼的 Kafka 消息重复问题,立马解决!

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    kafka数据可靠性深度解读

    At least once,可以保证不丢,但是可能会重复,为了解决重复需要引入唯一标识和重机制,kafka提供了GUID实现了唯一标识,但是并没有提供自带的
    发表于 05-08 16:29

    基于闪存存储的Apache Kafka性能提升方法

    据生态系统中最常用的分布式消息传递系统之一的Apache Kafka进行评估,测试如何以最佳方式将美光固态存储应用于 Apache Kafka,以及将产生怎样的收益。A
    发表于 07-24 06:58

    Kafka集群环境的搭建

    :2181,zk02:2181,zk03:2181注意:broker.id安装集群服务个数编排即可,集群下不能重复。5、启动kafka集群# 启动命令[root@node02 kafka2.11]# bin
    发表于 01-05 17:55

    怎样设置数值元件的格式

    怎样设置数值元件怎样设置数值元件的格式
    发表于 09-26 09:16

    怎样获取Android的电池电压

    怎样获取Android的电池电压怎样获取Android的电池电流
    发表于 10-09 08:39

    怎样使用springboot

    怎样使用springboot?学习springboot需要懂得哪些?
    发表于 10-25 07:13

    怎样使用HSE/HSI配置RCC的时钟

    怎样使用HSE/HSI配置RCC的时钟怎样设置系统时钟的库函数
    发表于 11-10 07:08

    怎样使用STM32自带DMA传输数据

    怎样使用STM32自带DMA传输数据怎样使用STM32 HAL库对DMA进行配置
    发表于 11-18 07:51

    怎样快捷的操作AT指令

    怎样快捷的操作AT指令怎样编写自己的AT指令
    发表于 12-07 07:27

    怎样配置设备树的leds节点

    配置设备树leds节点,sys文件系统中没有出现相应设备文件,引脚没有查出有重复定义的?怎样配置设备树的leds节点
    发表于 01-07 06:15

    socket通信该怎样实现

    socket通信该怎样实现怎样实现socket AES-CBC加密
    发表于 01-20 07:41

    怎样实现STM32 IAP升级的设计

    STM32 IAP的实现原理是什么?怎样实现STM32 IAP升级的设计怎样编写Bootloader的升级程序
    发表于 01-27 06:02

    怎样配置Android的SDIO部分

    怎样配置Android的电源部分怎样配置Android的SDIO部分
    发表于 02-10 07:00

    Ubuntu固件的编译该怎样使用

    怎样编译Ubuntu固件?Ubuntu固件的编译该怎样使用
    发表于 02-15 06:18

    怎样写回调函数怎样使用回调函数

    回调函数的作用是什么?单片机怎么用回调函数在不同文件之间传递数据怎样写回调函数怎样使
    发表于 02-23 07:40