FLINK-16451

Fix IndexOutOfBoundsException for DISTINCT AGG with constants

    Details

      Description

      When I use listagg with DISTINCT and a separator argument in an OVER window:

      "select listagg(distinct product, '|') over(partition by user order by proctime rows between 200 preceding and current row) as product, user from " + testTable
      

      I got the following exception:

      Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 3, Size: 3
          at java.util.ArrayList.rangeCheck(ArrayList.java:657)
          at java.util.ArrayList.get(ArrayList.java:433)
          at java.util.Collections$UnmodifiableList.get(Collections.java:1311)
          at org.apache.flink.table.types.logical.RowType.getTypeAt(RowType.java:174)
          at org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:635)
          at org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:620)
          at org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:524)
          at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
          at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
          at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
          at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234)
          at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.generateKeyExpression(DistinctAggCodeGen.scala:374)
          at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.accumulate(DistinctAggCodeGen.scala:192)
          at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
          at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
          at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
          at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
          at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
          at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genAccumulate(AggsHandlerCodeGenerator.scala:871)
          at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:329)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.createBoundedOverProcessFunction(StreamExecOverAggregate.scala:425)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:255)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56)
          at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
          at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
      

      But it works when the separator argument is omitted:

      "select listagg(distinct product) over(partition by user order by proctime rows between 200 preceding and current row) as product, user from " + testTable
      

       

      The exception is thrown in the code below:

      private def generateKeyExpression(
          ctx: CodeGeneratorContext,
          generator: ExprCodeGenerator): GeneratedExpression = {
        val fieldExprs = distinctInfo.argIndexes.map(generateInputAccess(
          ctx,
          generator.input1Type,
          generator.input1Term,
          _,
          nullableInput = false,
          deepCopy = inputFieldCopy))
      

       

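      Here distinctInfo.argIndexes is [1, 3]. Index 1 refers to a field of the input row, but index 3 is a logical index that stands for the constant '|'. The input row has only three fields (indexes 0 to 2, matching "Index: 3, Size: 3" in the exception), so no input access should be generated for index 3.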
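
The distinction between input-field indexes and constant indexes can be illustrated with a small, self-contained Java sketch. It is not Flink code and not the actual fix: the class `DistinctKeySketch`, the method `resolveKeyExpressions`, and the field order (`user`, `product`, `proctime`) are hypothetical, and it assumes that constants are numbered directly after the last input field, which is consistent with the index 3 reported above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Flink.
public class DistinctKeySketch {

    /**
     * Resolves each argument index to a textual expression.
     * Indexes below inputFields.size() are treated as input field accesses.
     * Higher indexes are assumed to point into the constants list,
     * offset by the number of input fields.
     */
    public static List<String> resolveKeyExpressions(
            int[] argIndexes, List<String> inputFields, List<String> constants) {
        List<String> exprs = new ArrayList<>();
        for (int index : argIndexes) {
            if (index < inputFields.size()) {
                // A real field of the input row: safe to generate an input access.
                exprs.add("input." + inputFields.get(index));
            } else {
                // A logical index: look up the constant instead of the input row.
                exprs.add("const:" + constants.get(index - inputFields.size()));
            }
        }
        return exprs;
    }

    public static void main(String[] args) {
        // Assumed layout: three input fields, one constant (the separator).
        List<String> inputFields = Arrays.asList("user", "product", "proctime");
        List<String> constants = Arrays.asList("'|'");
        int[] argIndexes = {1, 3};

        // Treating every index as an input field fails for index 3.
        try {
            inputFields.get(3);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("Naive input access failed for index 3");
        }

        // Separating fields from constants resolves both arguments.
        System.out.println(resolveKeyExpressions(argIndexes, inputFields, constants));
    }
}
```

In this sketch, index 1 resolves to the `product` field and index 3 resolves to the constant separator, so no lookup beyond the end of the input row is attempted.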

              People

              • Assignee: Jark Wu (jark)
              • Reporter: jinfeng (hackergin)