最近業(yè)務(wù)上用到了Spring Kafka,所以系統(tǒng)性的探索了下Spring Kafka的各種用法,發(fā)現(xiàn)了很多實(shí)用的特性,下面介紹下Spring Kafka的消息重試機(jī)制。
0. 前言
原生 Kafka 是不支持消息重試的。但是 Spring Kafka 2.7+ 封裝了 Retry Topic 這個(gè)功能。
1. @RetryableTopic
使用注解的方式啟用 Retry Topic,在 @KafkaListener 方法上添加 @RetryableTopic 即可:
@Slf4j
@Component
public class DemoConsumer {
@RetryableTopic
@KafkaListener(topics = "topic1", groupId = "group1")
public void onMsg(ConsumerRecord< String, String > record) {
log.info("topic: {}", record.topic());
throw new RuntimeException("kafka exception");
}
}
這樣就開啟了 Spring Kafka 的消息重試機(jī)制:默認(rèn)重試 3 次,間隔為 1 秒。
我們?cè)诜椒ɡ?a href="http://m.xsypw.cn/analog/" target="_blank">模擬了拋出異常,運(yùn)行后可以發(fā)現(xiàn)打印了 3 條日志,間隔時(shí)間大約為 1 秒,重試的topic為原topic加上后綴“-retry”
2022-11-12 12:14:10.230 INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1
2022-11-12 12:14:11.315 INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1-retry-0
2022-11-12 12:14:12.310 INFO 1023 --- [ner#3-dlt-0-C-1] c.b.b.demo.retrytopic.KafkaListener: topic: topic1-retry-1
2. DLT死信隊(duì)列
如果 3 次重試后依舊失敗,會(huì)將消息發(fā)送到 DLT,
默認(rèn)情況,消息被發(fā)送到死信隊(duì)列后,會(huì)輸出一條日志。
2022-11-12 12:14:13.324 INFO 1023 --- [ner#3-dlt-0-C-1] o.s.k.retrytopic.RetryTopicConfigurer : Received message in dlt listener: topic1-dlt@233
DLT的topic為原topic加上后綴“-dlt”
我們可以使用@DltHandler注解來(lái)定義進(jìn)入死信隊(duì)列后的操作:
@DltHandler
public void dltHandler(ConsumerRecord< String, String > record) {
log.info("topic:{}, key:{}, value:{}", record.topic(), record.key(), record.value());
}
3. 自定義@RetryableTopic
可以自定義重試次數(shù)、延遲時(shí)間、topic命名策略等等,支持使用 Spring EL 表達(dá)式讀取配置。
@Slf4j
@Component
public class DemoConsumer {
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = "5000", multiplier = "2"),
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC
)
@KafkaListener(topics = "topic2", groupId = "group1")
public void onMsg2(ConsumerRecord< String, String > record) {
log.info("topic: {}", record.topic());
throw new RuntimeException("kafka exception");
}
}
注解屬性說(shuō)明:
attempts :重試次數(shù),默認(rèn)為3。
@Backoff delay :消費(fèi)延遲時(shí)間,單位為毫秒。
@Backoff multiplier :延遲時(shí)間系數(shù),此例中 attempts = 4, delay = 5000, multiplier = 2 ,則間隔時(shí)間依次為5s、10s、20s、40s,最大延遲時(shí)間受 maxDelay 限制。
fixedDelayTopicStrategy :可選策略包括:SINGLE_TOPIC 、MULTIPLE_TOPICS
4. 配置類
以上介紹的是注解的方式,只對(duì)注解下的方法有效。如果想讓多個(gè)方法都用相同的消息重試配置,那么可以使用配置類方式:
@Bean
public RetryTopicConfiguration retryTopic(KafkaTemplate< String, String > template) {
return RetryTopicConfigurationBuilder
.newInstance()
.maxAttempts(4)
.fixedBackOff(5000)
.includeTopic("topic1")
.create(template);
}
小結(jié)
以上就是Spring Kafka消息重試機(jī)制的簡(jiǎn)單應(yīng)用~希望能夠幫助那些正在使用Spring Kafka或即將使用的人少走一些彎路、少踩一點(diǎn)坑。
-
spring
+關(guān)注
關(guān)注
0文章
340瀏覽量
14737 -
日志
+關(guān)注
關(guān)注
0文章
140瀏覽量
10779 -
機(jī)制
+關(guān)注
關(guān)注
0文章
24瀏覽量
9895 -
DLT
+關(guān)注
關(guān)注
0文章
16瀏覽量
5371
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
Spring Boot Starter需要些什么

java spring教程
什么是java spring
Spring筆記分享
Kafka集群環(huán)境的搭建
spring定時(shí)器用法詳解

Kafka的概念及Kafka的宕機(jī)

Spring Boot實(shí)現(xiàn)各種參數(shù)校驗(yàn)
Spring Validation的使用
Kafka 的簡(jiǎn)介

監(jiān)控Kafka集群的常用的方法和工具介紹

kafka client在 spring如何實(shí)現(xiàn)

Kafka架構(gòu)技術(shù):Kafka的架構(gòu)和客戶端API設(shè)計(jì)

評(píng)論