FLINK-6368: Grouping keys in stream aggregations have wrong order

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels: None

      Description

      FLINK-5768 removed the `AggregateUtil.createPrepareMapFunction` stage. Since then, the order of grouping keys seems to be wrong in some cases. The following test fails:

        @Test
        def testEventTimeSlidingGroupWindow(): Unit = {
          val env = StreamExecutionEnvironment.getExecutionEnvironment
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          val tEnv = TableEnvironment.getTableEnvironment(env)
          StreamITCase.testResults = mutable.MutableList()
      
          val stream = env
            .fromCollection(data)
            .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
            .map(t => (t._2, t._6))
          val table = stream.toTable(tEnv, 'int, 'string)
      
          val windowedTable = table
            .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
            .groupBy('w, 'string)
            .select('string, 'int.count, 'w.start, 'w.end)
      
          val results = windowedTable.toDataStream[Row]
          results.addSink(new StreamITCase.StringSink)
          env.execute()
        }
      

      Exception:

      Caused by: java.lang.RuntimeException: Could not forward element to next operator
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:532)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:505)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:485)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:871)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:849)
      	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
      	at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:50)
      	at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:29)
      	at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:74)
      	at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:64)
      	at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:35)
      	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:45)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:598)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:505)
      	at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:276)
      	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:119)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:940)
      	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
      	... 7 more
      Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
      

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user xccui opened a pull request:

          https://github.com/apache/flink/pull/3768

          FLINK-6368 [table] Grouping keys in stream aggregations have wrong order

          FLINK-5768 removed the `AggregateUtil.createPrepareMapFunction` stage, which mapped all grouping keys to the first n fields of a record. That is why older versions generated new, shifted grouping keys (`val groupingKeys = grouping.indices.toArray`) from the indices of the original keys. Now that the mapping stage has been removed, we should use the original grouping keys rather than the shifted ones. A test method posted in https://issues.apache.org/jira/browse/FLINK-6368 is also added to DataStreamAggregateITCase.
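
To make the index mix-up concrete, here is a minimal sketch in plain Scala with no Flink classes involved. The object name `GroupingKeyOrder`, the helpers `extractKeys` and `firstKeyAsString`, and the sample record are hypothetical and exist only for illustration. The record layout mirrors the `(int, string)` stream from the test in the issue, where grouping on 'string corresponds to field index 1.

```scala
import scala.util.Try

// Illustrative sketch only: plain Scala, not the actual Flink implementation.
object GroupingKeyOrder {
  // A record shaped like the test input: field 0 is 'int, field 1 is 'string.
  val record: Array[Any] = Array(1, "Hello")

  // Grouping on 'string: the original key index is 1.
  val grouping: Array[Int] = Array(1)

  // Shifted keys: only valid while a prepare-map stage moves the keys
  // to the first n fields of the record. Here this yields Array(0).
  val shiftedKeys: Array[Int] = grouping.indices.toArray

  // Hypothetical helper: read the fields of a record at the given key positions.
  def extractKeys(row: Array[Any], keys: Array[Int]): List[Any] =
    keys.toList.map(i => row(i))

  // Hypothetical helper: treat the first key as a String, as a consumer
  // expecting the 'string grouping key would.
  def firstKeyAsString(row: Array[Any], keys: Array[Int]): Try[String] =
    Try(extractKeys(row, keys).head.asInstanceOf[String])

  def main(args: Array[String]): Unit = {
    // Without the prepare-map stage, the shifted index selects the 'int field.
    println(extractKeys(record, shiftedKeys))
    // The original index selects the 'string field.
    println(extractKeys(record, grouping))
    // Casting the wrongly selected key fails; the correctly selected one succeeds.
    println(firstKeyAsString(record, shiftedKeys).isFailure)
    println(firstKeyAsString(record, grouping).isSuccess)
  }
}
```

With the prepare-map stage gone, the shifted index 0 points at the `int` field, which is consistent with the `ClassCastException` (Integer cast to String) in the reported stack trace. Using the original index 1 returns the `string` field.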

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/xccui/flink FLINK-6368

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3768.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3768


          commit ed03570d9bfa52e634de5a13b3425a5fd21fe6c8
          Author: xccui <xingcanc@gmail.com>
          Date: 2017-04-25T06:06:45Z

          FLINK-6368 Fix the wrong ordered keys problem


          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3768

          Thanks for the fix @xccui!

          +1 to merge

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3768

          Thanks for fixing this so quickly @xccui! I will test this fix with my branch and merge this later today.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3768

          Merging...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3768

          twalthr Timo Walther added a comment -

          Fixed in 1.3.0: 05088b4a61001b536b3d07e49c415606edf11fba

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3768

          I ran into the same problem today when adding the new test cases for UDAGG. Thanks for the fix, @xccui


            People

            • Assignee: xccui Xingcan Cui
            • Reporter: twalthr Timo Walther
            • Votes: 0
            • Watchers: 3
