【Spring Boot 集成应用】Kafka的集成用法

阅读: 评论:0

【Spring Boot 集成应用】Kafka的集成用法

【Spring Boot 集成应用】Kafka的集成用法

1. Kafka集成介绍

Kafka 是一种分布式的,高吞吐率, 基于发布 / 订阅的消息系统。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,如果有大量的消息处理(10W+),对一致性要求不高, 那么可以采用KAFKA, 比如日志采集、 数据同步与归档、实时流数据处理等场景。

Spring 官方为我们提供了Spring-kafka组件, 这里讲解如何与Spring Boot集成使用,并通过代码将全面讲解Kafka普通订阅模式、死信队列配置使用、多路订阅模式以及事务控制的使用。

2. Kafka安装参考
  1. 下载安装Kafka

    这里选取的是kafka的2.3.0版本, 基于scale2.11版本;根据系统环境选择使用。
    kafka下载地址: .3.0/kafka_2.11-2.

  2. 启动命令

    • 先启动zookeeper:

      bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
      

      支持后台模式启动。

    • 启动kafka

      bin/kafka-server-start.sh -daemon  config/server.properties
      
3. Kafka工程搭建配置
  1. 创建工程spring-boot-mq-kafka

  2. MAVEN依赖

    <!-- Spring kafka 组件依赖-->
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
    </dependency>
  3. 工程基础配置

    server:port: 12616
    spring:application:name: mq-kafka# kafka 配置kafka:# 指定kafka服务地址bootstrap-servers: :9092
    

    配置服务运行的端、名称以及kafka连接地址信息。

  4. 创建启动类

    com.mirson.spring.boot.mq.kafka.startup.KafkaApplication

    @SpringBootApplication
    @ComponentScan(basePackages = {"com.mirson"})
    public class KafkaApplication {public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);}
    }
