kafkatemplate无法注入

阅读: 评论:0

kafkatemplate无法注入

kafkatemplate无法注入

背景kafka: 0.9

spring-kafka: 2.2.4.RELEASE

kafka-client: 2.0.1

测试环境:topic的partition数量为2,consumer数量为1

问题

项目中kafka消费配置了失败重试规则,最大重试次数maxRetryTimes的值为15。测试时故意在消费时报错,在消费15次之后就会写入死信队列。kafkaListenerContainerFactory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), maxRetryTimes));

但是最近测试环境却出现“消费端无限重试异常消息的情况”。

解决

本地连上测试环境的kafka调试消费,发现:topic的partition数量为2。

当前只有一个consumer,且consumer的concurrency配置为1。

如果同时有两批异常消息在重试。

在以上情况下,消费端就会出现无限重试。

启动debug了解spring-kafka重试原理和问题原因。

从配置的SeekToCurrentErrorHandler开始,SeekToCurrentErrorHandler会将consumer消费进度重新定位到消费失败的消息的offset,这样comsumer就会重新拉取到失败的消息:public class SeekToCurrentErrorHandler implements ContainerAwareErrorHandler {

@Override

public void handle(Exception thrownException, List> records,

Consumer, ?> consumer, MessageListenerContainer container) {

if (!SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, LOGGER)) {

throw new KafkaException("Seek to current after exception", thrownException);

}

else if (thismitRecovered) {

if (ContainerProperties().getAckMode().equals(AckMode.MANUAL_IMMEDIATE)) {

ConsumerRecord, ?> record = (0);

Map offsetToCommit = Collections.singletonMap(

new pic(), record.partition()),

new OffsetAndMetadata(record.offset() + 1));

if (ContainerProperties().isSyncCommits()) {

consumermitSync(offsetToCommit);

}

else {

OffsetCommitCallback commitCallback = ContainerProperties().getCommitCallback();

if (commitCallback == null) {

commitCallback = LOGGING_COMMIT_CALLBACK;

}

consumermitAsync(offsetToCommit, commitCallback);

}

}

else {

LOGGER.warn("'commitRecovered' ignored, container AckMode must be MANUAL_IMMEDIATE");

}

}

}

}

这里由if (!SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, LOGGER))来判断是否需要重试,并重新定位offset。public final class SeekUtils {

public static boolean doSeeks(List> records, Consumer, ?> consumer, Exception exception,

boolean recoverable, BiPredicate, Exception> skipper, Log logger) {

Map partitions = new LinkedHashMap<>();

AtomicBoolean first = ne

本文发布于:2024-01-31 14:53:19,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170668399929315.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:kafkatemplate
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23