  Spark / SPARK-10476

Enable common RDD operations on standard Scala collections

Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Minor
    • Resolution: Won't Fix
    • Affects Version/s: 1.4.1
    • Fix Version/s: None
    • Component/s: Spark Core

    Description

      A common pattern in Spark development is to look for opportunities to exploit data locality using mechanisms such as mapPartitions. Often this happens when an existing set of RDD transformations is refactored to improve performance. At that point, significant code changes may be required because the input is an Iterator[T] rather than an RDD, so the RDD methods are no longer available. The most common examples we've encountered so far involve the *ByKey methods, sample and takeSample. We have also observed cases where, due to changes in the structure of the data, use of mapPartitions is no longer possible and the code has to be converted to use the RDD API.
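To illustrate the kind of rewrite described above, here is a minimal sketch in plain Scala, with no Spark dependency. It shows what a per-key sum (rdd.reduceByKey(_ + _) on an RDD) might have to become once the logic moves inside mapPartitions and only an Iterator is available. The object name PartitionLocalSum and the method sumByKey are hypothetical and exist only for this illustration.

```scala
import scala.collection.mutable

object PartitionLocalSum {
  // Hypothetical helper: a hand-written, partition-local replacement for
  // rdd.reduceByKey(_ + _). The input is a plain Iterator, so no RDD
  // methods can be used here.
  def sumByKey(part: Iterator[(String, Int)]): Iterator[(String, Int)] = {
    // LinkedHashMap keeps keys in first-seen order.
    val totals = mutable.LinkedHashMap.empty[String, Int]
    part.foreach { case (k, v) =>
      totals(k) = totals.getOrElse(k, 0) + v
    }
    totals.iterator
  }
}
```

Under these assumptions, the RDD pipeline would call it as rdd.mapPartitions(PartitionLocalSum.sumByKey). The result is only a per-partition sum, so it is not equivalent to reduceByKey unless all values for a key live in the same partition.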

      If data manipulation through the RDD API could be applied to the standard Scala data structures, then refactoring Spark data pipelines would become faster and less error-prone. Also, and this is no small benefit, the thoughtfulness and experience of the Spark community could spread to the broader Scala community.

      There are multiple approaches to solving this problem, including but not limited to creating a set of Local*RDD classes and/or adding implicit conversions.

      Here is a simple example meant to be short as opposed to complete or performance-optimized:

      // Wraps an Iterator so that it can be treated as an Iterable.
      // Note: the wrapped iterator can only be traversed once.
      implicit class LocalRDD[T](it: Iterator[T]) extends Iterable[T] {
        def this(collection: Iterable[T]) = this(collection.iterator)
        def iterator: Iterator[T] = it
      }
      
      // Pair variant that adds an RDD-style groupByKey.
      implicit class LocalPairRDD[K, V](it: Iterator[(K, V)]) extends Iterable[(K, V)] {
        def this(collection: Iterable[(K, V)]) = this(collection.iterator)
        def iterator: Iterator[(K, V)] = it
        def groupByKey(): LocalPairRDD[K, Iterable[V]] = new LocalPairRDD[K, Iterable[V]](
          groupBy(_._1).map { case (k, valuePairs) => (k, valuePairs.map(_._2)) }
        )
      }
      
      // groupByKey is resolved on the partition's Iterator through LocalPairRDD.
      sc.
        parallelize(Array((1, 10), (2, 10), (1, 20))).
        repartition(1).
        mapPartitions(data => data.groupByKey().iterator).
        take(2)
      // Array[(Int, Iterable[Int])] = Array((2,List(10)), (1,List(10, 20)))
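
The same implicit-conversion approach could be extended to the other *ByKey methods. The following is a sketch only, written in plain Scala so that it runs without a SparkContext. LocalPairOps, PairIteratorOps and this reduceByKey are hypothetical names, not part of Spark; the method merely mirrors the shape of the RDD method of the same name.

```scala
import scala.collection.mutable

object LocalPairOps {
  // Hypothetical enrichment, not part of Spark: adds a reduceByKey-style
  // method to any Iterator of key/value pairs.
  implicit class PairIteratorOps[K, V](private val it: Iterator[(K, V)]) {
    def reduceByKey(f: (V, V) => V): Iterator[(K, V)] = {
      // LinkedHashMap keeps keys in first-seen order.
      val acc = mutable.LinkedHashMap.empty[K, V]
      it.foreach { case (k, v) =>
        // Combine with the running value if the key was seen before.
        acc(k) = acc.get(k).map(f(_, v)).getOrElse(v)
      }
      acc.iterator
    }
  }
}
```

With import LocalPairOps._ in scope, a call such as Iterator((1, 10), (2, 10), (1, 20)).reduceByKey(_ + _) compiles against a plain Iterator, and the same expression could be used inside mapPartitions. There it would reduce within each partition only, which differs from the cluster-wide semantics of the RDD method.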
      

          People

            Assignee: Unassigned
            Reporter: Simeon Simeonov (simeons)
            Votes: 0
            Watchers: 2
