Details
- Type: Bug
- Status: Closed
- Priority: Critical
- Resolution: Fixed
- Fix Version/s: 1.4.2, 1.5.1, 1.6.0, 1.7.0
- Labels: Important
Description
Grouping by a window AND one or more other attributes appears to be broken. Test case attached:
{code:scala}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment, Types}
import org.scalatest.{FlatSpec, Matchers}

class BatchStatisticsIntegrationTest extends FlatSpec with Matchers {

  trait BatchContext {
    implicit lazy val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    implicit val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)

    val data = Seq(
      (1532424567000L, "id1", "location1"),
      (1532424567000L, "id2", "location1"),
      (1532424567000L, "id3", "location1"),
      (1532424568000L, "id1", "location2"),
      (1532424568000L, "id2", "location3")
    )

    val rawDataSet: DataSet[(Long, String, String)] = env.fromCollection(data)
    val table: Table = tableEnv.fromDataSet(rawDataSet, 'rowtime, 'id, 'location)
  }

  it should "be possible to run Table API queries with grouping by tumble window and column(s) on batch data" in new BatchContext {
    val results = table
      .window(Tumble over 1.second on 'rowtime as 'w)
      .groupBy('w, 'location)
      .select(
        'w.start.cast(Types.LONG),
        'w.end.cast(Types.LONG),
        'location,
        'id.count
      )
      .toDataSet[(Long, Long, String, Long)]
      .collect()

    results should contain theSameElementsAs Seq(
      (1532424567000L, 1532424568000L, "location1", 3L),
      (1532424568000L, 1532424569000L, "location2", 1L),
      (1532424568000L, 1532424569000L, "location3", 1L)
    )
  }
}
{code}
It seems that at execution time the 'rowtime attribute replaces 'location, and that causes a ClassCastException:
{code}
[info] Cause: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
[info] at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
[info] at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
[info] at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
[info] at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
[info] at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
[info] at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
[info] at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
[info] at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
[info] at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
[info] at org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper.combine(RichCombineToGroupCombineWrapper.java:52)
{code}
Here is some debug information I was able to gather. The field serializers do not match the types of the Row fields: the serializer expects (String, Tuple, Long), matching the logical layout ('location, count accumulator, window time), but the row actually holds (Long, CountAccumulator, Long), i.e. the 'rowtime value sits at index 0 where the 'location String should be:
{code}
this.instance = {Row@68451} "1532424567000,(3),1532424567000"
  fields = {Object[3]@68461}
    0 = {Long@68462} 1532424567000
    1 = {CountAccumulator@68463} "(3)"
    2 = {Long@68462} 1532424567000
this.serializer = {RowSerializer@68452}
  fieldSerializers = {TypeSerializer[3]@68455}
    0 = {StringSerializer@68458}
    1 = {TupleSerializer@68459}
    2 = {LongSerializer@68460}
  arity = 3
  nullMask = {boolean[3]@68457}
{code}
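To make the failure mechanism concrete, here is a minimal, self-contained sketch (not part of the original report) that reproduces the same exception by hand: a RowSerializer built for the logical schema is asked to serialize a Row whose first field holds a Long where a String is expected. It calls Flink's internal RowSerializer constructor directly and reduces the layout to two fields for brevity; the class and values are hypothetical, chosen only to mirror the dump above.

{code:scala}
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.common.typeutils.base.{LongSerializer, StringSerializer}
import org.apache.flink.api.java.typeutils.runtime.RowSerializer
import org.apache.flink.core.memory.DataOutputSerializer
import org.apache.flink.types.Row

object SerializerMismatchSketch {
  def main(args: Array[String]): Unit = {
    // Serializer laid out for the logical row type ('location: String, count: Long),
    // mirroring fieldSerializers = [StringSerializer, ..., LongSerializer] above.
    val rowSerializer = new RowSerializer(
      Array[TypeSerializer[_]](StringSerializer.INSTANCE, LongSerializer.INSTANCE))

    // Row as actually produced at runtime: the rowtime Long sits at index 0,
    // where the 'location String should be.
    val row = new Row(2)
    row.setField(0, java.lang.Long.valueOf(1532424567000L)) // should be e.g. "location1"
    row.setField(1, java.lang.Long.valueOf(3L))

    val out = new DataOutputSerializer(64)
    // Throws java.lang.ClassCastException: java.lang.Long cannot be cast to
    // java.lang.String, raised from StringSerializer.serialize exactly as in
    // the stack trace above.
    rowSerializer.serialize(row, out)
  }
}
{code}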