MyDateTimeBucketer代码如下:
import org.apache.tors.fs.Clock;
import org.apache.tors.fs.bucketing.Bucketer;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.Path;import java.io.IOException;
import java.io.ObjectInputStream;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;public class MyDateTimeBucketer<T> implements Bucketer<T> {private static final long serialVersionUID = 1L;private static final String DEFAULT_FORMAT_STRING = "yyyyMMddHH";private final String formatString;private final ZoneId zoneId;private transient DateTimeFormatter dateTimeFormatter;public MyDateTimeBucketer() {this("yyyyMMddHH");}public MyDateTimeBucketer(String formatString) {this(formatString, ZoneId.systemDefault());}public MyDateTimeBucketer(ZoneId zoneId) {this("yyyyMMddHH", zoneId);}public MyDateTimeBucketer(String formatString, ZoneId zoneId) {this.formatString = (String) Preconditions.checkNotNull(formatString);Id = (ZoneId)Preconditions.checkNotNull(zoneId);this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).withZone(zoneId);}private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {in.defaultReadObject();this.dateTimeFormatter = DateTimeFormatter.ofPattern(this.formatString).Id);}public Path getBucketPath(Clock clock, Path basePath, T element) {String newDateTimeString = this.dateTimeFormatter.format(Instant.ofEpochMilli(clock.currentTimeMillis()));//这里自定义目录return new Path(basePath + "/" +"data="+newDateTimeString);}public String toString() {return "DateTimeBucketer{formatString='" + this.formatString + ''' + ", zoneId=" + Id + '}';}}
在flink里引用如下:
// Create a sink rooted at HDFS_PATH and plug in the custom bucketer so that
// output directories follow MyDateTimeBucketer's naming.
BucketingSink<String> bucketingSink = new BucketingSink<>(HDFS_PATH);
// Use the diamond operator: MyDateTimeBucketer is generic, and a raw
// `new MyDateTimeBucketer()` would produce an unchecked-conversion warning.
bucketingSink.setBucketer(new MyDateTimeBucketer<>());
本文发布于:2024-01-30 13:25:24,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170659232520322.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |