Kafka 是一种分布式的,高吞吐率, 基于发布 / 订阅的消息系统。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,如果有大量的消息处理(10W+),对一致性要求不高, 那么可以采用KAFKA, 比如日志采集、 数据同步与归档、实时流数据处理等场景。
Spring 官方为我们提供了Spring-kafka组件, 这里讲解如何与Spring Boot集成使用,并通过代码将全面讲解Kafka普通订阅模式、死信队列配置使用、多路订阅模式以及事务控制的使用。
下载安装Kafka
这里选取的是kafka的2.3.0版本, 基于scale2.11版本;根据系统环境选择使用。
kafka下载地址: .3.0/kafka_2.11-2.
启动命令
先启动zookeeper:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
支持后台模式启动。
启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
创建工程spring-boot-mq-kafka
MAVEN依赖
<!-- Spring kafka 组件依赖-->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
工程基础配置
server:port: 12616
spring:application:name: mq-kafka# kafka 配置kafka:# 指定kafka服务地址bootstrap-servers: :9092
配置服务运行的端、名称以及kafka连接地址信息。
创建启动类
com.mirson.spring.boot.mq.kafka.startup.KafkaApplication
@SpringBootApplication
@ComponentScan(basePackages = {"com.mirson"})
public class KafkaApplication {public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);}
}
先将所用的Topic定义为常量
com.mirson.spring.boot.fig.KafkaConfig
/*** 普通订阅主题*/public static final String TOPIC_COMMON = "topic_common";public static final String TOPIC_COMMON_SECOND = "topic_common_second";
定义两个TOPIC, 分别为TOPIC_COMMON与TOPIC_COMMON_SECOND,便于演示不同消息的自适应装配。
定义消息监听器
com.mirson.spring.boot.sume.KafkaCommonConsumer
@Configuration
@Log4j2
public class KafkaCommonConsumer {/*** JSON 字符串作为消息转换器, 可以支持相同属性不同对象的转换* @return*/@Beanpublic RecordMessageConverter converter() {return new StringJsonMessageConverter();}/*** 死信配置* @param configurer* @param kafkaConsumerFactory* @param template* @return*/@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory,KafkaTemplate<Object, Object> template) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();figure(factory, kafkaConsumerFactory);factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 2)); // dead-letter after 2 triesreturn factory;}/*** 定义第一个监听器* @param dataMsg*/@KafkaListener(id = KafkaConfig.TOPIC_COMMON, topics = KafkaConfig.TOPIC_COMMON)public void listen(DataMsg dataMsg) {log.info("TOPIC_COMMON Received: " + dataMsg);}/*** 创建第一个TOPIC* @return*/@Beanpublic NewTopic topic() {return new NewTopic(KafkaConfig.TOPIC_COMMON, 1, (short) 1);}/*** 定义第二个监听器* @param dataMsg*/@KafkaListener(id = KafkaConfig.TOPIC_COMMON_SECOND, topics = KafkaConfig.TOPIC_COMMON_SECOND)public void listen(DataMsgSecond dataMsg) {log.info("TOPIC_COMMON_SECOND Received: " + dataMsg);if (Msg().startsWith("fail")) {throw new RuntimeException("failed");}}/*** 创建第二个TOPIC* @return*/@Beanpublic NewTopic topicSecond() {return new NewTopic(KafkaConfig.TOPIC_COMMON_SECOND, 1, (short) 1);}/*** 定义第二队列的死信队列监听器* @param in*/@KafkaListener(id = "dltGroup", topics = KafkaConfig.TOPIC_COMMON_SECOND + ".DLT")public void dltListen(String in) {log.info("Received from DLT: " + in);}/*** 创建死信队列TOPIC* @return*/@Beanpublic NewTopic dlt() {return new NewTopic(KafkaConfig.TOPIC_COMMON_SECOND + ".DLT", 1, (short) 1);}}
采用JSON, 作为队列消息转换器, 可以支持具有相同属性的不同对象的转换, 自适应装配转换为可用对象。定义两个VO作演示, DataMsg对象:
@Data
public class DataMsg implements Serializable {/*** 消息数据*/private String msg;public DataMsg() {}public DataMsg(String msg) {this.msg = msg;}}
再定义一个VO对象, 名称为DataMsgSecond, 与DataMsg属性结构一致。
kafkaListenerContainerFactory方法设置死信配置, 当接收端出现异常时, 如果超过指定重试次数(这里设置2次), 将会判定为消亡的信息,如果有配置DLT队列, 会发送一条消息至该队列,可以通过补偿机制处理。
接下来分别定义两个队列的的主题与监听器, 通过日志打印接收数据, 便于跟踪; 第二个监听器, 当收到“fail"消息时, 会抛出异常, 用于验证消亡信息功能。
最后, 配置死信队列与监听器, 队列名称规范, 后缀必须为”.DLT", 可以接收到消亡信息。
定义发送接口
com.mirson.spring.boot.mq.kafka.provider.KafkaProviderController
/*** 发送普通消息* @param topic* @param msg* @return*/@GetMapping(path = "/sendCommonMsg")public String sendCommonMsg(String topic, String msg) {plate.send(topic, new DataMsg(msg));return "send topic: " + topic + ", msg: " + msg;}
通过参数, 来指定发送的主题以及消息数据。
功能验证
验证能否正常收发消息
调用发送接口, 指定发送主题为TOPIC_COMMON,数据内容为test
127.0.0.1:12616/sendCommonMsg?topic=topic_common&msg=test
消息成功发送, 查看监听器日志:
接收到指定发送的消息。
验证第二个队列,能否正常接收不同对象, 相同属性的消息:
调用发送接口,指定发送主题为TOPIC_COMMON_SECOND,数据内容为test
127.0.0.1:12616/sendCommonMsg?topic=topic_common_second&msg=test
查看监听器日志:
第二个队列, 能够根据属性自适应装配为DataMsgSecond对象, 正常打印出消息内容。
死信功能验证
需要伪造一个异常消息, 抛出异常, 触发消亡消息。
调用接口, 发送一个fail数据:
127.0.0.1:12616/sendCommonMsg?topic=topic_common_second&msg=fail
查看监听日志:
接收到了两次发送的消息, 与死信的失败配置, 保证最多接收两次是一致的;
并且从DLT死信队列中, 接收到了发送的数据。
多路订阅模式是指在监听器中指定多个主题队列,通过不同方法来接收处理消息数据。
JavaConfig配置
com.mirson.spring.boot.fig.KafkaMultipleConfiguration
@Configuration
public class KafkaMultipleConfiguration {/*** 定义消息转换器,采用StringJsonMessageConverter, 指定对象映射关系* @return*/@Beanpublic RecordMessageConverter converter() {StringJsonMessageConverter converter = new StringJsonMessageConverter();DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);typeMapper.addTrustedPackages("*");Map<String, Class<?>> mappings = new HashMap<>();mappings.put("multiApp", MultiApp.class);mappings.put("multiWeb", MultiWeb.class);typeMapper.setIdClassMapping(mappings);converter.setTypeMapper(typeMapper);return converter;}/*** 创建APP主题* @return*/@Beanpublic NewTopic app() {return new NewTopic(KafkaConfig.TOPIC_MULTI_APP, 1, (short) 1);}/*** 创建WEB主题* @return*/@Beanpublic NewTopic web() {return new NewTopic(KafkaConfig.TOPIC_MULTI_WEB, 1, (short) 1);}}
实际业务当中, 传递的JSON对象比较复杂, 可能会出现内嵌多个层级的对象, 这时候就需要在StringJsonMessageConverter转换器中指定,属性与对象的映射关系,这里是消费端的配置, 发送端可以在配置文件中定义:
spring:application:name: mq-kafka# kafka 配置kafka:# 指定kafka服务地址bootstrap-servers: :9092# 指定生产者采用JSON序列化producer:value-serializer: org.springframework.kafka.support.serializer.JsonSerializerproperties:pe.mapping: multiApp:com.mirson.spring.boot.mq.kafka.vo.MultiApp,multiWeb:com.mirson.spring.boot.mq.kafka.vo.MultiWeb
这里指定了两个对象,一个是multiApp, 另一个是multiWeb。
创建了两个主题, 分别是APP与WEB, 在多路订阅中, 一个处理APP类型消息, 另一个处理WEB类型消息。
在使用前, 将原普通订阅模式的配置注解注释掉, 避免出现重复配置不能启动问题。
com.mirson.spring.boot.sume.KafkaCommonConsumer
// @Configuration
@Log4j2
public class KafkaCommonConsumer {...
}
定义多路监听消费端
com.mirson.spring.boot.sume.kafkaMultipleConsumer
@Component
@KafkaListener(id = "multiGroup", topics = {KafkaConfig.TOPIC_MULTI_APP, KafkaConfig.TOPIC_MULTI_WEB })
@Log4j2
public class kafkaMultipleConsumer {@KafkaHandlerpublic void multiApp(MultiApp app) {log.info("MultiApp Received: " + app);}@KafkaHandlerpublic void multiWeb(MultiWeb web) {log.info("MultiWeb Received: " + web);}@KafkaHandler(isDefault = true)public void unknown(Object object) {log.info("Received unknown: " + object);}}
订阅了两个主题, KafkaConfig.TOPIC_MULTI_APP与KafkaConfig.TOPIC_MULTI_WEB, 定义了三个方法:
multiApp接收处理TOPIC_MULTI_APP消息; multiWeb接收处理TOPIC_MULTI_WEB消息; unknown处理发往这两个主题, 但不符合数据类型的消息。
定义发送接口
com.mirson.spring.boot.mq.kafka.provider.KafkaProviderController
/*** 发送多路订阅消息* @param msg* @return*/@GetMapping(path = "/sendMultiMsg")public String sendMultiMsg(String msg) {ains("app")) {// APP主题发送消息plate.send(KafkaConfig.TOPIC_MULTI_APP, new MultiApp(msg));}else ains("web")){// WEB主题发送消息plate.send(KafkaConfig.TOPIC_MULTI_WEB, new MultiWeb(msg));}else {// 默认, 往WEB主题发送字符消息plate.send(KafkaConfig.TOPIC_MULTI_WEB, msg);}return "send msg: " + msg;}
通过参数来控制, 便于测试验证。
功能验证
发送一个APP消息
调用接口:127.0.0.1:12616/sendMultiMsg?msg=app123
监听打印日志:
MultiApp监听器, 正常接收消息。
发送一个WEB消息
调用接口: 127.0.0.1:12616/sendMultiMsg?msg=web123
查看监听日志:
MultiWeb监听器, 正常接收消息。
发送一个未知类型消息
调用接口: 127.0.0.1:12616/sendMultiMsg?msg=123
查看监听日志:
进入了多路监听器的默认方法, 将消息内容正常打印。
为避免kafka事务消息配置对原工程造成的冲突, 重新创建工程演示。
创建工程spring-boot-mq-kafka-transaction
工程配置
l
server:port: 12618
spring:application:name: mq-kafka# kafka 配置kafka:# 指定kafka服务地址bootstrap-servers: :9092# 指定生产者采用JSON序列化producer:value-serializer: org.springframework.kafka.support.serializer.JsonSerializertransaction-id-prefix: tx.# 指定接收线程数量listener:concurrency: 3# 消费者事务级别设置consumer:properties:isolation.level: read_committed
定义两个主题常量
一个用于接收事务消息,另一个用于转发处理后的事务消息。
com.mirson.spring.boot.fig.KafkaConfig
public static final String TOPIC_TRANSACTION= "topic_transaction";public static final String TOPIC_TRANSACTION_FORWARD = "topic_transaction_forward";
定义消息监听器
com.mirson.spring.boot.sume.KafkaTransactionConsumer
@Configuration
@Log4j2
public class KafkaTransactionConsumer {/*** 设置KAFKA监听工厂, 支持批量消息处理* @param configurer* @param kafkaConsumerFactory* @param template* @return*/@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory,KafkaTemplate<Object, Object> template) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();figure(factory, kafkaConsumerFactory);factory.setBatchListener(true);factory.setMessageConverter(batchConverter());return factory;}/*** 设置消息转换器* @return*/@Beanpublic RecordMessageConverter converter() {return new StringJsonMessageConverter();}/*** 设置批量消息处理器* @return*/@Beanpublic BatchMessagingMessageConverter batchConverter() {return new BatchMessagingMessageConverter(converter());}@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 事务消息监听器* @param dataMsgs* @throws IOException*/@KafkaListener(id = "group1", topics = KafkaConfig.TOPIC_TRANSACTION)public void listenTransaction(List<DataMsg> dataMsgs) throws IOException {log.info("ListenTransaction Received: " + dataMsgs);dataMsgs.forEach(f -> kafkaTemplate.send(KafkaConfig.TOPIC_TRANSACTION_FORWARD, f.getMsg()+" commit. "));log.info("ListenTransaction message commit and forward. ");}/*** 转发消息监听器* @param in*/@KafkaListener(id = "group2", topics = KafkaConfig.TOPIC_TRANSACTION_FORWARD)public void listenTransactionForward(List<String> in) {log.info("ListenTransactionForward Received: " + in);}@Beanpublic NewTopic topic() {return new NewTopic(KafkaConfig.TOPIC_TRANSACTION, 1, (short) 1);}}
定义发送接口
com.mirson.spring.boot.ansaction.provider.KafkaProviderController
@RestController
public class KafkaProviderController {@Autowiredprivate KafkaTemplate<Object, Object> template;/*** 发送事务消息** @param msg*/@GetMapping(path = "/sendTransaction")public String sendTransaction(String msg) {uteInTransaction(kafkaTemplate -> {StringUtilsmaDelimitedListToSet(msg).stream().map(s -> new DataMsg(s)).forEach(data -> {kafkaTemplate.send(KafkaConfig.TOPIC_TRANSACTION, data);Msg().contains("exception")){throw new RuntimeException("occur exception!");}});return null;});return " send batch msg: " + msg;}}
这里不能采用普通的发送模式, 通过executeInTransaction方法处理与发送事务消息, 为了作验证, 当接收到的消息为exception时会抛出一个异常, 验证事务消息能否正常回滚。
功能验证
发送一批消息, 验证能否正常收发。
调用接口:127.0.0.1:12618/sendTransaction?msg=a,b,c,d,e,f
查看监听器日志:
能够接收到这五条消息, 并且正常转发。
发送一批消息, 包含一条异常消息, 验证监听器能否接收到消息。
调用发送接口: 127.0.0.1:12618/sendTransaction?msg=a,b,c,d,e,f,exception
出现异常, 查看监听器日志:
所有消息全部进行回滚,监听器没有打印任何消息,验证能够正常受到KAFKA事务的控制。
通过Spring-Kafka组件,能够快速实现与Spring Boot的集成,一般项目当中采用的是普通订阅模式, 多路订阅模式更适合具有一定业务共性的队列消息处理, 这里还讲解了死信配置与事务功能, 这些在生产环境当中都有一定价值, 可以帮助我们实现消息的异常监控与补偿, 以及保障批量数据处理的一致性。
本文发布于:2024-01-29 05:19:41,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170647678612977.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |