【弄nèng-Kafka】应用篇(八)——Springboot整合Kafka(异常处理器

阅读: 评论:0

2024年2月6日发(作者:)

【弄nèng-Kafka】应用篇(八)——Springboot整合Kafka(异常处理器

import red;import ier;import ;import emplate;import ent;@Componentpublic class ErrorProducer { @Autowired @Qualifier("errorTemplate") private KafkaTemplate kafkaTemplate; @Value("${}") private String topic; public void send(String message) { (topic, message); }}2.2.2 消费者配置类

import erConfig;import Deserializer;import ;import ;import uration;import Kafka;import rentKafkaListenerContainerFactory;import istenerContainerFactory;import tKafkaConsumerFactory;import p;import ;@Configuration@EnableKafkapublic class ErrorConsumerConfig { @Value("${rap-servers}") private String bootstrapServers; @Value("${}") private String topic; /** *

单线程-单条消费 * @return */ @Bean public KafkaListenerContainerFactory<?> errorKafkaListenerContainerFactory() { Map configProps = new HashMap<>(); (RAP_SERVERS_CONFIG, bootstrapServers); (_DESERIALIZER_CLASS_CONFIG, ); (_DESERIALIZER_CLASS_CONFIG, ); (_ID_CONFIG, topic); ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); sumerFactory(new DefaultKafkaConsumerFactory<>(configProps)); return factory; }}消费者

import 4j;import er;import ;import istener;import erAwareListenerErrorHandler;import erExecutionFailedException;import eaders;import e;import eHeaders;import ;import d;import ent;import ;@Component@Slf4jpublic class ErrorConsumer { @KafkaListener(topics = "${}", containerFactory = "errorKafkaListenerContainerFactory", errorHandler = "consumerAwareErrorHandler") public void receive(@Payload String message, @Header(ED_PARTITION_ID) int partition) { n(("From partition %d : %s", partition, message) ); throw new RuntimeException("fail"); } /** *

单条消息 * @return */ @Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { ("ConsumerAwareListenerErrorHandler receive : "+load().toString()); return null; } }; } /** *

批量消息 * @return */ @Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandlerBatch() { return new ConsumerAwareListenerErrorHandler() { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) { ("consumerAwareErrorHandler receive : "+load().toString()); MessageHeaders headers = ders(); List topics = (ED_TOPIC, ); List partitions = (ED_PARTITION_ID, ); List offsets = (, ); return null; } }; }}

【弄nèng-Kafka】应用篇(八)——Springboot整合Kafka(异常处理器

本文发布于:2024-02-06 22:13:23,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170722880562586.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   单条   整合   配置   处理器
留言与评论(共有 0 条评论)
   
验证码:
排行榜

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23