Flink / FLINK-32426

Fix adaptive local hash agg failing when auxGrouping fields exist


Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.18.0, 1.17.1
    • Fix Version/s: 1.18.0
    • None

    Description

      In the following case, field `a` is the primary key of `AuxGroupingTable` (declared as its unique key in the table statistics), and the query groups by `a, b`. Because `a` is unique, it also uniquely determines `b`, so the planner extracts `b` as an auxGrouping field.

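      // Runs inside a Flink planner batch IT case extending BatchTestBase;
      // data2, type2 and nullablesOfData2 come from the planner's TestData.
      import org.apache.flink.table.planner.plan.stats.FlinkStatistic
      import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
      import org.apache.flink.table.planner.runtime.utils.TestData._
      import scala.collection.JavaConverters._

      // Declare `a` as the only unique key, so the planner treats `b` as an auxGrouping field.
      registerCollection(
        "AuxGroupingTable",
        data2,
        type2,
        "a, b, c, d, e",
        nullablesOfData2,
        FlinkStatistic.builder().uniqueKeys(Set(Set("a").asJava).asJava).build())
      
      checkResult(
        "SELECT a, b, COUNT(c) FROM AuxGroupingTable GROUP BY a, b",
        Seq(
          row(1, 1, 1),
          row(2, 3, 2),
          row(3, 4, 3),
          row(4, 10, 4),
          row(5, 11, 5)
        )
      )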
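A minimal Scala sketch of the auxGrouping reasoning in the description, assuming a simplified in-memory model: `Input`, `groupByAB` and `groupByAWithAuxB` are hypothetical and not part of Flink. When `a` is unique, grouping by `(a, b)` and grouping by `a` alone while carrying `b` along as an auxiliary value produce the same result, which is why the planner can move `b` out of the grouping key.

```scala
// Hypothetical sketch of why a column determined by a unique key can be an "auxGrouping" field.
object AuxGroupingSemanticsSketch {

  // Simplified input row; field names follow the issue's AuxGroupingTable (a, b, c).
  // c is non-nullable here for simplicity, so COUNT(c) equals the group size.
  final case class Input(a: Int, b: Long, c: Int)

  // Plain GROUP BY a, b with COUNT(c): both columns form the grouping key.
  def groupByAB(rows: Seq[Input]): Set[(Int, Long, Int)] =
    rows.groupBy(r => (r.a, r.b)).map { case ((a, b), g) => (a, b, g.size) }.toSet

  // GROUP BY a only, carrying b as an auxiliary value taken from the first row of the group.
  // This matches groupByAB only when a is unique, so each group holds a single b.
  def groupByAWithAuxB(rows: Seq[Input]): Set[(Int, Long, Int)] =
    rows.groupBy(_.a).map { case (a, g) => (a, g.head.b, g.size) }.toSet
}
```

For example, with rows `Input(1, 10L, 100)` and `Input(2, 20L, 200)` both functions return `Set((1, 10L, 1), (2, 20L, 1))`. Taking `g.head.b` is only safe because the uniqueness of `a` guarantees one `b` per group.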

       

      Because the generated code does not read the auxGrouping fields from the input RowData and set them into the aggBuffer, the aggBuffer RowData is missing those fields, and reading them fails with an index assertion error, as follows:

      Caused by: java.lang.AssertionError: index (1) should < 1
          at org.apache.flink.table.data.binary.BinaryRowData.assertIndexIsValid(BinaryRowData.java:127)
          at org.apache.flink.table.data.binary.BinaryRowData.isNullAt(BinaryRowData.java:156)
          at org.apache.flink.table.data.utils.JoinedRowData.isNullAt(JoinedRowData.java:113)
          at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:201)
          at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
          at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
          at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:165)
          at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:43)
          at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
          at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:141)
          at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
          at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
          at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:134)
          at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:114)
          at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:95)
          at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:48)
          at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:59)
          at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:31)
          at LocalHashAggregateWithKeys$39.processElement_split2(Unknown Source)
          at LocalHashAggregateWithKeys$39.processElement(Unknown Source)
          at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108)
          at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77)
          at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
          at BatchExecCalc$10.processElement(Unknown Source)
          at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108)
          at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77)
          at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
          at SourceConversion$6.processElement(Unknown Source)
          at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:108)
          at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:77)
          at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
          at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
          at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
          at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) 
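The failure mode can be sketched with a simplified Scala model, assuming (as the stack trace suggests) that the local aggregate emits a `JoinedRowData` of the grouping key and the aggBuffer, and that the serializer reads every field of the declared output type `(a, b, COUNT(c))`. `FixedRow`, `JoinedRow`, `serialize`, `buggyOutput` and `fixedOutput` are hypothetical stand-ins, not Flink classes; `FixedRow.get` only imitates the bounds check in `BinaryRowData.assertIndexIsValid`.

```scala
// Hypothetical, simplified model of the local agg output row layout; not Flink's actual classes.
object AuxGroupingBufferSketch {

  // Imitates BinaryRowData's bounds assertion on a fixed-arity row.
  final case class FixedRow(fields: Vector[Any]) {
    def arity: Int = fields.length
    def get(pos: Int): Any = {
      if (pos >= arity) throw new AssertionError(s"index ($pos) should < $arity")
      fields(pos)
    }
  }

  // Imitates JoinedRowData: positions [0, key.arity) go to the key, the rest to the buffer.
  final case class JoinedRow(key: FixedRow, buffer: FixedRow) {
    def get(pos: Int): Any =
      if (pos < key.arity) key.get(pos) else buffer.get(pos - key.arity)
  }

  // Declared output type of the local agg in the issue: grouping key a, aux field b, COUNT(c).
  val OutputArity: Int = 3

  // Imitates the serializer walking every field of the declared output type.
  def serialize(row: JoinedRow): Vector[Any] =
    (0 until OutputArity).map(row.get).toVector

  // Buggy path: the aggBuffer only holds the aggregate value, so the aux field b is lost.
  def buggyOutput(a: Int, b: Long, count: Long): JoinedRow =
    JoinedRow(FixedRow(Vector(a)), FixedRow(Vector(count)))

  // Fixed path: the aux field b is copied from the input into the aggBuffer before the aggregate.
  def fixedOutput(a: Int, b: Long, count: Long): JoinedRow =
    JoinedRow(FixedRow(Vector(a)), FixedRow(Vector(b, count)))
}
```

`serialize(fixedOutput(1, 1L, 1L))` returns `Vector(1, 1L, 1L)`, while `serialize(buggyOutput(1, 1L, 1L))` fails at output position 2, which maps to index 1 of a one-field buffer and raises `index (1) should < 1`, the same message as in the trace above.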


People

    • Assignee: lsy dalongliu
    • Reporter: lsy dalongliu
    • Votes: 0
    • Watchers: 2
