Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-15631

Cannot use generic types as the result of an AggregateFunction in Blink planner

    XMLWordPrintableJSON

    Details

      Description

      It is not possible to use a GenericTypeInfo as the result type of an AggregateFunction in retract mode with state cleaning disabled.

      
        /**
         * Runs a [[GenericAggregateFunction]] over a three-element stream with the Blink
         * planner in streaming mode and converts the result to a retract stream.
         *
         * The test makes no assertions; it only builds the pipeline and executes it.
         */
        @Test
        def testGenericTypes(): Unit = {
          val env = StreamExecutionEnvironment.getExecutionEnvironment
          val setting = EnvironmentSettings
            .newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build()
          val tEnv = StreamTableEnvironment.create(env, setting)

          // Source table with a single column 'a holding the values 1, 2, 3.
          val source = env.fromElements(1, 2, 3).toTable(tEnv, 'a)

          // Aggregate over column 'a and expose the result as a retract stream of rows.
          val aggregate = new GenericAggregateFunction()
          val retractStream = source
            .select(aggregate('a))
            .toRetractStream[Row]

          retractStream.addSink(new TestingRetractSink).setParallelism(1)
          env.execute()
        }
      
      // Plain (non-case) class with a single mutable Int field; used as the
      // accumulator type of GenericAggregateFunction.
      class RandomClass(var i: Int)
      
      /**
       * Aggregate function whose accumulator is a [[RandomClass]] and whose result is a
       * `java.lang.Integer`.
       *
       * Both the result type and the accumulator type are declared through
       * `GenericTypeInfo` instead of a more specific type information.
       */
      class GenericAggregateFunction extends AggregateFunction[java.lang.Integer, RandomClass] {

        /** Creates a fresh accumulator holding 0. */
        override def createAccumulator(): RandomClass = new RandomClass(0)

        /** Returns the accumulator's current value, boxed. */
        override def getValue(accumulator: RandomClass): java.lang.Integer =
          Int.box(accumulator.i)

        /** Declares the result type as a generic type wrapping `java.lang.Integer`. */
        override def getResultType: TypeInformation[java.lang.Integer] =
          new GenericTypeInfo[java.lang.Integer](classOf[java.lang.Integer])

        /** Declares the accumulator type as a generic type wrapping [[RandomClass]]. */
        override def getAccumulatorType: TypeInformation[RandomClass] =
          new GenericTypeInfo[RandomClass](classOf[RandomClass])

        /** Overwrites the accumulator with the incoming value. */
        def accumulate(acc: RandomClass, value: Int): Unit =
          acc.i = value

        /** Overwrites the accumulator with the retracted value (same effect as `accumulate`). */
        def retract(acc: RandomClass, value: Int): Unit =
          acc.i = value

        /** Resets the accumulator back to 0. */
        def resetAccumulator(acc: RandomClass): Unit =
          acc.i = 0
      }
      

      The code above fails with:

      Caused by: java.lang.UnsupportedOperationException: BinaryGeneric cannot be compared
      	at org.apache.flink.table.dataformat.BinaryGeneric.equals(BinaryGeneric.java:77)
      	at GroupAggValueEqualiser$17.equalsWithoutHeader(Unknown Source)
      	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:177)
      	at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
      	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
      	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:170)
      	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
      	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
      	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
      	at java.lang.Thread.run(Thread.java:748)
      

      This is related to FLINK-13702

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                lzljs3620320 Jingsong Lee
                Reporter:
                dwysakowicz Dawid Wysakowicz
              • Votes:
                0 Vote for this issue
                Watchers:
                3 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved:

                  Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 20m
                  20m