When Flink writes to Iceberg, IcebergStreamWriter.open() calls taskWriterFactory.create(), which creates one of four writer types (UnpartitionedDeltaWriter / UnpartitionedWriter / PartitionedDeltaWriter / RowDataPartitionedFanoutWriter). This article traces these four writers.
The IcebergStreamWriter.open() method:
public void open() {
    this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
    this.attemptId = getRuntimeContext().getAttemptNumber();
    this.taskWriterFactory.initialize(this.subTaskId, this.attemptId);
    this.writer = this.taskWriterFactory.create();
}
public TaskWriter<RowData> create() {
    Preconditions.checkNotNull(this.outputFileFactory,
        "The outputFileFactory shouldn't be null if we have invoked the initialize().");
    if (this.equalityFieldIds != null && !this.equalityFieldIds.isEmpty()) {
        return (TaskWriter)(this.spec.isUnpartitioned()
            ? new UnpartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds)
            : new PartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds));
    } else {
        return (TaskWriter)(this.spec.isUnpartitioned()
            ? new UnpartitionedWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                this.io, this.targetFileSizeBytes)
            : new RowDataTaskWriterFactory.RowDataPartitionedFanoutWriter(this.spec, this.format, this.appenderFactory,
                this.outputFileFactory, this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema));
    }
}
In create(), the writer is chosen by two conditions: whether equalityFieldIds is set, and whether the table is partitioned. With equality fields, the delta writers are used (PartitionedDeltaWriter for partitioned tables, UnpartitionedDeltaWriter otherwise); without them, the plain writers are used (RowDataPartitionedFanoutWriter for partitioned tables, UnpartitionedWriter otherwise).
As the class diagram shows, all four writer types extend the BaseTaskWriter abstract class; the difference is that the partitioned writers additionally handle partition key generation.
All of the above classes implement the TaskWriter interface:
public interface TaskWriter<T> extends Closeable {
    void write(T var1) throws IOException;

    void abort() throws IOException;

    default DataFile[] dataFiles() throws IOException {
        WriteResult result = this.complete();
        Preconditions.checkArgument(result.deleteFiles() == null || result.deleteFiles().length == 0,
            "Should have no delete files in this write result.");
        return result.dataFiles();
    }

    WriteResult complete() throws IOException;
}
BaseTaskWriter implements TaskWriter. Its inner class RollingFileWriter (which all of the writers above ultimately call through RollingFileWriter.write()) extends the inner class BaseRollingWriter. When a BaseRollingWriter is constructed it calls openCurrent(), which creates a DataWriter. BaseRollingWriter.write() then calls DataWriter.add(), which finally calls FileAppender.add() to perform the actual write.
private abstract class BaseRollingWriter<W extends Closeable> implements Closeable {
    private static final int ROWS_DIVISOR = 1000;
    private final PartitionKey partitionKey;
    private EncryptedOutputFile currentFile;
    private W currentWriter;
    private long currentRows;

    private BaseRollingWriter(PartitionKey partitionKey) {
        this.currentFile = null;
        this.currentWriter = null;
        this.currentRows = 0L;
        this.partitionKey = partitionKey;
        this.openCurrent();
    }

    abstract W newWriter(EncryptedOutputFile var1, PartitionKey var2);

    abstract long length(W var1);

    abstract void write(W var1, T var2);

    abstract void complete(W var1);

    public void write(T record) throws IOException {
        this.write(this.currentWriter, record);
        ++this.currentRows;
        if (this.shouldRollToNewFile()) {
            this.closeCurrent();
            this.openCurrent();
        }
    }

    private void openCurrent() {
        if (this.partitionKey == null) {
            // unpartitioned
            this.currentFile = BaseTaskWriter.this.fileFactory.newOutputFile();
        } else {
            // partitioned
            this.currentFile = BaseTaskWriter.this.fileFactory.newOutputFile(this.partitionKey);
        }
        this.currentWriter = this.newWriter(this.currentFile, this.partitionKey);
        this.currentRows = 0L;
    }
}
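For reference, the RollingFileWriter that the call chains below end in is roughly the following inner class of BaseTaskWriter (a simplified sketch based on the Iceberg source; some bookkeeping is omitted). It shows how newWriter() produces the DataWriter opened by openCurrent(), and how write() delegates to DataWriter.add():

protected class RollingFileWriter extends BaseRollingWriter<DataWriter<T>> {
    public RollingFileWriter(PartitionKey partitionKey) {
        super(partitionKey);                                    // triggers openCurrent() -> newWriter()
    }

    @Override
    DataWriter<T> newWriter(EncryptedOutputFile file, PartitionKey key) {
        // The appender factory builds a DataWriter on top of a FileAppender for the target file format.
        return appenderFactory.newDataWriter(file, format, key);
    }

    @Override
    long length(DataWriter<T> writer) {
        return writer.length();                                 // used by shouldRollToNewFile()
    }

    @Override
    void write(DataWriter<T> writer, T record) {
        writer.add(record);                                     // DataWriter.add() -> FileAppender.add()
    }

    @Override
    void complete(DataWriter<T> closedWriter) {
        completedDataFiles.add(closedWriter.toDataFile());      // collect the finished DataFile
    }
}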
The following are the write call chains for these classes.
UnpartitionedDeltaWriter calls its parent's write(): BaseDeltaTaskWriter.write() -> BaseEqualityDeltaWriter.write() -> BaseRollingWriter.write() -> RollingFileWriter.write() -> DataWriter.add() -> appender.add()
PartitionedDeltaWriter calls its parent's write(): BaseDeltaTaskWriter.write() -> BaseEqualityDeltaWriter.write() -> BaseRollingWriter.write() -> RollingFileWriter.write() -> DataWriter.add() -> appender.add()
Both delta writers go through BaseDeltaTaskWriter.write(); the difference is their route() implementations:
abstract BaseDeltaTaskWriter.RowDataDeltaWriter route(RowData row);

// PartitionedDeltaWriter
RowDataDeltaWriter route(RowData row) {
    this.partitionKey.partition(this.wrapper().wrap(row));
    RowDataDeltaWriter writer = this.writers.get(this.partitionKey);
    if (writer == null) {
        // copy the partition key before using it as a map key, since the reused key is mutated per row
        PartitionKey copiedKey = this.partitionKey.copy();
        writer = new RowDataDeltaWriter(copiedKey);
        this.writers.put(copiedKey, writer);
    }
    return writer;
}

// UnpartitionedDeltaWriter
RowDataDeltaWriter route(RowData row) {
    return this.writer;
}
public void write(RowData row) throws IOException {
    BaseDeltaTaskWriter.RowDataDeltaWriter writer = this.route(row);
    switch (row.getRowKind()) {
        case INSERT:
        case UPDATE_AFTER:
            writer.write(row);
            break;
        case DELETE:
        case UPDATE_BEFORE:
            writer.delete(row);
            break;
        default:
            throw new UnsupportedOperationException("Unknown row kind: " + row.getRowKind());
    }
}
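As an illustrative sketch (the values are made up), the branch taken above is driven by the RowKind carried on the RowData; Flink's GenericRowData can be used to construct such rows:

// An INSERT row goes through writer.write(row) and lands in a data file.
RowData insertRow = GenericRowData.ofKind(RowKind.INSERT, StringData.fromString("u1"));

// A DELETE (or UPDATE_BEFORE) row goes through writer.delete(row) and produces
// a delete file instead of a data file.
RowData deleteRow = GenericRowData.ofKind(RowKind.DELETE, StringData.fromString("u1"));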
UnpartitionedWriter.write() -> BaseRollingWriter.write() -> RollingFileWriter.write() -> DataWriter.add() -> appender.add()
RowDataPartitionedFanoutWriter calls its parent's write(): PartitionedFanoutWriter.write() -> BaseRollingWriter.write() -> RollingFileWriter.write() -> DataWriter.add() -> appender.add()
The difference between the non-partitioned and the partitioned writers is the partition handling done at the start of write(). PartitionedFanoutWriter handles this in its write() method, as follows:
public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
    private final Map<PartitionKey, BaseTaskWriter<T>.RollingFileWriter> writers = Maps.newHashMap();

    protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
                                      OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
    }

    protected abstract PartitionKey partition(T row);

    public void write(T row) throws IOException {
        PartitionKey partitionKey = this.partition(row);
        BaseTaskWriter<T>.RollingFileWriter writer = this.writers.get(partitionKey);
        if (writer == null) {
            // copy the partition key before using it as a map key, since partition() reuses one instance
            PartitionKey copiedKey = partitionKey.copy();
            writer = new RollingFileWriter(copiedKey);
            this.writers.put(copiedKey, writer);
        }
        writer.write(row);
    }
}
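The partition() hook above is implemented in the Flink integration by an inner class of RowDataTaskWriterFactory; a simplified sketch of that implementation looks roughly like this (constructor trimmed to the fields used here):

private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter<RowData> {
    private final PartitionKey partitionKey;
    private final RowDataWrapper rowDataWrapper;

    RowDataPartitionedFanoutWriter(PartitionSpec spec, FileFormat format,
                                   FileAppenderFactory<RowData> appenderFactory,
                                   OutputFileFactory fileFactory, FileIO io, long targetFileSize,
                                   Schema schema, RowType flinkSchema) {
        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
        this.partitionKey = new PartitionKey(spec, schema);
        this.rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
    }

    @Override
    protected PartitionKey partition(RowData row) {
        // Reuse a single PartitionKey instance; PartitionedFanoutWriter.write() copies it
        // before using it as a map key, so mutating it per row is safe.
        partitionKey.partition(rowDataWrapper.wrap(row));
        return partitionKey;
    }
}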
Note:
When writing data, a FileAppender is created for the configured file format, and the FileAppender performs the actual file writes. Three file formats are currently supported: Parquet, Avro, and ORC.
public interface FileAppender<D> extends Closeable {
    void add(D var1);

    default void addAll(Iterator<D> values) {
        while (values.hasNext()) {
            this.add(values.next());
        }
    }

    default void addAll(Iterable<D> values) {
        this.addAll(values.iterator());
    }

    Metrics metrics();

    long length();

    default List<Long> splitOffsets() {
        return null;
    }
}
The FileAppender passed in when the DataWriter is created is produced by the newAppender() method of FlinkAppenderFactory or GenericAppenderFactory:
public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
    MetricsConfig metricsConfig = MetricsConfig.fromProperties(this.config);
    try {
        switch (fileFormat) {
            case AVRO:
                return Avro.write(outputFile)
                    .schema(this.schema)
                    .createWriterFunc(DataWriter::create)
                    .setAll(this.config)
                    .overwrite()
                    .build();
            case PARQUET:
                return Parquet.write(outputFile)
                    .schema(this.schema)
                    .createWriterFunc(GenericParquetWriter::buildWriter)
                    .setAll(this.config)
                    .metricsConfig(metricsConfig)
                    .overwrite()
                    .build();
            case ORC:
                return ORC.write(outputFile)
                    .schema(this.schema)
                    .createWriterFunc(GenericOrcWriter::buildWriter)
                    .setAll(this.config)
                    .metricsConfig(metricsConfig)
                    .overwrite()
                    .build();
            default:
                throw new UnsupportedOperationException("Cannot write unknown file format: " + fileFormat);
        }
    } catch (IOException var5) {
        throw new UncheckedIOException(var5);
    }
}
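As a hedged usage sketch (the local path, field names, and field ids are illustrative assumptions, not from the original post), writing generic records through this factory to a Parquet file could look like:

import java.io.File;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.types.Types;

// Hypothetical schema and output path, only for illustration.
Schema schema = new Schema(
    Types.NestedField.required(1, "uid", Types.StringType.get()),
    Types.NestedField.optional(2, "pla", Types.StringType.get()));

try (FileAppender<Record> appender = new GenericAppenderFactory(schema)
        .newAppender(Files.localOutput(new File("/tmp/demo.parquet")), FileFormat.PARQUET)) {
    GenericRecord record = GenericRecord.create(schema);
    record.setField("uid", "u1");
    record.setField("pla", "ios");
    appender.add(record);   // FileAppender.add() does the actual write
}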
The following shared fields are used in the example code below (a combined setup sketch follows after this list):
1. rowDataStream: the Flink input stream, e.g.:
DataStream input = … ;
2. FLINK_SCHEMA: the Flink TableSchema, created as follows:
TableSchema.Builder flinkTable = TableSchema.builder();
flinkTable.field("name", DataTypes.STRING());
TableSchema FLINK_SCHEMA = flinkTable.build();
3. tableLoader: used to load the Iceberg table, created as follows:
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
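A combined sketch of how these three pieces could be wired together; the bounded collection source and the single name field are illustrative assumptions, not from the original post:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.iceberg.flink.TableLoader;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Illustrative bounded source; a real job would read from Kafka, CDC, etc.
DataStream<Row> rowDataStream = env
    .fromElements(Row.of("alice"), Row.of("bob"))
    .returns(Types.ROW(Types.STRING));

// Flink table schema matching the Iceberg table's columns
TableSchema.Builder flinkTable = TableSchema.builder();
flinkTable.field("name", DataTypes.STRING());
TableSchema FLINK_SCHEMA = flinkTable.build();

// Loads the Iceberg table from a Hadoop-catalog path
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");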
1. UnpartitionedDeltaWriter
Execution flow diagram:
If the table is unpartitioned and equalityFieldColumns() is set, UnpartitionedDeltaWriter is used. Example code:
FlinkSink.forRow(rowDataStream, FLINK_SCHEMA)
    .tableLoader(tableLoader)
    .tableSchema(FLINK_SCHEMA)
    .equalityFieldColumns(Arrays.asList("uid", "mkey", "pla", "timestamp", "dt", "hh"))
    .writeParallelism(1)
    .build();
build() processes the equalityFieldColumns list (["uid", "mkey", "pla", "timestamp", "dt", "hh"]) and produces equalityFieldIds, the list of schema field ids for those columns ([1, 2, 3, 9, 10, 11]).
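A hedged sketch of what that mapping amounts to, assuming table is the Iceberg Table loaded by tableLoader (the real build() also validates that each column exists in the schema):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.Table;

// Each equality column name is looked up in the Iceberg schema and replaced by its field id.
List<Integer> equalityFieldIds = Arrays.asList("uid", "mkey", "pla", "timestamp", "dt", "hh").stream()
    .map(name -> table.schema().findField(name).fieldId())
    .collect(Collectors.toList());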
2. PartitionedDeltaWriter
Execution flow diagram:
If the table is partitioned and equalityFieldColumns() is set, PartitionedDeltaWriter is used. Example code:
FlinkSink.forRow(rowDataStream, FLINK_SCHEMA)
    .tableLoader(tableLoader)
    .tableSchema(FLINK_SCHEMA)
    .equalityFieldColumns(Arrays.asList("uid", "mkey", "pla", "timestamp", "dt", "hh"))
    .writeParallelism(1)
    .build();
As above, build() turns the equalityFieldColumns list (["uid", "mkey", "pla", "timestamp", "dt", "hh"]) into equalityFieldIds, the list of schema field ids ([1, 2, 3, 9, 10, 11]).
The route() method shown earlier computes the partition key and selects the corresponding writer, which holds information such as the data file and delete file paths.
3. UnpartitionedWriter
Execution flow diagram:
If the table is unpartitioned and no equality field columns are set, UnpartitionedWriter is used.
Example code:
FlinkSink.forRow(rowDataStream, FLINK_SCHEMA)
    .tableLoader(tableLoader)
    .tableSchema(FLINK_SCHEMA)
    .writeParallelism(1)
    .build();
4. RowDataPartitionedFanoutWriter
Execution flow diagram:
If the table is partitioned and no equality field columns are set, RowDataPartitionedFanoutWriter is used.
Example code:
FlinkSink.forRow(rowDataStream, FLINK_SCHEMA)
    .tableLoader(tableLoader)
    .tableSchema(FLINK_SCHEMA)
    .writeParallelism(1)
    .build();
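For context, a hedged sketch (the schema, partition column, and path are illustrative assumptions) of creating a partitioned Iceberg table whose rows would then flow through this fan-out writer, with one RollingFileWriter per dt value:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

// Hypothetical schema, shown only to illustrate what "partitioned table" means here.
Schema schema = new Schema(
    Types.NestedField.required(1, "name", Types.StringType.get()),
    Types.NestedField.required(2, "dt", Types.StringType.get()));

// Identity partition on dt; each distinct dt gets its own fan-out writer.
PartitionSpec spec = PartitionSpec.builderFor(schema)
    .identity("dt")
    .build();

new HadoopTables(new Configuration())
    .create(schema, spec, "hdfs://nn:8020/warehouse/path");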
Note:
If using equalityFieldColumns() causes errors, note the following:
1. Writing to a format v1 table with equalityFieldColumns is not supported.
2. Tables written with equalityFieldColumns can only be read by Flink batch jobs.