Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      When I tested a non-windowed group aggregate with stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new StreamITCase.StringSink), I got the following error:

      org.apache.flink.table.api.TableException: Table is not an append-only table. Output needs to handle update and delete changes.
      
      	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
      	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
      	at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
      	at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
      	at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
      

      The cause is that DataStreamGroupAggregate#producesUpdates always returns true:

      override def producesUpdates = true
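A simplified model shows why this flag alone triggers the exception. The sketch below is hypothetical: `PlanNode`, `GroupAggregateNode`, `ConfigurableGroupAggregateNode`, and `checkAppendOnly` are illustrative names, not Flink APIs. It assumes only what the stack trace and the line above suggest, namely that `toAppendStream` translation rejects a plan in which some operator reports `producesUpdates = true`.

```scala
// Hypothetical, simplified model; these are NOT Flink's classes or methods.
object AppendOnlyCheckSketch {

  // Stand-in for a physical plan node that may emit update/delete changes.
  trait PlanNode {
    def producesUpdates: Boolean
  }

  // Current behaviour: the non-windowed group aggregate always reports updates.
  final class GroupAggregateNode extends PlanNode {
    override def producesUpdates: Boolean = true
  }

  // Hypothetical variant along the lines proposed in this issue:
  // the user decides whether the aggregate produces updates.
  final class ConfigurableGroupAggregateNode(emitUpdates: Boolean) extends PlanNode {
    override def producesUpdates: Boolean = emitUpdates
  }

  // Stand-in for the append-only check hit during toAppendStream translation:
  // any node that produces updates makes the whole plan non-append-only.
  def checkAppendOnly(plan: Seq[PlanNode]): Either[String, Unit] =
    if (plan.exists(_.producesUpdates))
      Left("Table is not an append-only table. Output needs to handle update and delete changes.")
    else Right(())
}
```

With the hard-coded `true`, the check fails for every non-windowed group aggregate regardless of what the user intends; only a node whose flag can be set to `false` passes.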
      

      From the user's point of view, I think the expected results are as follows (for example):
      Data:

      val data = List(
            (1L, 1, "Hello"),
            (2L, 2, "Hello"),
            (3L, 3, "Hello"),
            (4L, 4, "Hello"),
            (5L, 5, "Hello"),
            (6L, 6, "Hello"),
            (7L, 7, "Hello World"),
            (8L, 8, "Hello World"),
            (20L, 20, "Hello World"))
      
      • Case1:
        TableAPI
         stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum)
         .toAppendStream[Row].addSink(new StreamITCase.StringSink)
        

        Result

        // Data processed by StringSink:
        1
        3
        6
        10
        15
        21
        28
        36
        56
        // Final output:
        1
        3
        6
        10
        15
        21
        28
        36
        56
        
      • Case 2:
        TableAPI
        stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
        .addSink(new StreamITCase.RetractingSink)
        

        Result:

        // Data processed by RetractingSink:
        (true,1)
        (false,1)
        (true,3)
        (false,3)
        (true,6)
        (false,6)
        (true,10)
        (false,10)
        (true,15)
        (false,15)
        (true,21)
        (false,21)
        (true,28)
        (false,28)
        (true,36)
        (false,36)
        (true,56)
        // Final output:
        56
        

        In fact, #Case1 can be expressed with an unbounded OVER window, as follows:
        TableAPI

        stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
            .window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
            .select('a.sum over 'w)
            .toAppendStream[Row].addSink(new StreamITCase.StringSink)
        

        Result

        Same as #Case1
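The three results above can be reproduced with a small simulation. This is a hypothetical model, not Flink code: it uses plain Scala collections, takes only field 'a of the example data in arrival order (standing in for 'proctime), and models the sinks as simple lists. It illustrates why #Case1 and the OVER query yield the same append-only rows, while Case 2 retracts every intermediate sum.

```scala
// Illustrative simulation only: plain Scala collections, no Flink APIs.
object NonWindowedSumSketch {

  // Values of field 'a from the example data, in arrival (proctime) order.
  val a: List[Long] = List(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 20L)

  // Running SUM(a) after each input row.
  def runningSums(values: List[Long]): List[Long] =
    values.scanLeft(0L)(_ + _).tail

  // Case 1 (append): every new sum is emitted as a new row and never revised.
  def appendOutput(values: List[Long]): List[Long] =
    runningSums(values)

  // Case 2 (retract): the previous sum is retracted, (false, old),
  // before the new sum is inserted, (true, new).
  def retractMessages(values: List[Long]): List[(Boolean, Long)] =
    runningSums(values) match {
      case Nil => Nil
      case sums @ (first :: rest) =>
        (true, first) :: sums.zip(rest).flatMap { case (prev, next) =>
          List((false, prev), (true, next))
        }
    }

  // Stand-in for a retracting sink: add a row on true, remove one matching
  // row on false; what remains is the current result table.
  def applyToSink(messages: List[(Boolean, Long)]): List[Long] =
    messages.foldLeft(List.empty[Long]) {
      case (rows, (true, v)) => rows :+ v
      case (rows, (false, v)) =>
        val idx = rows.indexOf(v)
        if (idx >= 0) rows.take(idx) ++ rows.drop(idx + 1) else rows
    }

  // Unbounded OVER window (UNBOUNDED PRECEDING to CURRENT ROW), computed from
  // its definition: the output for row i aggregates rows 0..i, so each input
  // row yields exactly one output row that is never updated afterwards.
  def overSums(values: List[Long]): List[Long] =
    values.indices.toList.map(i => values.take(i + 1).sum)
}
```

`appendOutput` and `overSums` both return the nine running sums listed for #Case1, while applying `applyToSink` to the 17 messages from `retractMessages` leaves only `56`, matching the final output of Case 2. `overSums` is deliberately written from the window definition (quadratic) rather than incrementally, to make the per-row semantics explicit.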
        

      However, after FLINK-6649, an OVER window cannot express #Case1 with earlyFiring.

      So I still think that a non-windowed group aggregate should not always produce an updating table; users should be able to decide which mode to use.

      Are there any drawbacks to this improvement? Any feedback is welcome.

              People

              • Assignee:
                sunjincheng121 sunjincheng
                Reporter:
                sunjincheng121 sunjincheng
              • Votes:
                0
                Watchers:
                4