4. Kafka集成之普通订阅模式(消息自适应装配与死信队列配置)
  1. 先将所用的Topic定义为常量

    com.mirson.spring.boot.fig.KafkaConfig

        /*** 普通订阅主题*/public static final String TOPIC_COMMON = "topic_common";public static final String TOPIC_COMMON_SECOND = "topic_common_second";
    

    定义两个TOPIC, 分别为TOPIC_COMMON与TOPIC_COMMON_SECOND,便于演示不同消息的自适应装配。

  2. 定义消息监听器

    com.mirson.spring.boot.sume.KafkaCommonConsumer

    @Configuration
    @Log4j2
    public class KafkaCommonConsumer {/*** JSON 字符串作为消息转换器, 可以支持相同属性不同对象的转换* @return*/@Beanpublic RecordMessageConverter converter() {return new StringJsonMessageConverter();}/*** 死信配置* @param configurer* @param kafkaConsumerFactory* @param template* @return*/@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory,KafkaTemplate<Object, Object> template) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();figure(factory, kafkaConsumerFactory);factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 2)); // dead-letter after 2 triesreturn factory;}/*** 定义第一个监听器* @param dataMsg*/@KafkaListener(id = KafkaConfig.TOPIC_COMMON, topics = KafkaConfig.TOPIC_COMMON)public void listen(DataMsg dataMsg) {log.info("TOPIC_COMMON Received: " + dataMsg);}/*** 创建第一个TOPIC* @return*/@Beanpublic NewTopic topic() {return new NewTopic(KafkaConfig.TOPIC_COMMON, 1, (short) 1);}/*** 定义第二个监听器* @param dataMsg*/@KafkaListener(id = KafkaConfig.TOPIC_COMMON_SECOND, topics = KafkaConfig.TOPIC_COMMON_SECOND)public void listen(DataMsgSecond dataMsg) {log.info("TOPIC_COMMON_SECOND Received: " + dataMsg);if (Msg().startsWith("fail")) {throw new RuntimeException("failed");}}/*** 创建第二个TOPIC* @return*/@Beanpublic NewTopic topicSecond() {return new NewTopic(KafkaConfig.TOPIC_COMMON_SECOND, 1, (short) 1);}/*** 定义第二队列的死信队列监听器* @param in*/@KafkaListener(id = "dltGroup", topics = KafkaConfig.TOPIC_COMMON_SECOND + ".DLT")public void dltListen(String in) {log.info("Received from DLT: " + in);}/*** 创建死信队列TOPIC* @return*/@Beanpublic NewTopic dlt() {return new NewTopic(KafkaConfig.TOPIC_COMMON_SECOND + ".DLT", 1, (short) 1);}}
    • 采用JSON, 作为队列消息转换器, 可以支持具有相同属性的不同对象的转换, 自适应装配转换为可用对象。定义两个VO作演示, DataMsg对象:

      @Data
      public class DataMsg implements Serializable {/*** 消息数据*/private String msg;public DataMsg() {}public DataMsg(String msg) {this.msg = msg;}}
      

      再定义一个VO对象, 名称为DataMsgSecond, 与DataMsg属性结构一致。

    • kafkaListenerContainerFactory方法设置死信配置, 当接收端出现异常时, 如果超过指定重试次数(这里设置2次), 将会判定为消亡的信息,如果有配置DLT队列, 会发送一条消息至该队列,可以通过补偿机制处理。

    • 接下来分别定义两个队列的的主题与监听器, 通过日志打印接收数据, 便于跟踪; 第二个监听器, 当收到“fail"消息时, 会抛出异常, 用于验证消亡信息功能。

    • 最后, 配置死信队列与监听器, 队列名称规范, 后缀必须为”.DLT", 可以接收到消亡信息。

  3. 定义发送接口

    com.mirson.spring.boot.mq.kafka.provider.KafkaProviderController

        /*** 发送普通消息* @param topic* @param msg* @return*/@GetMapping(path = "/sendCommonMsg")public String sendCommonMsg(String topic, String msg) {plate.send(topic, new DataMsg(msg));return "send topic: " + topic + ", msg: " + msg;}
    

    通过参数, 来指定发送的主题以及消息数据。

  4. 功能验证

    • 验证能否正常收发消息

      调用发送接口, 指定发送主题为TOPIC_COMMON,数据内容为test

      127.0.0.1:12616/sendCommonMsg?topic=topic_common&msg=test

      消息成功发送, 查看监听器日志:

      接收到指定发送的消息。

    • 验证第二个队列,能否正常接收不同对象, 相同属性的消息:

      调用发送接口,指定发送主题为TOPIC_COMMON_SECOND,数据内容为test

      127.0.0.1:12616/sendCommonMsg?topic=topic_common_second&msg=test

      查看监听器日志:

      第二个队列, 能够根据属性自适应装配为DataMsgSecond对象, 正常打印出消息内容。

    • 死信功能验证

      需要伪造一个异常消息, 抛出异常, 触发消亡消息。

      调用接口, 发送一个fail数据:

      127.0.0.1:12616/sendCommonMsg?topic=topic_common_second&msg=fail

      查看监听日志:

      接收到了两次发送的消息, 与死信的失败配置, 保证最多接收两次是一致的;

      并且从DLT死信队列中, 接收到了发送的数据。

5. Kafka集成之多路订阅模式

