Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      When testing a non-windowed group aggregate with stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new StreamITCase.StringSink), I got the following error:

      org.apache.flink.table.api.TableException: Table is not an append-only table. Output needs to handle update and delete changes.
      
      	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
      	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
      	at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
      	at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
      	at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
      

      The cause is that DataStreamGroupAggregate#producesUpdates is defined as follows:

      override def producesUpdates = true
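      To see why this one flag makes the whole query non-append-only, here is a simplified, hypothetical model in Scala. `PlanNode`, `Scan`, `Project`, `GroupAggregate` and `isAppendOnly` are illustrative assumptions, not Flink's planner classes; the sketch only assumes that a plan counts as append-only when no operator in it reports producesUpdates = true.

```scala
// Hypothetical, simplified plan model; NOT Flink's planner classes.
sealed trait PlanNode {
  def inputs: Seq[PlanNode]
  // Operators that never change already-emitted results keep the default.
  def producesUpdates: Boolean = false
}

final case class Scan(table: String) extends PlanNode {
  val inputs: Seq[PlanNode] = Nil
}

final case class Project(input: PlanNode) extends PlanNode {
  val inputs: Seq[PlanNode] = Seq(input)
}

// Plays the role of DataStreamGroupAggregate: a non-windowed aggregate
// overwrites its previous result, so it produces updates.
final case class GroupAggregate(input: PlanNode) extends PlanNode {
  val inputs: Seq[PlanNode] = Seq(input)
  override def producesUpdates: Boolean = true
}

object AppendOnlyCheck {
  // A plan is append-only only if no node anywhere in it produces updates.
  def isAppendOnly(node: PlanNode): Boolean =
    !node.producesUpdates && node.inputs.forall(isAppendOnly)

  def main(args: Array[String]): Unit = {
    // Projection only: prints true
    println(isAppendOnly(Project(Scan("stream"))))
    // Non-windowed aggregate such as select('a.sum): prints false
    println(isAppendOnly(Project(GroupAggregate(Scan("stream")))))
  }
}
```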
      

      I think that, from the user's point of view, what users want is the following (for example):
      Data:

      val data = List(
            (1L, 1, "Hello"),
            (2L, 2, "Hello"),
            (3L, 3, "Hello"),
            (4L, 4, "Hello"),
            (5L, 5, "Hello"),
            (6L, 6, "Hello"),
            (7L, 7, "Hello World"),
            (8L, 8, "Hello World"),
            (20L, 20, "Hello World"))
      
      • Case 1:
        TableAPI
         stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum)
         .toAppendStream[Row].addSink(new StreamITCase.StringSink)
        

        Result

        // Data processed by StringSink:
        1
        3
        6
        10
        15
        21
        28
        36
        56
        // Final output data:
        1
        3
        6
        10
        15
        21
        28
        36
        56
        
      • Case 2:
        TableAPI
        stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
        .addSink(new StreamITCase.RetractingSink)
        

        Result:

        // Data processed by RetractingSink:
        (true,1)
        (false,1)
        (true,3)
        (false,3)
        (true,6)
        (false,6)
        (true,10)
        (false,10)
        (true,15)
        (false,15)
        (true,21)
        (false,21)
        (true,28)
        (false,28)
        (true,36)
        (false,36)
        (true,56)
        // Final output data:
        56
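        The retraction messages listed above can be reproduced with a short Scala sketch. It assumes, as the listing shows, that each new SUM result is preceded by a retraction of the previous one; `RetractChangelog` and `changelog` are hypothetical names, and the sketch does not use Flink.

```scala
object RetractChangelog {
  // Retract messages for a running SUM over the values of column 'a:
  // the first result is only inserted; every later result is preceded by
  // a retraction (false) of the previous result.
  def changelog(values: Seq[Long]): Seq[(Boolean, Long)] = {
    val sums: Seq[Long] = values.scanLeft(0L)(_ + _).tail
    val previous: Seq[Option[Long]] = None +: sums.dropRight(1).map(Some(_))
    sums.zip(previous).flatMap {
      case (sum, None)       => Seq(true -> sum)
      case (sum, Some(prev)) => Seq(false -> prev, true -> sum)
    }
  }

  def main(args: Array[String]): Unit = {
    val a = Seq(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 20L)
    // Prints (true,1), (false,1), (true,3), ... , (true,56), one per line
    changelog(a).foreach(println)
  }
}
```

        Nine inserts and eight retractions give the 17 messages shown in the listing.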
        

        In fact, Case 1 can be expressed with an unbounded OVER window, as follows:
        TableAPI

        stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
            .window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
            .select('a.sum over 'w)
            .toAppendStream[Row].addSink(new StreamITCase.StringSink)
        

        Result

        Same as Case 1
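        An unbounded OVER window is append-only because every input row yields exactly one output row, and nothing already emitted is changed later. The hypothetical Scala sketch below (`UnboundedOverSum` is an illustrative name, not a Flink class) models SUM over all preceding rows in arrival order for the data above.

```scala
// Hypothetical operator for SUM('a) OVER (ORDER BY proctime, all preceding
// rows): it keeps one accumulator and emits one row per input row.
final class UnboundedOverSum {
  private var acc: Long = 0L

  // Called once per input row in arrival (processing-time) order. The value
  // returned is appended to the output and never updated afterwards.
  def processElement(a: Long): Long = {
    acc += a
    acc
  }
}

object UnboundedOverSumDemo {
  val data: List[(Long, Int, String)] = List(
    (1L, 1, "Hello"), (2L, 2, "Hello"), (3L, 3, "Hello"),
    (4L, 4, "Hello"), (5L, 5, "Hello"), (6L, 6, "Hello"),
    (7L, 7, "Hello World"), (8L, 8, "Hello World"), (20L, 20, "Hello World"))

  def run(): List[Long] = {
    val op = new UnboundedOverSum
    data.map { case (a, _, _) => op.processElement(a) }
  }

  // Prints 1, 3, 6, 10, 15, 21, 28, 36, 56, one per line
  def main(args: Array[String]): Unit = run().foreach(println)
}
```

        The difference from Case 2 is that each output row belongs to one input row, so there is no earlier result to retract.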
        

      But after FLINK-6649, OVER cannot express Case 1 with early firing.

      So I still think that a non-windowed group aggregate should not always produce an update table; the user should be able to decide which mode to use.

      Is there any drawback to this improvement? Any feedback is welcome.


          Activity

          fhueske Fabian Hueske added a comment -

          Thanks for starting this discussion. I think the current behavior is correct.
          The result of a query should only depend on the operators of a query (select('a.sum)) and not on the method to convert the result into a stream. The choice of toAppendStream() and toRetractStream() should only affect the representation of the result but not the result itself.

          Moreover, the result of a query on a streaming table must be identical to the result of a batch query on the same input data. Hence, the result shown in case 1 would not be correct: it should be a single row with the single value 56, as in case 2, and not multiple rows.
          The correct query to specify the desired result of case 1 is the OVER query shown in the description. The result of an OVER query can be converted into an append stream (unless we add / enable support for late data).

          To summarize: The drawback of implementing this would be incorrect semantics (batch != streaming) and unclear behavior (result of a query depends on the conversion method).
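          The batch-equals-streaming argument can be checked with a small Scala sketch: materializing the Case 2 retract stream (adding a row on true, removing one matching row on false) leaves exactly the single row a batch SUM produces. `BatchEqualsStream` and `materialize` are hypothetical helpers for illustration only.

```scala
object BatchEqualsStream {
  // Column 'a of the input data from the description.
  val a: Seq[Long] = Seq(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 20L)

  // Batch semantics of select('a.sum): one row holding the total.
  val batchResult: List[Long] = List(a.sum)

  // The Case 2 retract stream, copied from the description.
  val case2Changelog: Seq[(Boolean, Long)] = Seq(
    (true, 1L), (false, 1L), (true, 3L), (false, 3L), (true, 6L), (false, 6L),
    (true, 10L), (false, 10L), (true, 15L), (false, 15L), (true, 21L),
    (false, 21L), (true, 28L), (false, 28L), (true, 36L), (false, 36L),
    (true, 56L))

  // Applies retract messages to a materialized table: true adds a row,
  // false removes one earlier occurrence of the same row.
  def materialize(changelog: Seq[(Boolean, Long)]): List[Long] =
    changelog.foldLeft(List.empty[Long]) { case (table, (isAdd, row)) =>
      if (isAdd) table :+ row
      else {
        val idx = table.indexOf(row)
        require(idx >= 0, s"retraction of a row that was never added: $row")
        table.patch(idx, Nil, 1)
      }
    }

  def main(args: Array[String]): Unit = {
    println(materialize(case2Changelog))                // List(56)
    println(materialize(case2Changelog) == batchResult) // true
    // Treating every intermediate result as an appended row (Case 1)
    // yields nine rows, which does not match the batch result.
    println(case2Changelog.collect { case (true, r) => r }.toList == batchResult) // false
  }
}
```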

          sunjincheng121 sunjincheng added a comment -

          Hi Fabian Hueske, thanks a lot for your explanation. I completely agree that the semantics of a non-windowed aggregate should be an update table, and that we should not call `toAppendStream[Row]` on a non-windowed aggregate.
          The error message needs a little improvement, so this JIRA will only improve the error message.

          Thanks,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/3958

          FLINK-6650[table] Improve the error message for toAppendStream

          The PR makes three small changes:
          1. Improve the error message for toAppendStream.
          2. Fix an incorrect variable name.
          3. Add Javadoc for the key parameter of a method.

          • [x] General
            • The pull request references the related JIRA issue ("FLINK-6650[table] Improve the error message for toAppendStream")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [ ] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLINK-6650-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3958.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3958



          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3958#discussion_r117724493

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
          @@ -629,8 +629,7 @@ abstract class StreamTableEnvironment(
               // if no change flags are requested, verify table is an insert-only (append-only) table.
               if (!withChangeFlag && !isAppendOnly(logicalPlan)) {
                 throw new TableException(
          -        "Table is not an append-only table. " +
          -        "Output needs to handle update and delete changes.")
          +        "Table is not an append-only table. Try calling the [table.toRetractStream] method.")
          --- End diff --

          I don't think that "try calling..." is a helpful message. What about "Table is not an append-only table. Use the toRetractStream() in order to handle add and retract messages."?
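          The check under discussion can be sketched in plain Scala using the wording twalthr suggests. This is an illustration only: `TableException` here is a local stand-in for Flink's class, `AppendOnlyValidation.validate` is a hypothetical helper, and the text that was eventually merged may differ.

```scala
// Local stand-in for org.apache.flink.table.api.TableException so that this
// sketch compiles without Flink on the classpath.
final class TableException(message: String) extends RuntimeException(message)

object AppendOnlyValidation {
  // Wording proposed in the review comment; the merged text may differ.
  val NotAppendOnlyMessage: String =
    "Table is not an append-only table. " +
      "Use the toRetractStream() in order to handle add and retract messages."

  // Same shape as the check in the diff: when no change flags are requested
  // (toAppendStream), only an append-only plan may be converted.
  def validate(withChangeFlag: Boolean, isAppendOnly: Boolean): Unit =
    if (!withChangeFlag && !isAppendOnly) {
      throw new TableException(NotAppendOnlyMessage)
    }

  def main(args: Array[String]): Unit = {
    validate(withChangeFlag = true, isAppendOnly = false) // retract stream: OK
    validate(withChangeFlag = false, isAppendOnly = true) // append-only plan: OK
    try validate(withChangeFlag = false, isAppendOnly = false)
    catch { case e: TableException => println(e.getMessage) }
  }
}
```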

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3958#discussion_r118038355

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
          @@ -629,8 +629,7 @@ abstract class StreamTableEnvironment(
               // if no change flags are requested, verify table is an insert-only (append-only) table.
               if (!withChangeFlag && !isAppendOnly(logicalPlan)) {
                 throw new TableException(
          -        "Table is not an append-only table. " +
          -        "Output needs to handle update and delete changes.")
          +        "Table is not an append-only table. Try calling the [table.toRetractStream] method.")
          --- End diff --

          Yes, that makes sense to users. I will update it.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3958

          Hi @twalthr, thanks for reviewing the PR. I have updated it according to your comment.

          Best,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3958

          Merging this...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3958

          twalthr Timo Walther added a comment -

          Fixed in 1.4: 61914abffa83a55d4f0a339dbcf64c540962a9cd
          Fixed in 1.3: 0f86deed28ab326c8cdb886e3b5ea32da76beab7


            People

            • Assignee:
              sunjincheng121 sunjincheng
              Reporter:
              sunjincheng121 sunjincheng
            • Votes:
              0
              Watchers:
              4
