Description
When reading a MOR table in Flink, we hit an exception from the Flink runtime (shown in image1) complaining that the table source should not emit a retract record.
I think the cause is in HoodieTableSource:
@Override
public ChangelogMode getChangelogMode() {
  // when read as streaming and changelog mode is enabled, emit as FULL mode;
  // when all the changes are compacted or read as batch, emit as INSERT mode.
  return OptionsResolver.emitChangelog(conf)
      ? ChangelogModes.FULL
      : ChangelogMode.insertOnly();
}
private InputFormat<RowData, ?> getStreamInputFormat() {
  ...
  if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
    final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
    boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
    return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, Collections.emptyList(), emitDelete);
  }
  ...
}
With these options:
'table.type' = 'MERGE_ON_READ'
'read.streaming.enabled' = 'true'
The HoodieTableSource announces an INSERT-only changelog mode,
but MergeOnReadInputFormat will still emit DELETE records, because emitDelete is true for any MERGE_ON_READ table.
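
To make the mismatch concrete, here is a minimal, Flink-free sketch of the two decisions quoted above. Everything in it is hypothetical and for illustration only: the class ChangelogMismatchSketch, the Kind enum (a stand-in for Flink's RowKind) and the method names are not part of Hudi or Flink, and the boolean emitChangelog parameter is assumed to stand for the result of OptionsResolver.emitChangelog(conf).

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the mismatch; none of these names exist in Hudi or Flink.
final class ChangelogMismatchSketch {

    // Stand-in for Flink's RowKind.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Mirrors getChangelogMode(): all kinds when changelog is emitted, INSERT only otherwise.
    static Set<Kind> declaredKinds(boolean emitChangelog) {
        return emitChangelog ? EnumSet.allOf(Kind.class) : EnumSet.of(Kind.INSERT);
    }

    // Mirrors the snapshot branch of getStreamInputFormat():
    // deletes are emitted for every MERGE_ON_READ table, regardless of the changelog option.
    static boolean emitsDelete(String tableType) {
        return "MERGE_ON_READ".equals(tableType);
    }

    // The source is consistent only if DELETE was declared whenever it may be emitted.
    static boolean isConsistent(String tableType, boolean emitChangelog) {
        return !emitsDelete(tableType) || declaredKinds(emitChangelog).contains(Kind.DELETE);
    }

    public static void main(String[] args) {
        // The reported case: MOR table, changelog not emitted.
        System.out.println(isConsistent("MERGE_ON_READ", false));
        // MOR table with changelog emitted.
        System.out.println(isConsistent("MERGE_ON_READ", true));
        // COW table, changelog not emitted.
        System.out.println(isConsistent("COPY_ON_WRITE", false));
    }
}
```

In this model only the first combination is inconsistent, which matches the reported case: the declared mode is INSERT-only while the input format is built with emitDelete = true.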