  1. Spark
  2. SPARK-27853

Allow for custom Partitioning implementations

Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.1.0
    • Fix Version/s: None
    • Component/s: Optimizer, SQL
    • Labels: None

    Description

      When partitioning a Dataset, Spark uses the physical plan element ShuffleExchangeExec together with a Partitioning instance.

      I find myself in a situation where I need to provide my own partitioning criterion, one that decides which partition each InternalRow belongs to. Based on the Spark API, I would expect to be able to supply it as a custom implementation of the Partitioning interface.

      Unfortunately, using a custom Partitioning implementation fails with an "Exchange not implemented for $newPartitioning" error, because of the following code in the ShuffleExchangeExec#prepareShuffleDependency method:

      val part: Partitioner = newPartitioning match {
          case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
          case HashPartitioning(_, n) =>
              new Partitioner {
                  override def numPartitions: Int = n
                  // For HashPartitioning, the partitioning key is already a valid partition ID, as we use
                  // `HashPartitioning.partitionIdExpression` to produce partitioning key.
                  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
              }
          case RangePartitioning(sortingExpressions, numPartitions) =>
              // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
              // partition bounds. To get accurate samples, we need to copy the mutable keys.
              val rddForSampling = rdd.mapPartitionsInternal { iter =>
                  val mutablePair = new MutablePair[InternalRow, Null]()
                  iter.map(row => mutablePair.update(row.copy(), null))
              }
              implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
              new RangePartitioner(
                  numPartitions,
                  rddForSampling,
                  ascending = true,
                  samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
          case SinglePartition =>
              new Partitioner {
                  override def numPartitions: Int = 1
                  override def getPartition(key: Any): Int = 0
              }
          case _ => sys.error(s"Exchange not implemented for $newPartitioning")
          // TODO: Handle BroadcastPartitioning.
      }

      def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
          case RoundRobinPartitioning(numPartitions) =>
              // Distributes elements evenly across output partitions, starting from a random partition.
              var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
              (row: InternalRow) => {
                  // The HashPartitioner will handle the `mod` by the number of partitions
                  position += 1
                  position
              }
          case h: HashPartitioning =>
              val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
              row => projection(row).getInt(0)
          case RangePartitioning(_, _) | SinglePartition => identity
          case _ => sys.error(s"Exchange not implemented for $newPartitioning")
      }
      

      The snippet above matches the given Partitioning instance, newPartitioning, against a fixed set of hardcoded Partitioning types. For a new Partitioning implementation, neither pattern match has a matching case, so both fall through to the fallback case:

          case _ => sys.error(s"Exchange not implemented for $newPartitioning")
      

      and throw an exception.
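
      The failure mode can be reproduced outside Spark with a minimal sketch. The types below (Partitioning, HashLike, SingleLike, CustomPartitioning) and the describe function are simplified, hypothetical stand-ins and not Spark's actual classes; only the shape of the match, a fixed list of cases plus a sys.error fallback, is taken from the snippet above.

```scala
object ClosedMatchDemo {
  // Hypothetical, simplified stand-ins; NOT Spark's real Partitioning hierarchy.
  trait Partitioning { def numPartitions: Int }
  case class HashLike(numPartitions: Int) extends Partitioning
  case object SingleLike extends Partitioning { val numPartitions: Int = 1 }

  // A user-defined implementation that the match below knows nothing about.
  case class CustomPartitioning(numPartitions: Int) extends Partitioning

  // Same shape as the match in prepareShuffleDependency:
  // a fixed list of known cases followed by an error fallback.
  def describe(p: Partitioning): String = p match {
    case HashLike(n) => s"hash($n)"
    case SingleLike  => "single"
    case other       => sys.error(s"Exchange not implemented for $other")
  }

  def main(args: Array[String]): Unit = {
    // A known type is handled normally.
    println(describe(HashLike(4)))
    // The custom type compiles fine but fails at runtime in the fallback case.
    val attempt = scala.util.Try(describe(CustomPartitioning(4)))
    println(attempt.isFailure)
  }
}
```

      The custom type satisfies the interface and compiles, yet it is only rejected at runtime, which is the behaviour described in this issue.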

      To support custom partitioning behaviour, I suggest changing ShuffleExchangeExec so that it works with an arbitrary Partitioning implementation. Creating the Partitioner could be delegated to the Partitioning classes themselves, for example via a Partitioning#createPartitioner method.
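
      As a rough illustration of the proposal, the sketch below shows how such a factory method might look. It is self-contained and uses reduced stand-in traits rather than Spark's real Partitioner and Partitioning; createPartitioner, ModuloPartitioning, and buildPartitioner are hypothetical names that do not exist in Spark. It also assumes the partitioning key is already an Int, and the key-extractor match shown earlier would presumably need a similar hook.

```scala
object CreatePartitionerSketch {
  // Stand-in for org.apache.spark.Partitioner, reduced to the two members used here.
  trait Partitioner {
    def numPartitions: Int
    def getPartition(key: Any): Int
  }

  // Stand-in for the SQL Partitioning interface, extended with the proposed
  // factory method. createPartitioner is the hypothetical addition.
  trait Partitioning {
    def numPartitions: Int
    def createPartitioner(): Partitioner
  }

  // Hypothetical custom implementation: routes Int keys by non-negative modulo.
  final case class ModuloPartitioning(numPartitions: Int) extends Partitioning {
    override def createPartitioner(): Partitioner = {
      val n = numPartitions // capture before the anonymous class shadows the name
      new Partitioner {
        override def numPartitions: Int = n
        override def getPartition(key: Any): Int =
          Math.floorMod(key.asInstanceOf[Int], n)
      }
    }
  }

  // With the factory method, the exchange no longer needs one case per
  // Partitioning type; it simply asks the instance for its Partitioner.
  def buildPartitioner(p: Partitioning): Partitioner = p.createPartitioner()

  def main(args: Array[String]): Unit = {
    val partitioner = buildPartitioner(ModuloPartitioning(4))
    println(partitioner.getPartition(7))
    println(partitioner.getPartition(-1))
  }
}
```

      With this design, the built-in Partitioning types would each implement the factory method with the logic currently held in the corresponding match case, and a custom implementation would need no change to ShuffleExchangeExec.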


          People

            Assignee: Unassigned
            Reporter: Marc Arndt (madoar)