Beam / BEAM-1074

Set default-partitioner in SourceRDD.Unbounded.

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.0.0
    • Component/s: runner-spark
    • Labels: None

    Description

      The SparkRunner uses mapWithState to read and manage CheckpointMarks, and this stateful operation will be followed by a shuffle:
      https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala#L159

      Since the stateful read maps "splitSource" -> "partition of a list of read values", the shuffle brings no benefit (the list of read values has not been flatMapped yet). To avoid the shuffle, the input RDD (SourceRDD.Unbounded) needs to set its partitioner to a default HashPartitioner: mapWithState uses the same partitioner and skips the shuffle when the partitioners match.
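
      The partitioner-match check described above can be illustrated with a small, Spark-free model. This is only a sketch: the class PartitionerMatchSketch, its nested HashPartitioner, and the needsShuffle method are hypothetical stand-ins written for illustration, not Spark or Beam APIs. It assumes, as Spark's HashPartitioner does, that two hash partitioners are equal when their partition counts are equal, and that an RDD with no declared partitioner never matches.

```java
import java.util.Objects;
import java.util.Optional;

/** Spark-free model of the check that decides whether a repartition must shuffle. */
final class PartitionerMatchSketch {

    /** Hypothetical stand-in for a hash partitioner; equal when partition counts match. */
    static final class HashPartitioner {
        final int numPartitions;

        HashPartitioner(int numPartitions) {
            if (numPartitions <= 0) {
                throw new IllegalArgumentException("numPartitions must be positive");
            }
            this.numPartitions = numPartitions;
        }

        /** Non-negative modulo of the key's hash code; a null key maps to partition 0. */
        int getPartition(Object key) {
            return Math.floorMod(Objects.hashCode(key), numPartitions);
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof HashPartitioner
                && ((HashPartitioner) other).numPartitions == numPartitions;
        }

        @Override
        public int hashCode() {
            return numPartitions;
        }
    }

    /**
     * Returns true when the input must be shuffled before the stateful step.
     * An input with no declared partitioner (Optional.empty()) always needs a shuffle.
     */
    static boolean needsShuffle(Optional<HashPartitioner> inputPartitioner,
                                HashPartitioner statePartitioner) {
        return !inputPartitioner.equals(Optional.of(statePartitioner));
    }

    public static void main(String[] args) {
        HashPartitioner defaultPartitioner = new HashPartitioner(4);

        // Source RDD without a partitioner: the stateful step has to shuffle.
        System.out.println(needsShuffle(Optional.empty(), defaultPartitioner));

        // Source RDD declaring an equal partitioner: the shuffle is skipped.
        System.out.println(needsShuffle(Optional.of(new HashPartitioner(4)), defaultPartitioner));
    }
}
```

      In this model, the fix proposed in the issue corresponds to moving the source RDD from the first case (no partitioner declared) to the second (an equal partitioner declared up front).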

            People

              Assignee: Aviem Zur (aviemzur)
              Reporter: Amit Sela (amitsela)
              Votes: 0
              Watchers: 3