Flink / FLINK-21507

Reinterpreting stream as keyed breaks the upstream partitioning


Details

    Description

      When I use multiple custom partitioning operations in a row like this:

      stream
        .partitionCustom(<custom_partitioner1>, _.key1)
        .mapWithState(...)
        .partitionCustom(<custom_partitioner2>, _.key2)
        .map(...)
        ....

      I see that only the last partitioning operation (custom_partitioner2) is reflected in the DAG, while the first one is ignored entirely.

      I've also confirmed from the application logs that the first partitioning wasn't applied at runtime.

      UPD: The problem seems to be caused by DataStreamUtils.reinterpretAsKeyedStream:

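          import org.apache.flink.api.common.functions.Partitioner
          import org.apache.flink.api.java.functions.KeySelector
          import org.apache.flink.streaming.api.TimeCharacteristic
          import org.apache.flink.streaming.api.datastream.DataStreamUtils
          import org.apache.flink.streaming.api.scala._

          case class TestRecord(key: String)

          val env = StreamExecutionEnvironment.getExecutionEnvironment
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          env.setParallelism(1)

          val testRecordSource = ...

          // Hash-based partitioner (despite the name) used for both partitioning steps
          val randomPartitioner = new Partitioner[String] {
            override def partition(key: String, numPartitions: Int): Int = math.abs(key.hashCode) % numPartitions
          }

          // First custom partitioning: this is the edge that goes missing from the DAG
          val firstPartitioning = env
            .addSource(testRecordSource)
            .partitionCustom(randomPartitioner, _.key)

          // Wrap the already-partitioned Java stream as a Scala KeyedStream
          val keyedStream = new KeyedStream(
            DataStreamUtils.reinterpretAsKeyedStream(
              firstPartitioning.javaStream,
              new KeySelector[TestRecord, String] {
                override def getKey(value: TestRecord): String = value.key
              }
            )
          )

          // Second custom partitioning: this one does show up as CUSTOM
          keyedStream
            .map(identity(_))
            .partitionCustom(randomPartitioner, _.key)
            .map(identity(_))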
       
       

      This code produces the following DAG:

      {
        "nodes" : [ {
          "id" : 22,
          "type" : "Source: Custom Source",
          "pact" : "Data Source",
          "contents" : "Source: Custom Source",
          "parallelism" : 1
        }, {
          "id" : 25,
          "type" : "Map",
          "pact" : "Operator",
          "contents" : "Map",
          "parallelism" : 1,
          "predecessors" : [ {
            "id" : 22,
            "ship_strategy" : "FORWARD",
            "side" : "second"
          } ]
        }, {
          "id" : 27,
          "type" : "Map",
          "pact" : "Operator",
          "contents" : "Map",
          "parallelism" : 1,
          "predecessors" : [ {
            "id" : 25,
            "ship_strategy" : "CUSTOM",
            "side" : "second"
          } ]
        } ]
      }
      

      The expected behavior is a CUSTOM connection on both edges (source to first Map, and first Map to second Map), rather than FORWARD followed by CUSTOM.
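
      To make the mismatch easy to check mechanically, here is a minimal sketch that pulls the ship strategies out of a plan JSON string and compares them with the expected ones. It is plain Scala with no Flink dependency; `PlanCheck` and `shipStrategies` are hypothetical names introduced for illustration, and the embedded plan is a trimmed copy of the one above. A regex is used only to keep the sketch short; a proper JSON parser would be more robust.

```scala
object PlanCheck {
  // Matches "ship_strategy" : "<VALUE>" and captures <VALUE>.
  private val ShipStrategy = "\"ship_strategy\"\\s*:\\s*\"([^\"]+)\"".r

  /** Returns every ship strategy found in the plan JSON, in order of appearance. */
  def shipStrategies(planJson: String): List[String] =
    ShipStrategy.findAllMatchIn(planJson).map(_.group(1)).toList

  def main(args: Array[String]): Unit = {
    // The plan from this report, trimmed to the fields relevant here.
    val reportedPlan =
      """{ "nodes" : [
        |  { "id" : 22, "type" : "Source: Custom Source" },
        |  { "id" : 25, "type" : "Map",
        |    "predecessors" : [ { "id" : 22, "ship_strategy" : "FORWARD" } ] },
        |  { "id" : 27, "type" : "Map",
        |    "predecessors" : [ { "id" : 25, "ship_strategy" : "CUSTOM" } ] }
        |] }""".stripMargin

    val actual = shipStrategies(reportedPlan)
    val expected = List("CUSTOM", "CUSTOM")

    println(s"actual:   $actual")
    println(s"expected: $expected")
    println(s"matches:  ${actual == expected}")
  }
}
```

      Against a real job, the input would be the string returned by `env.getExecutionPlan` (called before `env.execute()`) instead of the hard-coded plan.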
       


          People

            Assignee: Unassigned
            Reporter: Iaroslav Zeigerman (izeigerman)
            Votes: 2
            Watchers: 3
