使用java操作kafka订阅新增Topic踩坑

阅读: 评论:0

使用java操作kafka订阅新增Topic踩坑

使用java操作kafka订阅新增Topic踩坑

我面对的场景是:需要动态的订阅新增的topic。即在程序运行的时候轮询查看kafka服务器查看是否有符合规则的新的topic,如果有的话,把它加入到订阅列表。

  1. 第一种是使用spring-kafka自带注解 @KafkaListener的topicPattern参数,这里传入合适的正则表达式。然后程序本身就会去轮询(网上有代码使用,说测试的轮询时间是2min,但是我测试是5min,目前还没找到修改时间的地方)。这个正则跟普通要求的正则不太一样,比如不能使用* 开头。
//匹配 以test开头的topic,注意*前面需要连接一个"."
@KafkaListener(topicPattern = "test.*")

轮询的日志不会在控制台打印出来(我控制台使用的是info等级看不到,有可能在debug里面),但是如果检查到了新增的topic重新订阅就能在控制台看到。
我的代码

@Component
public class KafkaConsumer {// 消费监听,单条消息监听@KafkaListener(topics = {"yu-test"})public void onMessage(ConsumerRecord<String, String> record) {// 消费的哪个topic、partition的消息,打印出消息内容System.out.println("简单消费:" + pic() + "-" + record.partition() + "-" + record.value());}@KafkaListener(topicPattern = "test.*")public void onMessage3(ConsumerRecord<String, String> record) {// 消费的哪个topic、partition的消息,打印出消息内容System.out.println("简单消费:" + pic() + "-" + record.partition() + "-" + record.value());}//批量监听 pe=batch//@KafkaListener(topics = {"yu-test"})public void onMessage(List<ConsumerRecord<String, String>> records) {for (ConsumerRecord<String, String> record : records) {onMessage(record);}}
}

一些关键的日志信息

04-02 10:43:51.326 INFO  [o.a.sumer.KafkaConsumer] - [Consumer clientId=consumer-defaultGroupId-1, groupId=defaultGroupId] Subscribed to pattern: 'test.*'
04-02 10:43:51.342 INFO  [o.a.sumer.KafkaConsumer] - [Consumer clientId=consumer-defaultGroupId-2, groupId=defaultGroupId] Subscribed to topic(s): yu-test
....等等日志
04-02 10:43:51.407 INFO  [o.s.k.l.KafkaMessageListenerContainer] - defaultGroupId: partitions assigned: [testyu3-0, testyu2-0, testwe3-0, testwe2-0, testwe1-0, testwe-0, testyu-0, testwe9-0, testwe8-0, testwe7-0]
04-02 10:43:51.407 INFO  [o.s.k.l.KafkaMessageListenerContainer] - defaultGroupId: partitions assigned: [yu-test-0]

这个时候大概是有10个topic,时间是10:43:51
然后我创建了一个新的topic

D:Kafkakafka_2.12-2.8.0>.binwindowskafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test888
Created topic test888.

过去了漫长的时间后,终于出现了新的日志信息(时间跨度大概是五分钟 10:48:51)

04-02 10:43:51.407 INFO  [o.s.k.l.KafkaMessageListenerContainer] - defaultGroupId: partitions assigned: [testyu3-0, testyu2-0, testwe3-0, testwe2-0, testwe1-0, testwe-0, testyu-0, testwe9-0, testwe8-0, testwe7-0]
04-02 10:43:51.407 INFO  [o.s.k.l.KafkaMessageListenerContainer] - defaultGroupId: partitions assigned: [yu-test-0]
04-02 10:48:51.643 INFO  [o.i.ConsumerCoordinator] - [Consumer clientId=consumer-defaultGroupId-1, groupId=defaultGroupId] Requesting to re-join the group and trigger rebalance since the subscription has changed from [testwe7, testyu3, testyu2, testwe3, testwe2, testwe1, testwe, testyu, testwe9, testwe8] to [testwe7, test888, testyu3, testyu2, testwe3, testwe2, testwe1, testwe, testyu, testwe9, testwe8]
04-02 10:48:51.814 INFO  [o.i.ConsumerCoordinator] - [Consumer clientId=consumer-defaultGroupId-1, groupId=defaultGroupId] Requesting to re-join the group and trigger rebalance since the subscription has changed from [testwe7, testyu3, testyu2, testwe3, testwe2, testwe1, testwe, testyu, testwe9, testwe8] to [testwe7, test888, testyu3, testyu2, testwe3, testwe2, testwe1, testwe, testyu, testwe9, testwe8]
04-02 10:48:52.158 INFO  [o.i.ConsumerCoordinator] - [Consumer clientId=consumer-defaultGroupId-1, groupId=defaultGroupId] Requesting to re-join the group and trigger rebalance since the subscription has changed from [testwe7, testyu3, testyu2, testwe3, testwe2, testwe1, testwe, testyu, testwe9, testwe8] to [testwe7, test888, testyu3, testyu2, testwe3, testwe2, testwe1, testwe, testyu, testwe9, testwe8]
04-02 10:48:52.159 INFO  [o.i.ConsumerCoordinator] - [Consumer clientId=consumer-defaultGroupId-1, groupId=defaultGroupId] Revoke previously assigned partitions testyu3-0, testyu2-0, testwe3-0, testwe2-0, testwe1-0, testwe-0, testyu-0, testwe9-0, testwe8-0, testwe7-0
....大概是这样的一串日志,触发重平衡,平衡到到达最新的
04-02 10:48:52.685 INFO  [o.s.k.l.KafkaMessageListenerContainer] - defaultGroupId: partitions assigned: [test888-0, testyu3-0, testyu2-0, testwe3-0, testwe2-0, testwe1-0, testwe-0, testyu-0, testwe9-0, testwe8-0, testwe7-0]

