背景kafka: 0.9
spring-kafka: 2.2.4.RELEASE
kafka-client: 2.0.1
测试环境:topic的partition数量为2,consumer数量为1
问题
项目中kafka消费配置了失败重试规则,最大重试次数maxRetryTimes的值为15。测试时故意在消费时报错,在消费15次之后就会写入死信队列。kafkaListenerContainerFactory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), maxRetryTimes));
但是最近测试环境却出现“消费端无限重试异常消息的情况”。
解决
本地连上测试环境的kafka调试消费,发现:topic的partition数量为2。
当前只有一个consumer,且consumer的concurrency配置为1。
如果同时有两批异常消息在重试。
在以上情况下,消费端就会出现无限重试。
启动debug了解spring-kafka重试原理和问题原因。
从配置的SeekToCurrentErrorHandler开始,SeekToCurrentErrorHandler会将consumer消费进度重新定位到消费失败的消息的offset,这样comsumer就会重新拉取到失败的消息:public class SeekToCurrentErrorHandler implements ContainerAwareErrorHandler {
@Override
public void handle(Exception thrownException, List> records,
Consumer, ?> consumer, MessageListenerContainer container) {
if (!SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, LOGGER)) {
throw new KafkaException("Seek to current after exception", thrownException);
}
else if (thismitRecovered) {
if (ContainerProperties().getAckMode().equals(AckMode.MANUAL_IMMEDIATE)) {
ConsumerRecord, ?> record = (0);
Map offsetToCommit = Collections.singletonMap(
new pic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
if (ContainerProperties().isSyncCommits()) {
consumermitSync(offsetToCommit);
}
else {
OffsetCommitCallback commitCallback = ContainerProperties().getCommitCallback();
if (commitCallback == null) {
commitCallback = LOGGING_COMMIT_CALLBACK;
}
consumermitAsync(offsetToCommit, commitCallback);
}
}
else {
LOGGER.warn("'commitRecovered' ignored, container AckMode must be MANUAL_IMMEDIATE");
}
}
}
}
这里由if (!SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, LOGGER))来判断是否需要重试,并重新定位offset。public final class SeekUtils {
public static boolean doSeeks(List> records, Consumer, ?> consumer, Exception exception,
boolean recoverable, BiPredicate, Exception> skipper, Log logger) {
Map partitions = new LinkedHashMap<>();
AtomicBoolean first = ne
本文发布于:2024-01-31 14:53:19,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170668399929315.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |