Iceberg Source Code Notes: The Four TaskWriter Types Used When Flink Writes to Iceberg

Contents

  • Getting started
  • Inheritance hierarchy
    • Class diagram
    • Call flow
  • Underlying calls
  • Examples and process analysis
    • Setup
    • Examples
    • Summary

Getting started

When Flink writes to Iceberg, the open() method of IcebergStreamWriter calls taskWriterFactory.create(), which constructs one of four writer types (UnpartitionedDeltaWriter / UnpartitionedWriter / PartitionedDeltaWriter / RowDataPartitionedFanoutWriter). This article traces these four writers.
The relevant code, IcebergStreamWriter.open() and RowDataTaskWriterFactory.create():

    public void open() {
        this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
        this.attemptId = getRuntimeContext().getAttemptNumber();
        this.taskWriterFactory.initialize(this.subTaskId, this.attemptId);
        this.writer = this.taskWriterFactory.create();
    }

    public TaskWriter<RowData> create() {
        Preconditions.checkNotNull(this.outputFileFactory,
            "The outputFileFactory shouldn't be null if we have invoked the initialize().");
        if (this.equalityFieldIds != null && !this.equalityFieldIds.isEmpty()) {
            return (TaskWriter) (this.spec.isUnpartitioned()
                ? new UnpartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                    this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds)
                : new PartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                    this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds));
        } else {
            return (TaskWriter) (this.spec.isUnpartitioned()
                ? new UnpartitionedWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                    this.io, this.targetFileSizeBytes)
                : new RowDataTaskWriterFactory.RowDataPartitionedFanoutWriter(this.spec, this.format,
                    this.appenderFactory, this.outputFileFactory, this.io, this.targetFileSizeBytes,
                    this.schema, this.flinkSchema));
        }
    }

Inheritance hierarchy

In create(), whether equality field columns are specified decides between the delta writers and the plain writers, and whether the table is partitioned decides between the partitioned writers (PartitionedDeltaWriter / RowDataPartitionedFanoutWriter) and the unpartitioned writers (UnpartitionedDeltaWriter / UnpartitionedWriter).

Class diagram


As the diagram shows, all of these writer types ultimately extend the abstract class BaseTaskWriter. The difference is that the partitioned writers additionally handle the logic of generating partition keys.
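In outline, the relationships shown in the diagram are roughly:

    TaskWriter<T>                                 (interface, org.apache.iceberg.io)
    └── BaseTaskWriter<T>                         (abstract, org.apache.iceberg.io)
        ├── UnpartitionedWriter<T>
        ├── PartitionedWriter<T>                  (abstract)
        ├── PartitionedFanoutWriter<T>            (abstract)
        │   └── RowDataPartitionedFanoutWriter    (org.apache.iceberg.flink.sink)
        └── BaseDeltaTaskWriter                   (abstract, org.apache.iceberg.flink.sink)
            ├── UnpartitionedDeltaWriter
            └── PartitionedDeltaWriter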
Specifically:

  1. TaskWriter, BaseTaskWriter, UnpartitionedWriter, PartitionedWriter and PartitionedFanoutWriter all live in the org.apache.iceberg.io package; the classes and interfaces there belong to the iceberg-core module and define Iceberg's common write logic;
  2. PartitionedDeltaWriter, UnpartitionedDeltaWriter and RowDataPartitionedFanoutWriter are in the org.apache.iceberg.flink.sink package and are implemented in the Flink connector module.

All of the classes above implement the TaskWriter interface:

public interface TaskWriter<T> extends Closeable {
    void write(T var1) throws IOException;

    void abort() throws IOException;

    default DataFile[] dataFiles() throws IOException {
        WriteResult result = this.complete();
        Preconditions.checkArgument(result.deleteFiles() == null || result.deleteFiles().length == 0,
            "Should have no delete files in this write result.");
        return result.dataFiles();
    }

    WriteResult complete() throws IOException;
}

BaseTaskWriter implements the TaskWriter interface. Its inner class RollingFileWriter (which all of the writers above ultimately call via RollingFileWriter.write()) extends the inner class BaseRollingWriter. When a BaseRollingWriter is created it calls openCurrent(), which creates a DataWriter. BaseRollingWriter.write() then calls DataWriter.add(), which finally calls FileAppender.add() to do the actual writing.

    private abstract class BaseRollingWriter<W extends Closeable> implements Closeable {
        private static final int ROWS_DIVISOR = 1000;
        private final PartitionKey partitionKey;
        private EncryptedOutputFile currentFile;
        private W currentWriter;
        private long currentRows;

        private BaseRollingWriter(PartitionKey partitionKey) {
            this.currentFile = null;
            this.currentWriter = null;
            this.currentRows = 0L;
            this.partitionKey = partitionKey;
            this.openCurrent();
        }

        abstract W newWriter(EncryptedOutputFile var1, PartitionKey var2);

        abstract long length(W var1);

        abstract void write(W var1, T var2);

        abstract void complete(W var1);

        public void write(T record) throws IOException {
            this.write(this.currentWriter, record);
            ++this.currentRows;
            if (this.shouldRollToNewFile()) {
                this.closeCurrent();
                this.openCurrent();
            }
        }

        private void openCurrent() {
            // newOutputFile() comes from the outer BaseTaskWriter's OutputFileFactory
            if (this.partitionKey == null) {
                this.currentFile = BaseTaskWriter.this.fileFactory.newOutputFile();
            } else {
                this.currentFile = BaseTaskWriter.this.fileFactory.newOutputFile(this.partitionKey);
            }
            this.currentWriter = this.newWriter(this.currentFile, this.partitionKey);
            this.currentRows = 0L;
        }
    }

Call flow

The call flow of a write operation for each of these classes is as follows.

  1. With equality field columns specified:

UnpartitionedDeltaWriter calls its parent class's write() method: BaseDeltaTaskWriter.write() -> BaseEqualityDeltaWriter.write() -> BaseRollingWriter.write() -> RollingFileWriter.write() -> DataWriter.add() -> appender.add()

PartitionedDeltaWriter calls its parent class's write() method: BaseDeltaTaskWriter.write() -> BaseEqualityDeltaWriter.write() -> BaseRollingWriter.write() -> RollingFileWriter.write() -> DataWriter.add() -> appender.add()

Both go through BaseDeltaTaskWriter.write(); the difference lies in the implementation of route():

abstract BaseDeltaTaskWriter.RowDataDeltaWriter route(RowData var1);

// PartitionedDeltaWriter
RowDataDeltaWriter route(RowData row) {
    this.partitionKey.partition(this.wrapper().wrap(row));
    RowDataDeltaWriter writer = this.writers.get(this.partitionKey);
    if (writer == null) {
        // copy the partition key so the map key is not mutated by later rows
        PartitionKey copiedKey = this.partitionKey.copy();
        writer = new RowDataDeltaWriter(copiedKey);
        this.writers.put(copiedKey, writer);
    }
    return writer;
}

// UnpartitionedDeltaWriter
RowDataDeltaWriter route(RowData row) {
    return this.writer;
}

// BaseDeltaTaskWriter
public void write(RowData row) throws IOException {
    BaseDeltaTaskWriter.RowDataDeltaWriter writer = this.route(row);
    switch (row.getRowKind()) {
        case INSERT:
        case UPDATE_AFTER:
            writer.write(row);
            break;
        case DELETE:
        case UPDATE_BEFORE:
            writer.delete(row);
            break;
        default:
            throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
    }
}
  2. Without equality field columns:

UnpartitionedWriter.write() -> BaseRollingWriter.write() -> RollingFileWriter.write() -> DataWriter.add() -> appender.add()

RowDataPartitionedFanoutWriter calls its parent class's write() method: PartitionedFanoutWriter.write() -> BaseRollingWriter.write() -> RollingFileWriter.write() -> DataWriter.add() -> appender.add()

For the unpartitioned and partitioned writers above, the difference lies in the partition handling done at the start of write(). PartitionedFanoutWriter does this extra work in its write() method, as follows:

public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
    private final Map<PartitionKey, BaseTaskWriter<T>.RollingFileWriter> writers = Maps.newHashMap();

    protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
                                      OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
    }

    protected abstract PartitionKey partition(T var1);

    public void write(T row) throws IOException {
        PartitionKey partitionKey = this.partition(row);
        BaseTaskWriter<T>.RollingFileWriter writer = this.writers.get(partitionKey);
        if (writer == null) {
            // copy the partition key so the map key is not mutated by later rows
            PartitionKey copiedKey = partitionKey.copy();
            writer = new RollingFileWriter(copiedKey);
            this.writers.put(copiedKey, writer);
        }
        writer.write(row);
    }
}

Additional notes:

  1. Iceberg's built-in PartitionedWriter is an abstract class; Flink implements partitioned writes with RowDataPartitionedFanoutWriter (a sketch of it follows this list).
  2. The appender invoked at the bottom is the FileAppender created by the FlinkAppenderFactory that is passed in when the TaskWriter is created from IcebergStreamWriter's open() method.
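
For reference, the Flink connector's implementation of that hook looks roughly like the sketch below (paraphrased from RowDataTaskWriterFactory; constructor arguments may differ across Iceberg versions). It only needs to supply the partition(row) method required by PartitionedFanoutWriter:

    private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter<RowData> {
        private final PartitionKey partitionKey;
        private final RowDataWrapper rowDataWrapper;

        RowDataPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<RowData> appenderFactory,
                                       OutputFileFactory fileFactory, FileIO io, long targetFileSize,
                                       Schema schema, RowType flinkSchema) {
            super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
            this.partitionKey = new PartitionKey(spec, schema);
            this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
        }

        @Override
        protected PartitionKey partition(RowData row) {
            // evaluate the partition transforms for this row; the same PartitionKey instance is reused,
            // which is why PartitionedFanoutWriter copies it before using it as a map key
            partitionKey.partition(rowDataWrapper.wrap(row));
            return partitionKey;
        }
    }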

Underlying calls

When writing data, a FileAppender is created according to the configured file format, and the FileAppender performs the actual file writes. Three file formats are currently supported: Parquet, Avro and ORC.

public interface FileAppender<D> extends Closeable {
    void add(D var1);

    default void addAll(Iterator<D> values) {
        while (values.hasNext()) {
            this.add(values.next());
        }
    }

    default void addAll(Iterable<D> values) {
        this.addAll(values.iterator());
    }

    Metrics metrics();

    long length();

    default List<Long> splitOffsets() {
        return null;
    }
}

The FileAppender passed in when a DataWriter is created is produced by the newAppender() method of FlinkAppenderFactory or GenericAppenderFactory:

public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
    MetricsConfig metricsConfig = MetricsConfig.fromProperties(this.config);
    try {
        switch (fileFormat) {
            case AVRO:
                return Avro.write(outputFile)
                    .schema(this.schema)
                    .createWriterFunc(DataWriter::create)
                    .setAll(this.config)
                    .overwrite()
                    .build();
            case PARQUET:
                return Parquet.write(outputFile)
                    .schema(this.schema)
                    .createWriterFunc(GenericParquetWriter::buildWriter)
                    .setAll(this.config)
                    .metricsConfig(metricsConfig)
                    .overwrite()
                    .build();
            case ORC:
                return ORC.write(outputFile)
                    .schema(this.schema)
                    .createWriterFunc(GenericOrcWriter::buildWriter)
                    .setAll(this.config)
                    .metricsConfig(metricsConfig)
                    .overwrite()
                    .build();
            default:
                throw new UnsupportedOperationException("Cannot write unknown file format: " + fileFormat);
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

  1. Iceberg does not write partition values directly into the data files; instead they are encoded in the directory tree. The partition directory layout is similar to Hive's, organized as key1=value1/key2=value2. Before writing data, the partitioned writer first computes the partition values with the partition transform functions and then creates the corresponding partition directory.
  2. The FileAppender writes the data to files through the corresponding file format component. The target size of written files can be tuned with the write.target-file-size-bytes table property (see the sketch after this list); the default is LONG_MAX.
  3. After all data has been written, Iceberg collects write statistics such as record_count, lower_bound, upper_bound and value_count, which are used on the driver side to generate the corresponding manifest files; finally, the executor side sends this information back to the driver.
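
For example, the rolling threshold can be changed by setting that property on the table before the job runs. A minimal sketch (assumed usage, reusing the table path from the examples below, not taken from the article):

    Table table = new HadoopTables(new Configuration()).load("hdfs://nn:8020/warehouse/path");
    table.updateProperties()
        .set("write.target-file-size-bytes", String.valueOf(128L * 1024 * 1024))  // roll to a new file at ~128 MB
        .commit();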

Examples and process analysis

Setup

The examples below share the following fields:
1. rowDataStream: the Flink input stream, e.g.:

DataStream<Row> rowDataStream = … ;

2. FLINK_SCHEMA: the Flink TableSchema, created as follows:

TableSchema.Builder flinkTable = TableSchema.builder();
flinkTable.field("name", DataTypes.STRING());
TableSchema FLINK_SCHEMA = flinkTable.build();

3. tableLoader: used to load the Iceberg table, created as follows:

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

Examples

1. UnpartitionedDeltaWriter
Execution flow diagram:

If the table is not partitioned and equalityFieldColumns() is set, UnpartitionedDeltaWriter is used. Example code:

FlinkSink.forRow(rowDataStream, FLINK_SCHEMA)
    .tableLoader(tableLoader)
    .tableSchema(FLINK_SCHEMA)
    .equalityFieldColumns(Arrays.asList("uid", "mkey", "pla", "timestamp", "dt", "hh"))
    .writeParallelism(1)
    .build();

The build() method resolves the equalityFieldColumns (["uid", "mkey", "pla", "timestamp", "dt", "hh"]) into equalityFieldIds, i.e. the list of those columns' schema field ids ([1, 2, 3, 9, 10, 11]); see the sketch below.
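
That resolution inside FlinkSink.Builder#build() looks roughly like this (a paraphrased sketch; the exact code differs slightly across Iceberg versions):

    List<Integer> equalityFieldIds = Lists.newArrayList();
    if (equalityFieldColumns != null && !equalityFieldColumns.isEmpty()) {
        for (String column : equalityFieldColumns) {
            org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
            Preconditions.checkNotNull(field,
                "Missing required equality field column '%s' in table schema %s", column, table.schema());
            // e.g. "uid" -> 1, "mkey" -> 2, ..., "hh" -> 11
            equalityFieldIds.add(field.fieldId());
        }
    }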

2. PartitionedDeltaWriter
Execution flow diagram:

If the table is partitioned and equalityFieldColumns() is set, PartitionedDeltaWriter is used. Example code:

FlinkSink.forRow(rowDataStream, FLINK_SCHEMA)
    .tableLoader(tableLoader)
    .tableSchema(FLINK_SCHEMA)
    .equalityFieldColumns(Arrays.asList("uid", "mkey", "pla", "timestamp", "dt", "hh"))
    .writeParallelism(1)
    .build();

As above, build() resolves the equalityFieldColumns (["uid", "mkey", "pla", "timestamp", "dt", "hh"]) into equalityFieldIds, the list of schema field ids ([1, 2, 3, 9, 10, 11]).

The route() method shown earlier produces the partition key partitionKey and the corresponding writer, which holds information such as the data file and delete file paths.

3. UnpartitionedWriter
Execution flow diagram:

If the table is not partitioned (and no equality field columns are set), UnpartitionedWriter is used.
Example code:

FlinkSink.forRow(rowDataStream, FLINK_SCHEMA)
    .tableLoader(tableLoader)
    .tableSchema(FLINK_SCHEMA)
    .writeParallelism(1)
    .build();

4. RowDataPartitionedFanoutWriter
Execution flow diagram:

If the table is partitioned (and no equality field columns are set), RowDataPartitionedFanoutWriter is used.
Example code:

FlinkSink.forRow(rowDataStream, FLINK_SCHEMA)
    .tableLoader(tableLoader)
    .tableSchema(FLINK_SCHEMA)
    .writeParallelism(1)
    .build();

Note:

If using the equalityFieldColumns() method throws an error, see here.

Summary

1. Writing with equalityFieldColumns is not supported for v1 tables (a v2 table is required; see the sketch below).
2. Data written with equalityFieldColumns can only be read by Flink in batch mode.
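
Since equality deletes require the Iceberg v2 table format, one way to get a writable table is to create it with the format-version property set to 2. A minimal sketch (assumed usage with a hypothetical two-column schema; whether your Iceberg release accepts the property at create time may vary):

    // create a Hadoop table with format-version=2 so that equality-delete writes are allowed
    Schema schema = new Schema(
        Types.NestedField.required(1, "uid", Types.StringType.get()),
        Types.NestedField.required(2, "mkey", Types.StringType.get()));
    Map<String, String> props = ImmutableMap.of("format-version", "2");
    Table table = new HadoopTables(new Configuration())
        .create(schema, PartitionSpec.unpartitioned(), props, "hdfs://nn:8020/warehouse/path");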
