ZooKeeper is used here as the storage location: the offset of each partition is updated there in real time. After the job goes down, the saved offsets can be pulled again on restart, so consumption resumes right where it left off. True exactly-once consumption, however, has to be implemented together with the business logic, for example via transactions.
Enough talk; here is a small demo I put together locally.
<dependency><groupId>com.wehotel</groupId><artifactId>commons</artifactId><version>1.0-SNAPSHOT</version></dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId></dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId></dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId></dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table_2.11</artifactId></dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId></dependency>
<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId></dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId></dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.3</version></dependency>
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.4.2</version></dependency>
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency>
<!-- Logging template based on scala-logging and logback. logback is a more efficient logging framework than log4j and is gradually replacing it; the following dependencies wire up logging: -->
<dependency><groupId>com.typesafe.scala-logging</groupId><artifactId>scala-logging_2.11</artifactId><version>3.7.2</version></dependency>
<dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.3</version></dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.58</version></dependency>
<!-- org.apache.curator/curator-framework -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.12.0</version></dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.12.0</version></dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>2.12.0</version></dependency>
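The scala-logging and logback dependencies above cover the logging side; logback logs to the console by default and can be customized with a logback.xml on the classpath. As a quick sanity check that logging is wired up, here is a minimal sketch written against the SLF4J API that logback-classic brings in (the class name and log message are made up purely for illustration, they are not part of the demo):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingDemo {
    // logback-classic (declared above) provides the SLF4J binding at runtime
    private static final Logger LOG = LoggerFactory.getLogger(LoggingDemo.class);

    public static void main(String[] args) {
        LOG.info("logging is wired up, starting the offset demo");
    }
}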
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.zookeeper.CreateMode;
import scala.Tuple2;

import java.io.IOException;
import java.util.*;
/**
 * <Description>
 *
 * @author enjian
 * @version 1.0
 * @taskId:
 * @createDate 2020/04/01 10:10
 * @see ""
 */
public class KafkaSource {

    private String topic;
    private String message;
    private Integer partition;
    private Long offset;

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Integer getPartition() {
        return partition;
    }

    public void setPartition(Integer partition) {
        this.partition = partition;
    }

    public Long getOffset() {
        return offset;
    }

    public void setOffset(Long offset) {
        this.offset = offset;
    }
}
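Since fastjson is already in the POM, a convenient way to eyeball deserialized records is to dump this POJO as JSON. A minimal sketch (the field values below are made up purely for illustration):

import com.alibaba.fastjson.JSON;

public class KafkaSourceJsonDemo {
    public static void main(String[] args) {
        KafkaSource source = new KafkaSource();
        source.setTopic("tripGoodsCA00001");
        source.setPartition(0);
        source.setOffset(160231645L);
        source.setMessage("hello");
        // fastjson serializes via the getters, producing something like
        // {"message":"hello","offset":160231645,"partition":0,"topic":"tripGoodsCA00001"}
        System.out.println(JSON.toJSONString(source));
    }
}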
/**
 * <Description>
 *
 * @author enjian
 * @version 1.0
 * @taskId:
 * @createDate 2020/03/31 11:35
 * @see ""
 */
public class ZKUtils {

    // Session timeout
    private static final int SESSION_TIMEOUT = 30 * 1000;
    // Connection timeout
    private static final int CONNECTION_TIMEOUT = 3 * 1000;
    // ZooKeeper connection string
    private static final String CONNECT_ADDR = "xxxxx";
    // Curator client instance
    private static CuratorFramework client;

    public static void main(String[] args) throws Exception {
        // 1. Retry policy: initial wait 1s, retry up to 10 times
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        // 2. Build the connection through the factory
        client = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_ADDR)
                .connectionTimeoutMs(CONNECTION_TIMEOUT)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(retryPolicy)
                // .namespace("super") // namespace
                .build();
        // 3. Start the connection
        client.start();

        StreamExecutionEnvironment flinkEnv = changeEnv();
        Tuple2<HashMap<KafkaTopicPartition, Long>, Boolean> kafkaOffset = getFromOffsets("tripGoodsCA00001", "test");
        FlinkKafkaConsumer011<KafkaSource> ds = createKafkaSource("tripGoodsCA00001", "test");
        FlinkKafkaConsumerBase flinkKafkaConsumerBase = ds.setStartFromLatest();
        // If offsets were found in ZooKeeper, start from them instead
        if (kafkaOffset._2()) {
            System.out.println("----------------------zookeeper manager offsets-----------------------------------");
            Map<KafkaTopicPartition, Long> specificStartOffsets = kafkaOffset._1();
            flinkKafkaConsumerBase = ds.setStartFromSpecificOffsets(specificStartOffsets);
        }
        DataStreamSource<KafkaSource> tetsds = flinkEnv.addSource(flinkKafkaConsumerBase);
        tetsds.print();
        flinkEnv.execute("test");
    }

    public static void ensureZKExists(String zkTopicPath) {
        try {
            if (client.checkExists().forPath(zkTopicPath) == null) {
                // Nothing has been written to this path yet: create it, parents included
                client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT) // node type
                        .forPath(zkTopicPath);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void storeOffsets(HashMap<String, Long> offsetRange, String topic, String group) {
        String zkTopicPath = String.format("/offsets/%s/%s", topic, group);
        Iterator<Map.Entry<String, Long>> setoffsetrange = offsetRange.entrySet().iterator();
        while (setoffsetrange.hasNext()) {
            Map.Entry<String, Long> offsethas = setoffsetrange.next();
            // one child node per partition
            String path = String.format("%s/%s", zkTopicPath, offsethas.getKey());
            ensureZKExists(path);
            try {
                client.setData().forPath(path, (offsethas.getValue() + "").getBytes());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Read the Kafka offsets stored in ZooKeeper.
     * @param topic
     * @param group
     * @return Tuple2<HashMap<KafkaTopicPartition, Long>, Boolean>
     */
    public static Tuple2<HashMap<KafkaTopicPartition, Long>, Boolean> getFromOffsets(String topic, String group) {
        Tuple2<HashMap<KafkaTopicPartition, Long>, Boolean> returnTuple2 = null;
        // layout: /offsets/topic/group/partition/
        String zkTopicPath = String.format("/offsets/%s/%s", topic, group);
        ensureZKExists(zkTopicPath);
        HashMap<KafkaTopicPartition, Long> offsets = new HashMap<KafkaTopicPartition, Long>();
        try {
            List<String> partitions = client.getChildren().forPath(zkTopicPath);
            for (String partition : partitions) {
                Long offset = Long.valueOf(new String(client.getData().forPath(String.format("%s/%s", zkTopicPath, partition))));
                KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, Integer.valueOf(partition));
                offsets.put(topicPartition, offset);
            }
            if (offsets.isEmpty()) {
                return new Tuple2<>(offsets, false);
            } else {
                return new Tuple2<>(offsets, true);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return returnTuple2;
    }

    public static Properties getKafkaProperties(String groupId) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "");
        // properties.setProperty("zookeeper.connect", getStrValue(new Constants().KAFKA_ZOOKEEPER_LIST));
        properties.setProperty("group.id", groupId);
        return properties;
    }

    public static Properties getProduceKafkaProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public static StreamExecutionEnvironment changeEnv() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().enableForceKryo();
        // Enable checkpointing
        // env.setStateBackend(new RocksDBStateBackend(chkPointPath));
        env.enableCheckpointing(600000);
        // Set the consistency level to exactly-once
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint timeout: checkpoints that run longer are discarded (default is 10 minutes)
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Keep the job running if a checkpoint fails
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        // Allow at most one concurrent checkpoint
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        return env;
    }

    /**
     * Create the Kafka source.
     * @param topic
     * @param groupid
     * @return
     */
    public static FlinkKafkaConsumer011<KafkaSource> createKafkaSource(String topic, String groupid) {
        // Kafka consumer configuration
        FlinkKafkaConsumer011<KafkaSource> dataStream = new FlinkKafkaConsumer011<KafkaSource>(topic, new KeyedDeserializationSchema<KafkaSource>() {
            @Override
            public TypeInformation<KafkaSource> getProducedType() {
                return TypeInformation.of(new TypeHint<KafkaSource>() {});
            }

            @Override
            public KafkaSource deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
                KafkaSource kafkasource = new KafkaSource();
                kafkasource.setTopic(topic);
                kafkasource.setMessage(new String(message));
                kafkasource.setPartition(partition);
                kafkasource.setOffset(offset);
                // write the partition/offset pair to ZooKeeper as each record is read
                HashMap<String, Long> partitionAndOffset = new HashMap<>();
                partitionAndOffset.put(String.valueOf(partition), offset);
                storeOffsets(partitionAndOffset, topic, groupid);
                return kafkasource;
            }

            @Override
            public boolean isEndOfStream(KafkaSource s) {
                return false;
            }
        }, getKafkaProperties(groupid));
        // Start from the latest record by default
        dataStream.setStartFromLatest();
        // Commit offsets on checkpoints
        // dataStream.setCommitOffsetsOnCheckpoints(true);
        return dataStream;
    }
}
[zk: localhost:2181(CONNECTED) 36] get /offsets/tripGoodsCA00001/test/0
160231645
cZxid = 0x300049e7e
ctime = Wed Apr 01 11:34:51 CST 2020
mZxid = 0x30004ac85
mtime = Wed Apr 01 13:41:05 CST 2020
pZxid = 0x300049e7e
cversion = 0
dataVersion = 449
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
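The same check can also be done programmatically with Curator instead of zkCli. Below is a minimal sketch that reads an offset back from the /offsets/{topic}/{group}/{partition} path written by storeOffsets(); the connection string is a placeholder, just like CONNECT_ADDR above:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class OffsetCheck {
    public static void main(String[] args) throws Exception {
        // placeholder connection string, same as CONNECT_ADDR in ZKUtils
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "xxxxx", new ExponentialBackoffRetry(1000, 3));
        client.start();
        // same znode layout that storeOffsets() writes
        byte[] data = client.getData().forPath("/offsets/tripGoodsCA00001/test/0");
        System.out.println("stored offset = " + new String(data));
        client.close();
    }
}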
7. Summary
When I started using Flink on a project, exploring a brand-new framework on my own inevitably meant hitting walls. I kept going back to the official documentation to look up the state handling I needed and debugged things step by step, and gradually every problem got solved. This work lays the groundwork for my Flink anti-crawler project, since the old Storm-based approach was getting outdated. After a month of exploring, optimizing, and iterating, I finished both a Scala version and a Java version. Why two versions? Partly to improve my coding skills and to explore both implementation styles, and I was also fortunate enough to try embedding Flink into a microservice application.
I don't blog much and am always learning with an open mind. See you around!