Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Not A Bug
Affects Version/s: 1.11.0
Fix Version/s: None
Description
When I use multiple custom partitioning operations in a row like this:
stream
  .partitionCustom(<custom_partitioner1>, _.key1)
  .mapWithState(...)
  .partitionCustom(<custom_partitioner2>, _.key2)
  .map(...)
  ....
I see that only the last partitioning operation (custom_partitioner2) is reflected in the DAG, while the first one is ignored entirely.
I've also confirmed from the application logs that the first partitioning wasn't applied at runtime.
UPD
Seems like the problem is caused by DataStreamUtils.reinterpretAsKeyedStream:
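import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStreamUtils
import org.apache.flink.streaming.api.scala._

case class TestRecord(key: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val testRecordSource = ...

// Maps a key to a partition index by hash code.
val randomPartitioner = new Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    math.abs(key.hashCode) % numPartitions
}

// First custom partitioning: this is the one missing from the DAG.
val firstPartitioning = env
  .addSource(testRecordSource)
  .partitionCustom(randomPartitioner, _.key)

// Reinterpret the partitioned stream as a keyed stream.
val keyedStream = new KeyedStream(
  DataStreamUtils.reinterpretAsKeyedStream(
    firstPartitioning.javaStream,
    new KeySelector[TestRecord, String] {
      override def getKey(value: TestRecord): String = value.key
    }
  )
)

// Second custom partitioning: this one shows up as CUSTOM.
keyedStream
  .map(identity(_))
  .partitionCustom(randomPartitioner, _.key)
  .map(identity(_))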
This code produces the following DAG:
{
  "nodes" : [
    {
      "id" : 22,
      "type" : "Source: Custom Source",
      "pact" : "Data Source",
      "contents" : "Source: Custom Source",
      "parallelism" : 1
    },
    {
      "id" : 25,
      "type" : "Map",
      "pact" : "Operator",
      "contents" : "Map",
      "parallelism" : 1,
      "predecessors" : [
        { "id" : 22, "ship_strategy" : "FORWARD", "side" : "second" }
      ]
    },
    {
      "id" : 27,
      "type" : "Map",
      "pact" : "Operator",
      "contents" : "Map",
      "parallelism" : 1,
      "predecessors" : [
        { "id" : 25, "ship_strategy" : "CUSTOM", "side" : "second" }
      ]
    }
  ]
}
The expected behavior is a CUSTOM connection in both cases, rather than FORWARD followed by CUSTOM.
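For anyone who wants to check this without reading the plan by eye, here is a minimal sketch that pulls the ship strategies out of a plan JSON string. It assumes the plan is available as a string (in a real job it would come from env.getExecutionPlan); the object name ShipStrategyCheck and the helper shipStrategies are hypothetical names used only for illustration, and the regex is a shortcut rather than a full JSON parser.

```scala
object ShipStrategyCheck {
  // Matches every "ship_strategy" : "<VALUE>" pair in a plan JSON string.
  // A regex is enough for this flat field; it is not a general JSON parser.
  private val ShipStrategy = "\"ship_strategy\"\\s*:\\s*\"([A-Z_]+)\"".r

  /** Returns the ship strategies in the order they appear in the plan. */
  def shipStrategies(planJson: String): List[String] =
    ShipStrategy.findAllMatchIn(planJson).map(_.group(1)).toList

  def main(args: Array[String]): Unit = {
    // Abbreviated copy of the plan from this report (assumption: in a real
    // job this string would be obtained from env.getExecutionPlan).
    val plan =
      """{ "nodes" : [
        |  { "id" : 22, "type" : "Source: Custom Source" },
        |  { "id" : 25, "type" : "Map", "predecessors" : [
        |    { "id" : 22, "ship_strategy" : "FORWARD", "side" : "second" } ] },
        |  { "id" : 27, "type" : "Map", "predecessors" : [
        |    { "id" : 25, "ship_strategy" : "CUSTOM", "side" : "second" } ] }
        |] }""".stripMargin

    val strategies = shipStrategies(plan)
    println(strategies) // prints List(FORWARD, CUSTOM)

    // The expectation stated in this report is CUSTOM on both edges.
    println(strategies.forall(_ == "CUSTOM")) // prints false
  }
}
```

Run against the plan above, the check reports FORWARD on the first edge, which is the mismatch described in this report.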