Flink: writing to real-time HDFS partitions with a custom Bucketer

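The code of MyDateTimeBucketer is as follows. It keeps the structure of Flink's built-in DateTimeBucketer and only changes the directory name returned by getBucketPath: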

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * Bucketer that puts each record into a directory named "data=<time>", where <time> is the
 * current processing time formatted with formatString (default "yyyyMMddHH", i.e. one directory per hour).
 */
public class MyDateTimeBucketer<T> implements Bucketer<T> {

    private static final long serialVersionUID = 1L;

    private static final String DEFAULT_FORMAT_STRING = "yyyyMMddHH";

    private final String formatString;

    private final ZoneId zoneId;

    // DateTimeFormatter is not Serializable, so it is transient and rebuilt in readObject().
    private transient DateTimeFormatter dateTimeFormatter;

    public MyDateTimeBucketer() {
        this(DEFAULT_FORMAT_STRING);
    }

    public MyDateTimeBucketer(String formatString) {
        this(formatString, ZoneId.systemDefault());
    }

    public MyDateTimeBucketer(ZoneId zoneId) {
        this(DEFAULT_FORMAT_STRING, zoneId);
    }

    public MyDateTimeBucketer(String formatString, ZoneId zoneId) {
        this.formatString = Preconditions.checkNotNull(formatString);
        this.zoneId = Preconditions.checkNotNull(zoneId);
        this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(this.zoneId);
    }

    // Restores formatString and zoneId, then recreates the transient formatter.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(this.zoneId);
    }

    @Override
    public Path getBucketPath(Clock clock, Path basePath, T element) {
        String newDateTimeString = this.dateTimeFormatter.format(Instant.ofEpochMilli(clock.currentTimeMillis()));
        // Customize the directory name here, e.g. <basePath>/data=2024013013
        return new Path(basePath + "/" + "data=" + newDateTimeString);
    }

    @Override
    public String toString() {
        return "MyDateTimeBucketer{formatString='" + this.formatString + '\'' + ", zoneId=" + this.zoneId + '}';
    }
}
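The formatter field is transient because java.time.format.DateTimeFormatter does not implement Serializable, while Flink ships the sink (and therefore its Bucketer) to the TaskManagers using Java serialization. The stdlib-only sketch below reproduces the transient-field-plus-readObject() pattern in isolation so it can run without Flink or Hadoop on the classpath. FormatterHolder and roundTrip are hypothetical names for this illustration, and the in-memory round trip only approximates what happens when Flink deploys a job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical stand-in for MyDateTimeBucketer: same fields, same readObject() trick.
public class FormatterHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String formatString;
    private final ZoneId zoneId;
    // Not Serializable, so it is skipped by serialization and rebuilt in readObject().
    private transient DateTimeFormatter formatter;

    public FormatterHolder(String formatString, ZoneId zoneId) {
        this.formatString = formatString;
        this.zoneId = zoneId;
        this.formatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    }

    // Invoked by Java serialization after the non-transient fields have been restored.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.formatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    }

    public String format(long epochMillis) {
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    // Serializes and deserializes the object in memory.
    public static FormatterHolder roundTrip(FormatterHolder original)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (FormatterHolder) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FormatterHolder copy = roundTrip(new FormatterHolder("yyyyMMddHH", ZoneId.of("UTC")));
        System.out.println(copy.format(0L)); // prints 1970010100
    }
}
```

Without the readObject() hook, the deserialized copy would have a null formatter, and the first call that uses it (getBucketPath in MyDateTimeBucketer) would throw a NullPointerException.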

To use it in a Flink job, register it on the BucketingSink:

BucketingSink<String> bucketingSink = new BucketingSink<>(HDFS_PATH);
bucketingSink.setBucketer(new MyDateTimeBucketer());
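With this bucketer, every record goes into a directory named after the processing time at which it is written; the record itself (the element argument) is ignored. The stdlib-only sketch below mirrors the string that getBucketPath builds so the resulting layout under HDFS_PATH can be previewed. The helper bucketDir, the base path hdfs://namenode:8020/flink/output and the Asia/Shanghai time zone are assumptions chosen for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class BucketPathDemo {
    // Hypothetical helper reproducing basePath + "/" + "data=" + formatted time,
    // using the default "yyyyMMddHH" pattern.
    static String bucketDir(String basePath, long epochMillis, ZoneId zoneId) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHH").withZone(zoneId);
        return basePath + "/" + "data=" + formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // Hypothetical value standing in for HDFS_PATH.
        String hdfsPath = "hdfs://namenode:8020/flink/output";
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        long t = LocalDateTime.of(2024, 1, 30, 13, 25).atZone(shanghai).toInstant().toEpochMilli();

        System.out.println(bucketDir(hdfsPath, t, shanghai));
        // hdfs://namenode:8020/flink/output/data=2024013013
        System.out.println(bucketDir(hdfsPath, t + 3_600_000L, shanghai));
        // hdfs://namenode:8020/flink/output/data=2024013014 (one hour later, new directory)
        System.out.println(bucketDir(hdfsPath, t, ZoneOffset.UTC));
        // hdfs://namenode:8020/flink/output/data=2024013005 (same instant, UTC clock)
    }
}
```

Because the no-argument constructor uses ZoneId.systemDefault(), the hour in the directory name follows the time zone of each TaskManager JVM; passing an explicit zone, e.g. new MyDateTimeBucketer(ZoneId.of("Asia/Shanghai")), keeps the partition names consistent across machines.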

Published: 2024-01-30 13:25:24

Article link: https://www.4u4v.net/it/170659232520322.html


Tags: custom, partition, real-time, flink, hdfs