如果五分钟内一直没有topic新增的话,控制台就很干净…
这个使用的注意事项是:
1. 消息在控制台未刷新重订阅的日志前发送,则收不到原来的消息。后面你发送的消息可以收到
2. 轮询的时间大概是五分钟,有点长,目前也没找到调整的办法
3. 正则规则跟我们常见的不一样需要注意
4. 源码里面标注,topicPattern和topics和topicPartitions是互斥的,即不能同时使用。

  1. 第二种方法就是自己手动定义KafkaConsumer,自己启动一个定时器进行轮询,如果查到有新的topic,然后手动重新调用订阅方法进行新的订阅。这里也有几个注意事项(我自己踩得坑)
    我的代码
@Testpublic void kafka动态订阅() throws ExecutionException, InterruptedException {String myTestTopic = "test";String bootstrapConfig = "localhost:9092";Properties cProp = new Properties();cProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapConfig);cProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafkamon.serialization.StringDeserializer");cProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafkamon.serialization.StringDeserializer");cProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");cProp.setProperty(CommonClientConfigs.GROUP_ID_CONFIG, groupId);//第一次订阅KafkaConsumer<String, String> consumer = consume(cProp, Patternpile(myTestTopic + ".*"));//轮询查看是否有新topicScheduledExecutorService threadPool = wScheduledThreadPool(1);//定时方法有scheduleAtFixedRate()和scheduleWithFixedDelay,区别大概是一个会加上程序执行的时间计算下一个周期,一个是不管程序执行时间有多长,周期时间是固定的//我这里程序运行时,第一次五秒开始执行,后面每10秒轮询一次threadPool.scheduleAtFixedRate(() -> {log.info("开始循环...");//获取当前订阅的topic信息Set<String> oldTopics = consumer.subscription();//手动获取kafka里面最新的topic,过滤出符合要求的topicSet<String> groupTopics = consumer.listTopics().keySet().stream().filter(e -> e.startsWith(myTestTopic)).Set());//进行比较,如果出现了新的topic则进行重新订阅。for (String topic : groupTopics) {if (!ains(topic)) {consumer.subscribe(Patternpile(myTestTopic + ".*"));//使用poll方法,也代表开始消费,即真正加入到订阅consumer.poll(Duration.ofSeconds(1));//我在这里卡住过...下面将卡住的原因log.info("我又卡住了?");break;}}}, 5, 10, TimeUnit.SECONDS);while (true) {Thread.yield();}}public KafkaConsumer<String, String> consume(Properties cProp, Pattern pattern) {KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(cProp);//订阅初始化kafkaConsumer.subscribe(pattern);//如果不使用poll方法的话,是不会建立消费的连接的。所以我这里调用了一下poll方法kafkaConsumer.poll(Duration.ZERO);//手动提交一下消费的长度,不然如果程序意外终止,还是判断你并没有开始消费kafkaConsumermitSync();return kafkaConsumer;}

我碰到的问题是第二次使用consumer.subscribe();方法会卡住
这个订阅方法有两种类型的重载

	//入参是Pattern类型@Overridepublic void subscribe(Pattern pattern) {subscribe(pattern, new NoOpConsumerRebalanceListener());}//入参是集合类型@Overridepublic void subscribe(Collection<String> topics) {subscribe(topics, new NoOpConsumerRebalanceListener());}//这里我们把assign放到一起@Overridepublic void assign(Collection<TopicPartition> partitions) {acquireAndEnsureOpen();try {if (partitions == null) {throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");} else if (partitions.isEmpty()) {this.unsubscribe();} else {for (TopicPartition tp : partitions) {String topic = (tp != null) ? tp.topic() : null;if (topic == null || im().isEmpty())throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");}fetcher.clearBufferedDataForUnassignedPartitions(partitions);// make sure the offsets of topic partitions the consumer is unsubscribing from// are committed since there will be no following rebalanceif (coordinator != dinator.maybeAutoCommitOffsetsAsync(time.milliseconds());log.info("Subscribed to partition(s): {}", Utils.join(partitions, ", "));if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))questUpdateForNewTopics();}} finally {release();}}

这三种方法其实就是对应了注解里面的三个参数,即他们是互斥的。笔者卡住的原因就是第一次订阅的时候调用的Pattern的入参,第二次订阅的时候把最新查到的groupTopics传入了进去,调用的Collection的入参,所以程序一直卡住。
assign方法是自己指定消费topic的哪个分区,所以也是相斥的(严谨一点,我没有做测试,但是看源码里面的注释应该是这样。有兴趣的可以自己尝试)

注意事项是:
使用subscribe()方法,应该上一次poll之后,因为poll是拉取信息,如果你一边拉取消息,一边重新订阅,在还没有重平衡的情况下,程序也会卡住(我在生产环境的时候试验过)。

本文发布于:2024-02-04 10:14:45,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170704875954673.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:操作   java   kafka   踩坑   Topic
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23