  1. Apache Hudi
  2. HUDI-4733

Flag emitDelete is inconsistent in HoodieTableSource and MergeOnReadInputFormat

Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: 1.1.0
    • Component/s: flink, flink-sql
    • Labels: None

    Description

      When reading a MOR table in Flink, we encountered an exception from the Flink runtime (as shown in image 1), which complained that the table source should not emit a retract record.

      I think the cause is the following code in HoodieTableSource:

      @Override
      public ChangelogMode getChangelogMode() {
        // when read as streaming and changelog mode is enabled, emit as FULL mode;
        // when all the changes are compacted or read as batch, emit as INSERT mode.
        return OptionsResolver.emitChangelog(conf) ? ChangelogModes.FULL : ChangelogMode.insertOnly();
      }

      private InputFormat<RowData, ?> getStreamInputFormat() {
        ...
        if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
          final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
          boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
          return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, Collections.emptyList(), emitDelete);
        }
        ...
      }
      

      With these options:

      'table.type' = 'MERGE_ON_READ'

      'read.streaming.enabled' = 'true'

      HoodieTableSource announces an INSERT-only changelog mode, but MergeOnReadInputFormat is created with emitDelete set to true and will therefore emit delete records.
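
      The mismatch can be illustrated with a small, dependency-free sketch. This is not Hudi code: the class and method names below are hypothetical stand-ins that mirror only the two conditions quoted above. It assumes, following the comment in getChangelogMode(), that FULL mode is declared only when streaming read and changelog mode are both enabled.

```java
public class EmitDeleteMismatch {

    // Hypothetical stand-in for the decision in getChangelogMode():
    // insert-only unless streaming read AND changelog mode are both enabled (assumption
    // taken from the comment in the quoted snippet).
    static boolean declaresInsertOnly(boolean streamingRead, boolean changelogEnabled) {
        return !(streamingRead && changelogEnabled);
    }

    // Hypothetical stand-in for the decision in getStreamInputFormat():
    // emitDelete depends only on the table type, not on the changelog option.
    static boolean emitsDelete(String tableType) {
        return "MERGE_ON_READ".equals(tableType);
    }

    // The source is inconsistent when it declares insert-only but may still emit deletes.
    static boolean isInconsistent(String tableType, boolean streamingRead, boolean changelogEnabled) {
        return declaresInsertOnly(streamingRead, changelogEnabled) && emitsDelete(tableType);
    }

    public static void main(String[] args) {
        // Options from this report: MOR table, streaming read, changelog mode not enabled.
        System.out.println(isInconsistent("MERGE_ON_READ", true, false)); // prints true
        // Changelog mode enabled: FULL mode is declared, so deletes are allowed.
        System.out.println(isInconsistent("MERGE_ON_READ", true, true));  // prints false
        // COPY_ON_WRITE: emitDelete is false, so insert-only is honored.
        System.out.println(isInconsistent("COPY_ON_WRITE", true, false)); // prints false
    }
}
```

      Under these assumptions, only the combination reported here (MERGE_ON_READ with streaming read and changelog mode not enabled) ends up both declaring insert-only and emitting deletes.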

      Attachments

        1. image 1.png
          179 kB
          nonggia.liang

          People

            yuzhaojing Zhaojing Yu
            nonggia nonggia.liang
            Danny Chen