  1. Flink
  2. FLINK-6619 Check Table API & SQL support for 1.3.0 RC01 Release
  3. FLINK-6650

Fix Non-windowed group-aggregate error when using append-table mode.

Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Table SQL / API
    • Labels: None

    Description

      When I test a non-windowed group-aggregate with stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new StreamITCase.StringSink), I get the following error:

      org.apache.flink.table.api.TableException: Table is not an append-only table. Output needs to handle update and delete changes.
      
      	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
      	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
      	at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
      	at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
      	at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
      

      The cause is that DataStreamGroupAggregate#producesUpdates always returns true:

      override def producesUpdates = true
      
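      To illustrate why this flag leads to the exception, here is a minimal, self-contained sketch of such a check. It is not Flink's actual implementation: PlanNode, Scan, GroupAggregate, and AppendCheck are hypothetical names introduced for illustration. Only the producesUpdates flag and the error message come from this report.

```scala
// Hypothetical model of the append-only check; these are not Flink's real classes.
sealed trait PlanNode {
  def inputs: Seq[PlanNode]
  // Whether this operator may update rows it has already emitted.
  def producesUpdates: Boolean = false
}

case class Scan(name: String) extends PlanNode {
  val inputs: Seq[PlanNode] = Nil
}

case class GroupAggregate(input: PlanNode) extends PlanNode {
  val inputs: Seq[PlanNode] = Seq(input)
  // Mirrors the override quoted above: the node always reports updates.
  override def producesUpdates: Boolean = true
}

object AppendCheck {
  // A plan is append-only if no operator in it produces updates.
  def isAppendOnly(node: PlanNode): Boolean =
    !node.producesUpdates && node.inputs.forall(isAppendOnly)

  // Returns Left(error message) when an append-only conversion must be rejected.
  def translateAppend(plan: PlanNode): Either[String, PlanNode] =
    if (isAppendOnly(plan)) Right(plan)
    else Left("Table is not an append-only table. " +
      "Output needs to handle update and delete changes.")
}
```

      Under this model, any plan containing GroupAggregate is rejected for append conversion, regardless of how the user intends to consume the result.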

      From the user's point of view, I think the expected behavior is the following (for example):
      Data:

      val data = List(
            (1L, 1, "Hello"),
            (2L, 2, "Hello"),
            (3L, 3, "Hello"),
            (4L, 4, "Hello"),
            (5L, 5, "Hello"),
            (6L, 6, "Hello"),
            (7L, 7, "Hello World"),
            (8L, 8, "Hello World"),
            (20L, 20, "Hello World"))
      
      • Case 1:
        TableAPI
        stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum)
          .toAppendStream[Row].addSink(new StreamITCase.StringSink)
        

        Result

        // Rows processed by StringSink:
        1
        3
        6
        10
        15
        21
        28
        36
        56
        // Final output:
        1
        3
        6
        10
        15
        21
        28
        36
        56
        
      • Case 2:
        TableAPI
        stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
        .addSink(new StreamITCase.RetractingSink)
        

        Result:

        // Rows processed by RetractingSink:
        (true,1)
        (false,1)
        (true,3)
        (false,3)
        (true,6)
        (false,6)
        (true,10)
        (false,10)
        (true,15)
        (false,15)
        (true,21)
        (false,21)
        (true,28)
        (false,28)
        (true,36)
        (false,36)
        (true,56)
        // Final output:
        56
        

        In fact, Case 1 can be expressed with an unbounded OVER window, as follows:
        TableAPI

        stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
            .window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
            .select('a.sum over 'w)
            .toAppendStream[Row].addSink(new StreamITCase.StringSink)
        

        Result

        Same as Case 1
        
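      The difference between the two cases can be reproduced without Flink. The following is a plain-Scala sketch, assuming the aggregate is a running sum over column 'a of the sample data. RunningSum and its methods are hypothetical helpers, not part of the Table API.

```scala
object RunningSum {
  // Values of column 'a from the sample data above.
  val a: List[Long] = List(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 20L)

  // Case 1 (append mode): every intermediate sum is emitted as a new row.
  def appendOutput(values: List[Long]): List[Long] =
    values.scanLeft(0L)(_ + _).tail

  // Case 2 (retract mode): each new sum is preceded by a retraction
  // (false, previousSum) of the sum emitted before it.
  def retractOutput(values: List[Long]): List[(Boolean, Long)] = {
    val sums = appendOutput(values)
    sums.zipWithIndex.flatMap { case (s, i) =>
      if (i == 0) List((true, s))
      else List((false, sums(i - 1)), (true, s))
    }
  }

  // Applies add/retract messages in order and returns the rows that remain.
  def materialize(msgs: List[(Boolean, Long)]): List[Long] =
    msgs.foldLeft(List.empty[Long]) {
      case (acc, (true, v))  => acc :+ v
      case (acc, (false, v)) => acc.diff(List(v)) // removes one occurrence
    }
}
```

      Here appendOutput(a) yields the nine rows listed for Case 1, while materialize(retractOutput(a)) leaves only the last sum, as in Case 2.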

      However, after FLINK-6649, an OVER window cannot express Case 1 with earlyFiring.

      So I still think that a non-windowed group-aggregate should not always be treated as an update table; the user should be able to decide which mode to use.

      Are there any drawbacks to this improvement? Any feedback is welcome.

            People

              Assignee: sunjincheng (sunjincheng121)
              Reporter: sunjincheng (sunjincheng121)