Details
Type: Sub-task
Status: Resolved
Priority: Major
Resolution: Fixed
Description
When I test a non-windowed group aggregate with stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new StreamITCase.StringSink), I get the following error:
org.apache.flink.table.api.TableException: Table is not an append-only table. Output needs to handle update and delete changes.
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
    at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
    at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
    at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
The reason is that DataStreamGroupAggregate#producesUpdates is defined as follows:
override def producesUpdates = true
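To illustrate why the query above is rejected, here is a simplified, hypothetical model in plain Scala (no Flink dependency). An operator that reports producesUpdates = true makes the plan above it updating, and a toAppendStream-style check refuses such a plan. The names PlanNode, SourceScan, GroupAggregate, Project, AppendCheck and the local TableException are illustrative assumptions, not Flink's internal classes. The real check happens in StreamTableEnvironment.translate, as the stack trace shows.

```scala
// Hypothetical, simplified model; these are NOT Flink's internal classes.
sealed trait PlanNode {
  // True if the operator may revise rows it has already emitted.
  def producesUpdates: Boolean
}

// An append-only source scan.
case object SourceScan extends PlanNode {
  def producesUpdates: Boolean = false
}

// Stands in for DataStreamGroupAggregate, which always reports updates.
final case class GroupAggregate(input: PlanNode) extends PlanNode {
  def producesUpdates: Boolean = true
}

// A projection only forwards whatever its input produces.
final case class Project(input: PlanNode) extends PlanNode {
  def producesUpdates: Boolean = input.producesUpdates
}

// Local stand-in for org.apache.flink.table.api.TableException.
final class TableException(msg: String) extends RuntimeException(msg)

object AppendCheck {
  // Mirrors the idea behind toAppendStream: any updating plan is rejected.
  def toAppendStream(plan: PlanNode): PlanNode =
    if (plan.producesUpdates)
      throw new TableException(
        "Table is not an append-only table. Output needs to handle update and delete changes.")
    else plan
}
```

In this model, AppendCheck.toAppendStream(GroupAggregate(SourceScan)) always throws, no matter how the result would actually be consumed. That unconditional rejection is the behavior this issue questions.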
From the user's point of view, I think the expected results are as follows (for example):
Data:
val data = List(
  (1L, 1, "Hello"), (2L, 2, "Hello"), (3L, 3, "Hello"),
  (4L, 4, "Hello"), (5L, 5, "Hello"), (6L, 6, "Hello"),
  (7L, 7, "Hello World"), (8L, 8, "Hello World"), (20L, 20, "Hello World"))
- Case 1:
Table API:
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum)
  .toAppendStream[Row].addSink(new StreamITCase.StringSink)
Result:
// Data processed by StringSink: 1 3 6 10 15 21 28 36 56
// Final output: 1 3 6 10 15 21 28 36 56
- Case 2:
Table API:
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum)
  .toRetractStream[Row].addSink(new StreamITCase.RetractingSink)
Result:
// Data processed by RetractingSink: (true,1) (false,1) (true,3) (false,3) (true,6) (false,6) (true,10) (false,10) (true,15) (false,15) (true,21) (false,21) (true,28) (false,28) (true,36) (false,36) (true,56)
// Final output: 56
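The retraction behavior of Case 2 can be sketched in plain Scala without Flink. Each new value of the single global sum first retracts the previously emitted value and then adds the new one. A retracting sink that applies these messages in order ends up holding only the latest sum. This is a simplified model that assumes the sink keeps its rows in a list. RetractRunningSum, retractMessages and applyRetractions are hypothetical names, not the actual code of StreamITCase.RetractingSink.

```scala
object RetractRunningSum {
  // Column 'a of the sample data from the issue.
  val as: List[Long] = List(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 20L)

  // (true, v) adds row v; (false, v) retracts a previously added row v.
  // Each new value of the global sum first retracts the old sum, if any.
  def retractMessages(values: List[Long]): List[(Boolean, Long)] = {
    val sums = values.scanLeft(0L)(_ + _).tail
    // Pair every sum with the sum emitted before it (None for the first row).
    val previous: List[Option[Long]] = None :: sums.map(Some(_))
    sums.zip(previous).flatMap {
      case (sum, None)      => List((true, sum))
      case (sum, Some(old)) => List((false, old), (true, sum))
    }
  }

  // Minimal retracting sink: appends added rows and removes one matching
  // row per retraction, then returns the rows that remain.
  def applyRetractions(messages: List[(Boolean, Long)]): List[Long] =
    messages.foldLeft(List.empty[Long]) {
      case (rows, (true, v)) => rows :+ v
      case (rows, (false, v)) =>
        val idx = rows.indexOf(v)
        if (idx >= 0) rows.patch(idx, Nil, 1) else rows
    }
}
```

For the sample data, retractMessages produces the 17 messages listed above. Applying them with applyRetractions leaves List(56L), which matches the final output of Case 2.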
In fact, Case 1 can already be expressed with an unbounded OVER window:
Table API:
stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
  .window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
  .select('a.sum over 'w)
  .toAppendStream[Row].addSink(new StreamITCase.StringSink)
Result:
Same as Case 1.
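Here is a plain-Scala sketch of the semantics behind preceding UNBOUNDED_ROW. It shows why this OVER query yields the Case 1 output on the sample data. For the i-th row in processing-time order, the window contains rows 0..i, so 'a.sum over 'w is a prefix sum, and each input row produces exactly one output row that is never revised. The sketch does not use Flink. UnboundedOverWindow, overSums and incrementalSums are hypothetical names, and the list order stands in for processing-time order.

```scala
object UnboundedOverWindow {
  // The sample rows from the issue: (a, b, c), assumed to arrive in this order.
  val data: List[(Long, Int, String)] = List(
    (1L, 1, "Hello"), (2L, 2, "Hello"), (3L, 3, "Hello"),
    (4L, 4, "Hello"), (5L, 5, "Hello"), (6L, 6, "Hello"),
    (7L, 7, "Hello World"), (8L, 8, "Hello World"), (20L, 20, "Hello World"))

  // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, evaluated literally:
  // the window for row i holds rows 0..i, and 'a is summed over it.
  val overSums: List[Long] =
    data.indices.toList.map(i => data.take(i + 1).map(_._1).sum)

  // The same result computed incrementally, one accumulator update per row.
  val incrementalSums: List[Long] = data.map(_._1).scanLeft(0L)(_ + _).tail
}
```

The literal window evaluation is quadratic and only meant to mirror the definition. A streaming operator would keep a single running accumulator, as incrementalSums does.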
However, after FLINK-6649, an OVER window cannot express Case 1 with early firing.
So I still think that a non-windowed group aggregate should not always produce an update table; the user should be able to decide which mode to use.
Are there any drawbacks to this improvement? Any feedback is welcome.