多路订阅模式是指在监听器中指定多个主题队列,通过不同方法来接收处理消息数据。

  1. JavaConfig配置

    com.mirson.spring.boot.fig.KafkaMultipleConfiguration

    @Configuration
    public class KafkaMultipleConfiguration {/*** 定义消息转换器,采用StringJsonMessageConverter, 指定对象映射关系* @return*/@Beanpublic RecordMessageConverter converter() {StringJsonMessageConverter converter = new StringJsonMessageConverter();DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);typeMapper.addTrustedPackages("*");Map<String, Class<?>> mappings = new HashMap<>();mappings.put("multiApp", MultiApp.class);mappings.put("multiWeb", MultiWeb.class);typeMapper.setIdClassMapping(mappings);converter.setTypeMapper(typeMapper);return converter;}/*** 创建APP主题* @return*/@Beanpublic NewTopic app() {return new NewTopic(KafkaConfig.TOPIC_MULTI_APP, 1, (short) 1);}/*** 创建WEB主题* @return*/@Beanpublic NewTopic web() {return new NewTopic(KafkaConfig.TOPIC_MULTI_WEB, 1, (short) 1);}}
    • 实际业务当中, 传递的JSON对象比较复杂, 可能会出现内嵌多个层级的对象, 这时候就需要在StringJsonMessageConverter转换器中指定,属性与对象的映射关系,这里是消费端的配置, 发送端可以在配置文件中定义:

      spring:application:name: mq-kafka# kafka 配置kafka:# 指定kafka服务地址bootstrap-servers: :9092# 指定生产者采用JSON序列化producer:value-serializer: org.springframework.kafka.support.serializer.JsonSerializerproperties:pe.mapping: multiApp:com.mirson.spring.boot.mq.kafka.vo.MultiApp,multiWeb:com.mirson.spring.boot.mq.kafka.vo.MultiWeb
      

      这里指定了两个对象,一个是multiApp, 另一个是multiWeb。

    • 创建了两个主题, 分别是APP与WEB, 在多路订阅中, 一个处理APP类型消息, 另一个处理WEB类型消息。

    • 在使用前, 将原普通订阅模式的配置注解注释掉, 避免出现重复配置不能启动问题。

      com.mirson.spring.boot.sume.KafkaCommonConsumer

      // @Configuration
      @Log4j2
      public class KafkaCommonConsumer {...
      }
      
  2. 定义多路监听消费端

    com.mirson.spring.boot.sume.kafkaMultipleConsumer

    @Component
    @KafkaListener(id = "multiGroup", topics = {KafkaConfig.TOPIC_MULTI_APP, KafkaConfig.TOPIC_MULTI_WEB })
    @Log4j2
    public class kafkaMultipleConsumer {@KafkaHandlerpublic void multiApp(MultiApp app) {log.info("MultiApp Received: " + app);}@KafkaHandlerpublic void multiWeb(MultiWeb web) {log.info("MultiWeb Received: " + web);}@KafkaHandler(isDefault = true)public void unknown(Object object) {log.info("Received unknown: " + object);}}

    订阅了两个主题, KafkaConfig.TOPIC_MULTI_APP与KafkaConfig.TOPIC_MULTI_WEB, 定义了三个方法:

    multiApp接收处理TOPIC_MULTI_APP消息; multiWeb接收处理TOPIC_MULTI_WEB消息; unknown处理发往这两个主题, 但不符合数据类型的消息。

  3. 定义发送接口

    com.mirson.spring.boot.mq.kafka.provider.KafkaProviderController

       /*** 发送多路订阅消息* @param msg* @return*/@GetMapping(path = "/sendMultiMsg")public String sendMultiMsg(String msg) {ains("app")) {// APP主题发送消息plate.send(KafkaConfig.TOPIC_MULTI_APP, new MultiApp(msg));}else ains("web")){// WEB主题发送消息plate.send(KafkaConfig.TOPIC_MULTI_WEB, new MultiWeb(msg));}else {// 默认, 往WEB主题发送字符消息plate.send(KafkaConfig.TOPIC_MULTI_WEB, msg);}return "send msg: " + msg;}
    

    通过参数来控制, 便于测试验证。

  4. 功能验证

    • 发送一个APP消息

      调用接口:127.0.0.1:12616/sendMultiMsg?msg=app123

      监听打印日志:

      MultiApp监听器, 正常接收消息。

    • 发送一个WEB消息

      调用接口: 127.0.0.1:12616/sendMultiMsg?msg=web123

      查看监听日志:

      MultiWeb监听器, 正常接收消息。

    • 发送一个未知类型消息

      调用接口: 127.0.0.1:12616/sendMultiMsg?msg=123

      查看监听日志:

      进入了多路监听器的默认方法, 将消息内容正常打印。

6 Kafka集成之事务功能

为避免kafka事务消息配置对原工程造成的冲突, 重新创建工程演示。

  1. 创建工程spring-boot-mq-kafka-transaction

  2. 工程配置
    l

server:port: 12618
spring:application:name: mq-kafka# kafka 配置kafka:# 指定kafka服务地址bootstrap-servers: :9092# 指定生产者采用JSON序列化producer:value-serializer: org.springframework.kafka.support.serializer.JsonSerializertransaction-id-prefix: tx.# 指定接收线程数量listener:concurrency: 3# 消费者事务级别设置consumer:properties:isolation.level: read_committed
  • 采用事务消息, 要加入transaction-id-prefix, 事务ID前缀标识
  • 消费者事务隔离级别设置为只读, read_committed。
  1. 定义两个主题常量

    一个用于接收事务消息,另一个用于转发处理后的事务消息。

    com.mirson.spring.boot.fig.KafkaConfig

    public static final String TOPIC_TRANSACTION= "topic_transaction";public static final String TOPIC_TRANSACTION_FORWARD = "topic_transaction_forward";
    
  2. 定义消息监听器

    com.mirson.spring.boot.sume.KafkaTransactionConsumer

    @Configuration
    @Log4j2
    public class KafkaTransactionConsumer {/*** 设置KAFKA监听工厂, 支持批量消息处理* @param configurer* @param kafkaConsumerFactory* @param template* @return*/@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory,KafkaTemplate<Object, Object> template) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();figure(factory, kafkaConsumerFactory);factory.setBatchListener(true);factory.setMessageConverter(batchConverter());return factory;}/*** 设置消息转换器* @return*/@Beanpublic RecordMessageConverter converter() {return new StringJsonMessageConverter();}/*** 设置批量消息处理器* @return*/@Beanpublic BatchMessagingMessageConverter batchConverter() {return new BatchMessagingMessageConverter(converter());}@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 事务消息监听器* @param dataMsgs* @throws IOException*/@KafkaListener(id = "group1", topics = KafkaConfig.TOPIC_TRANSACTION)public void listenTransaction(List<DataMsg> dataMsgs) throws IOException {log.info("ListenTransaction Received: " + dataMsgs);dataMsgs.forEach(f -> kafkaTemplate.send(KafkaConfig.TOPIC_TRANSACTION_FORWARD, f.getMsg()+" commit. "));log.info("ListenTransaction message commit and forward. ");}/*** 转发消息监听器* @param in*/@KafkaListener(id = "group2", topics = KafkaConfig.TOPIC_TRANSACTION_FORWARD)public void listenTransactionForward(List<String> in) {log.info("ListenTransactionForward Received: " + in);}@Beanpublic NewTopic topic() {return new NewTopic(KafkaConfig.TOPIC_TRANSACTION, 1, (short) 1);}}
    • 设置KAFKA监听工厂, 支持事务批量消息处理。
    • 设置事务消息监听器, 这里可以结合本地事务作具体业务处理, 事务消息处理完成之后, 再转发至TOPIC_TRANSACTION_FORWARD监听队列。
  3. 定义发送接口

    com.mirson.spring.boot.ansaction.provider.KafkaProviderController

    @RestController
    public class KafkaProviderController {@Autowiredprivate KafkaTemplate<Object, Object> template;/*** 发送事务消息** @param msg*/@GetMapping(path = "/sendTransaction")public String sendTransaction(String msg) {uteInTransaction(kafkaTemplate -> {StringUtilsmaDelimitedListToSet(msg).stream().map(s -> new DataMsg(s)).forEach(data -> {kafkaTemplate.send(KafkaConfig.TOPIC_TRANSACTION, data);Msg().contains("exception")){throw new RuntimeException("occur exception!");}});return null;});return " send batch msg: " + msg;}}
    

    这里不能采用普通的发送模式, 通过executeInTransaction方法处理与发送事务消息, 为了作验证, 当接收到的消息为exception时会抛出一个异常, 验证事务消息能否正常回滚。

  4. 功能验证

    • 发送一批消息, 验证能否正常收发。

      调用接口:127.0.0.1:12618/sendTransaction?msg=a,b,c,d,e,f

      查看监听器日志:

      能够接收到这五条消息, 并且正常转发。

    • 发送一批消息, 包含一条异常消息, 验证监听器能否接收到消息。

      调用发送接口: 127.0.0.1:12618/sendTransaction?msg=a,b,c,d,e,f,exception

      出现异常, 查看监听器日志:

      所有消息全部进行回滚,监听器没有打印任何消息,验证能够正常受到KAFKA事务的控制。

7. 总结

通过Spring-Kafka组件,能够快速实现与Spring Boot的集成,一般项目当中采用的是普通订阅模式, 多路订阅模式更适合具有一定业务共性的队列消息处理, 这里还讲解了死信配置与事务功能, 这些在生产环境当中都有一定价值, 可以帮助我们实现消息的异常监控与补偿, 以及保障批量数据处理的一致性。

本文发布于:2024-01-29 05:19:41,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170647678612977.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:Spring   Boot   Kafka
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23