Spark / SPARK-3655

Support sorting of values in addition to keys (i.e. secondary sort)

    Details

    • Type: New Feature
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.1.0, 1.2.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels:
      None

      Description

      Now that spark has a sort based shuffle, can we expect a secondary sort soon? There are some use cases where getting a sorted iterator of values per key is helpful.


          Activity

          Matei Zaharia added a comment -

          I believe you can build this on top of sortByKey with mapPartitions. The values for each key are guaranteed to go to the same node (though we should document that). Or are you looking to partition the keys by one function and have the values sorted by another? In that case we added this weird repartitionAndSortWithinPartitions function to OrderedRDDFunctions that would do the trick (it was added to make it easier to port apps from MapReduce).

          koert kuipers added a comment -

          hey matei,
          i was referring to partitioning by one key and having the values sorted by another.

          does adding a keyOrdering in the ShuffleDependency lead to an efficient (secondary) sort on that key in the sort-based shuffle? i haven't looked at the code yet for SortShuffleManager and SortShuffleWriter...

          koert kuipers added a comment - edited

          i am not sure repartitionAndSortWithinPartitions does what i want. what i want to do for a given RDD[(K, V)] is use the sort-based shuffle to group by key but sort by (K, V), so that for each key the values come out sorted in the resulting RDD.

          i could do something like map a RDD[(K, V)] to a RDD[((K, V), V)] and then use sortByKey, which does result in the values sorted for each key, but if i do that i have no guarantee that all values for a given key end up in the same partition.

          maybe i am missing something...
          best, koert

          koert kuipers added a comment -

          i went through the code. to allow a secondary sort on values, the changes required are:

          ShuffleDependency[K, V, C] should have an Option[Ordering[Product2[K, C]]] instead of an Option[Ordering[K]]

          the changes to the shuffle classes seem straightforward, but i get bogged down in the details of the classes in collections, in particular AppendOnlyMap and the usage of Sorter

          Patrick Wendell added a comment -

          Hey Koert Kuipers - i'm not an expert on this part of the code, but what if you used a compound key composed of the key and the value, then wrote a custom partitioner that partitions based only on the key component, then used repartitionAndSortWithinPartitions?

          koert kuipers added a comment -

          hey patrick. so far i was looking into modifying the sorting code, but you
          are saying to modify the partitioning instead. great. will look into that
          approach as well. it might be easier/cleaner.

          Patrick Wendell added a comment -

          Yeah, so to be clear, here is what I meant:

          1. Map your RDD[(K, V)] to an RDD[((K, V), null)]
          2. Write a custom partitioner that partitions based only on the K component of the key.
          3. Call repartitionAndSortWithinPartitions with your custom partitioner.
          4. Map the RDD back into RDD[(K, V)]

          This will return an RDD where each partition is sorted by both key and value. If you called mapPartitions on it, you would cycle through the records in sorted order.
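
          A minimal sketch of this recipe, assuming hash partitioning and orderings on both K and V (KeyPartitioner and secondarySort are illustrative names introduced here, not existing Spark API):

            import org.apache.spark.{HashPartitioner, Partitioner}
            import org.apache.spark.rdd.RDD
            import org.apache.spark.SparkContext._ // implicit OrderedRDDFunctions (needed on spark 1.2 and earlier)

            // step 2: partition on the K component of the composite key only
            class KeyPartitioner[K, V](partitions: Int) extends Partitioner {
              private val delegate = new HashPartitioner(partitions)
              def numPartitions: Int = partitions
              def getPartition(key: Any): Int = delegate.getPartition(key.asInstanceOf[(K, V)]._1)
            }

            def secondarySort[K: Ordering, V: Ordering](rdd: RDD[(K, V)], partitions: Int): RDD[(K, V)] = {
              implicit val kv: Ordering[(K, V)] = Ordering.Tuple2(Ordering[K], Ordering[V])
              rdd.map { case (k, v) => ((k, v), null) }                                   // step 1
                .repartitionAndSortWithinPartitions(new KeyPartitioner[K, V](partitions)) // step 3
                .map { case ((k, v), _) => (k, v) }                                       // step 4
            }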

          koert kuipers added a comment -

          yes, that makes sense.

          i am working right now on supporting proper sorting on keys and values (as
          opposed to just keys). it's almost done. my first use case will be
          foldLeftByKey with a custom Ordering for values (useful for time series).

          once i finish with this i will investigate the approach you described, and
          then compare them to see what is more elegant.

          Patrick Wendell added a comment -

          Okay, sounds good.

          koert kuipers added a comment -

          can you assign this to me?
          i will have 2 pullreqs in a few days

          koert kuipers added a comment -

          first pullreq is here:
          https://github.com/apache/spark/pull/2962


          koert kuipers added a comment -

          second pullreq is here:
          https://github.com/apache/spark/pull/2963


          Sandy Ryza added a comment -

          Hey koert kuipers, I think the transform that would most closely mimic MR-style secondary sort would be a groupByKeyAndSortValues. Not that a foldBy transformation wouldn't be useful as well. Do you have any interest in implementing the former? If not, I'll have a go at it.

          koert kuipers added a comment -

          something that takes in an ordering, and exposes a sorted iterator of the
          values? that would indeed be more generic.
          i can add that.

          any preference which pullreq? if not i will just keep going with both

          Sandy Ryza added a comment -

          The repartitionAndSortWithinPartitions approach seems preferable to me

          koert kuipers added a comment -

          should there be a foldLeft that does not sort? zsxwing mentioned on the pullreq that sorting is not a necessary part of foldLeft, but i am having a hard time coming up with use cases that do not involve sorting.

          Sandy Ryza added a comment -

          foldLeft only conceptually makes sense when applied to an ordered collection. Requiring an Ordering for the values seems reasonable to me. Though maybe explicitly calling it foldLeftSortedValuesByKey would be clearer?

          koert kuipers added a comment -

          Sandy Ryza
          i updated the pullreq to include groupByKeyAndSortValues

          https://github.com/apache/spark/pull/2963

          Sandy Ryza added a comment -

          Thanks Koert, will take a look soon. Can we separate foldByKey and groupByKeyAndSortValues into two different issues / PRs?

          Patrick Wendell added a comment -

          +1 to Sandy's comment. I think groupByKeyAndSortValues is really good to have (also, it would be good to include Java and Python versions for this since we do this for all new APIs now). foldByKey is a little more esoteric; that might be one we want in user libraries rather than in Spark core.

          koert kuipers added a comment -

          Sandy Ryza
          One issue with groupByKeyAndSortValues that i did not have with foldLeft is that a user has no obligation to finish reading all values from the Iterable[V] of the sorted values. This leads to wrong results in the current implementation, since the underlying iterator is then wrongly positioned for the next call.
          I will add a unit test for this and see if there is an elegant/efficient way to handle this.

          koert kuipers added a comment -

          i also don't like the signature
          def groupByKeyAndSortValues(valueOrdering: Ordering[V], partitioner: Partitioner): RDD[(K, Iterable[V])]
          i doubt it can be implemented efficiently

          i would much prefer
          def groupByKeyAndSortValues(valueOrdering: Ordering[V], partitioner: Partitioner): RDD[(K, TraversableOnce[V])]

          but that is inconsistent with groupByKey (which i guess has Iterable in its return type for historical reasons... it used to be Seq)

          Apache Spark added a comment -

          User 'koertkuipers' has created a pull request for this issue:
          https://github.com/apache/spark/pull/3632

          koert kuipers added a comment - edited

          i have a new pullreq that implements just groupByKeyAndSortValues in scala and java. i will need some help with python.

          pullreq is here:
          https://github.com/apache/spark/pull/3632

          i changed the methods to return RDD[(K, TraversableOnce[V])] instead of RDD[(K, Iterable[V])], since i don't see a reasonable way to implement it so that it returns Iterables without resorting to keeping the data in memory.
          The assumption made is that once you move on to the next key within a partition, the previous value (the TraversableOnce[V]) will no longer be used.

          I personally find this API too generic, and too easy to abuse or make mistakes with. So i prefer a more constrained API like foldLeft. Or perhaps groupByKeyAndSortValues could be DeveloperAPI?

          Sandy Ryza added a comment -

          The groupBy Iterable vs. TraversableOnce conversation has come up a few times and the resolution has been that, both for consistency and to not have strange interactions with things like caching and downstream shuffles, we need to use Iterable. Some relevant discussion is on SPARK-4644, SPARK-2978, and the PR associated with SPARK-3461. I personally agree that we need to add some APIs that permit these operations to be more efficient. But I think for the moment buffering each group in memory is the best we can do.

          koert kuipers added a comment -

          OK that can be done. It definitely highlights that operations like foldLeft
          (that do not need the data in memory) should then not be based on
          groupByKeyAndSortValues.
          So foldLeft then becomes a completely standalone pullreq.

          koert kuipers added a comment -

          I will update the pullrequest to put out a version that buffers in memory and uses Iterables later this week.

          koert kuipers added a comment -

          i updated the pullreq to use Iterables instead of TraversableOnce

          i also wanted to take this opportunity to make a pitch for foldLeft one more time. i think we should implement foldLeft because
          1) it is a well-known operation that perfectly fits many problems such as time series analysis
          2) it does not need to make the in-memory assumption for the sorted values, which is crucial for a lot of problems
          3) it is (i think?) the most basic api that does not need values in memory, since it uses a repeated operation that consumes the values like a Traversable and builds up the return value. no Iterator or TraversableOnce is exposed, so it does not have potential strange interactions with things like caching and downstream shuffles.
          4) groupByKeyAndSortValues (which does keep values in memory) can be expressed in foldLeft trivially (see the sketch below):
          groupByKeyAndSortValues(valueOrdering) = foldLeftByKey(valueOrdering, new ArrayBuffer[V])(_ += _)
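
          to make the semantics concrete, here is a sketch of what foldLeftByKey could look like, built on the secondarySort recipe from patrick's comment above (KeyPartitioner is the illustrative partitioner from that sketch; the name and signature here are assumptions, not the code in the pullreq):

            def foldLeftByKey[K: Ordering, V: Ordering, B](rdd: RDD[(K, V)], partitions: Int, zero: => B)(op: (B, V) => B): RDD[(K, B)] = {
              implicit val kv: Ordering[(K, V)] = Ordering.Tuple2(Ordering[K], Ordering[V])
              rdd.map { case (k, v) => ((k, v), null) }
                .repartitionAndSortWithinPartitions(new KeyPartitioner[K, V](partitions))
                .mapPartitions { iter =>
                  val it = iter.buffered
                  // records arrive grouped by key and sorted by value; fold each key's run as a stream
                  new Iterator[(K, B)] {
                    def hasNext = it.hasNext
                    def next() = {
                      val k = it.head._1._1
                      var acc = zero
                      while (it.hasNext && it.head._1._1 == k) acc = op(acc, it.next()._1._2)
                      (k, acc)
                    }
                  }
                }
            }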

          Imran Rashid added a comment -

          I think secondary sort is a great addition, thanks for working on this koert kuipers.

          But the discussions about TraversableOnce vs. Iterable have been bugging me, making me feel perhaps there ought to be a different api, and I think I've figured out why. I'd like to make a very different proposal for how we expose this.

          Rather than having some function take an unsorted RDD, do the shuffling, and then give you exactly one view over the result, shouldn't there be a type for the sorted RDD itself, which then lets you call any of the different views on it?

          We could create SortedRDD as a subclass of RDD. It would have the property that its data is partitioned by X and sorted by X,Y. All the usual RDD functions would exist, but e.g. mapPartitions would just have the additional property that you're iterating over elements in sorted order. And it could have all the other util functions you like as well, e.g. foldLeftByKey, groupByKeys, etc., which could all be built on top of mapPartitions.
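
          As a self-contained sketch of "built on top of mapPartitions", here is one possible shape of a grouping view over a sorted iterator (the name groups is hypothetical):

            def groups[K, X](sorted: Iterator[X])(key: X => K): Iterator[(K, Iterator[X])] = {
              val it = sorted.buffered
              new Iterator[(K, Iterator[X])] {
                def hasNext = it.hasNext
                def next() = {
                  val k = key(it.head)
                  // the inner iterator streams one key's run without buffering it in memory;
                  // note the caller must exhaust it before calling next() again -- the same
                  // pitfall koert raised above for groupByKeyAndSortValues
                  (k, new Iterator[X] {
                    def hasNext = it.hasNext && key(it.head) == k
                    def next() = it.next()
                  })
                }
              }
            }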

          I think explicitly tracking the idea of a sorted RDD buys us a few nice things:

          1) at the most basic api level (mapPartitions), we don't get stuck in debates about TraversableOnce vs Iterable, whether groups need to fit in memory or not, etc. mapPartitions gives you an iterator, which implies nothing is in memory, and which gives us the flexibility to change to implementations down the road which don't have the same requirements for buffering things in memory. (Though for now we could still add the util functions which do require more mem.)

          2) Spark can do operations on sorted data, even if it's not doing the sorting itself. E.g., if you read a sorted file from hdfs (or from any other datastore for that matter), you shouldn't need to force spark to sort the data again just so you get access to the util functions which use sorting. Right now this logic would need to live at the application level, but this would be the first step for us to integrate it more tightly into spark itself.
          (sort of related to SPARK-1061)

          3) I've always felt that the need to pull out the grouping key into the first element of a tuple is a little klunky – we can do away with that. The X & Y for partitioning & sorting could be specified by arbitrary functions. E.g. say you have some case class MyData(name: String, value: Int, count: Long); it's a nuisance to say

          rdd.groupBy(_.name).map{ case (name, records) => records.map{ case MyData(n2, v, c) => ... } }

          or

          rdd.map{ x => x.name -> x }
            .groupByKeyAndSortValues(Ordering.by{ x => (x.name, x.value) })
            .map{ case (name, records) => ... }

          I'd prefer

          val sortedRdd:SortedRDD = rdd.groupAndSort(_.name, _.value)

          and then getting to do any of:

          sortedRdd.foldByKey(0){ case (prev, next) => ... }

          or

          sortedRdd.mapGroups{ case (name, records) => ... }

          or

          sortedRdd.mapPartitions{ itr => ... }

          (Again, note that sortedRdd doesn't have to come from a sort by spark; it could actually come directly from hdfs if the data was written out correctly, or from any other input data source with the right properties)

          koert kuipers added a comment -

          Imran,
          Thanks for taking the time to write this down!

          Just to be clear:
          val x: RDD[X]
          x.groupAndSort(f1, f2) where f1 is X => K and f2 is X => V would produce a
          SortedRDD[K, V]?

          SortedRDD makes me think of OrderedRDD. The RDD you describe is partitioned
          by key and sorted by (key, value). SecondarySortedRDD? Not nice either...

          An implementation of what you suggest could be done pretty quickly with the
          code in the current pullreq; it already exists somewhat as a building block
          there.

          Curious to hear what others think.

          Imran Rashid added a comment -

          Hey Koert,

          good questions about the types, I hadn't really thought about it yet. I guess I'm actually proposing 3 type parameters – the row type doesn't change at all, but there are additional types for the partitioning and sorting.

          val x: RDD[X] = ...
          val y: SortedRDD[X,K,V] = x.groupAndSort(f1, f2)

          so then you'd have

          mapPartitions[Y](f: Iterator[X] => Iterator[Y]): RDD[Y]

          mapGroup[Y](f: (K, Iterator[X]) => Iterator[Y]): RDD[Y]

          foldByKey[Y](zero:Y)(f: (Y, X) => Y): RDD[Y]

          or maybe the return type of mapGroup & foldByKey would be RDD[(K,Seq[Y])] or something ... or there is another variant which would let you return another SortedRDD. probably need to try out some variants and see how they look.

          Having three type parameters is a little unwieldy ... maybe we don't even bother keeping the types K & V if they don't actually get us anything. E.g. I don't think you actually need to expose the type V at all. You really just need to keep an Ordering[X] as a member variable. Then groupAndSort takes an X => V and constructs an Ordering[X] out of it.

          yeah, I dunno about the name either ... PartitionSortedRdd? GroupSortedRdd? ...

          Glad you are interested in this and think an implementation would be easy. I was actually going to suggest that maybe I'm proposing a bigger change, so it should come after the existing work you've done. Especially since I'm really proposing adding some new apis for even basic partitioning & grouping, even without involving secondary sort at all ...

          koert kuipers added a comment -

          Imran,
          I think the groupAndSort function is easy to implement with the code in
          this pullreq (which includes the custom partitioning and sorting by key +
          value that you would need), but i agree with you that the rest of what you
          suggest involves a bigger change. So it's probably better to create a new
          jira and start on it after this one is done. We will be able to
          re-use/refactor the code of this pullreq, which by then hopefully has been
          merged into master.

          I have some more questions involving types and design, but i will send
          those to you offline.

          koert kuipers added a comment - edited

          since the last pullreq for this ticket i created spark-sorted (based on suggestions from imran), a small library for spark that supports the target features of this ticket, but without the burden of having to be fully compatible with the current spark api conventions (with regards to ordering being implicit).
          i also got a chance to catch up with sandy at spark summit east and we exchanged some emails afterward about this jira ticket and possible design choices.

          so based on those experiences i think there are better alternatives than the current pullreq (https://github.com/apache/spark/pull/3632), and i will close it. the pullreq does bring secondary sort to spark, but only in memory, which is a very limited feature (since if the values can be stored in memory then sorting after the shuffle isn't really that hard, just wasteful).

          instead of the current pullreq i see 2 alternatives:
          1) a new pullreq that introduces the mapStream api, which is very similar to the reduce operation as we know it in hadoop: a sorted streaming reduce. Its signature would be something like this on RDD[(K, V)] (a usage sketch follows this list):

            def mapStreamByKey[W](partitioner: Partitioner, f: Iterator[V] => Iterator[W])(implicit o1: Ordering[K], o2: Ordering[V]): RDD[(K, W)]

          (note that the implicits would not actually be on the method as shown here, but on a class conversion, similar to how PairRDDFunctions works)

          2) don't do anything. the functionality this jira targets is already available in the small spark-sorted library, which is available on spark-packages, and that's good enough.
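
          for illustration, a usage sketch of alternative 1 (mapStreamByKey is the proposal above, not a shipped spark api; sc, the prices data, and the running-max logic are made up):

            import org.apache.spark.HashPartitioner

            // running max per key, over values arriving sorted by the implicit orderings
            val prices: RDD[(String, Double)] =
              sc.parallelize(Seq(("AAPL", 1.0), ("AAPL", 3.0), ("AAPL", 2.0)))
            val runningMax: RDD[(String, Double)] =
              prices.mapStreamByKey(new HashPartitioner(8), { it: Iterator[Double] =>
                it.scanLeft(Double.MinValue)(math.max).drop(1)
              })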

          Sandy Ryza added a comment -

          My opinion is that a secondary sort operator in core Spark would definitely be useful.

          swetha k added a comment -

          Sandy Ryza koert kuipers

          What is the best option to do secondary sorting with good performance: groupByKeyAndSortValues or the spark-sorted library? I am new to Spark, so any suggestion would be helpful.

          Thanks,
          Swetha

          Koert Kuipers added a comment -

          it depends on the size of your values per key.

          if the values per key fit comfortably in memory then you can just do
          groupBy and sort the values yourself. if not i would use spark-sorted. i
          would not use groupByKeyAndSortValues (from
          https://github.com/apache/spark/pull/3632)

          swetha k added a comment -

          koert kuipers

          Could you please provide an example call showing how spark-sorted should be used for my scenario?

          For a bunch of user sessions, I have to first sort by session id, which is the key, and further sort the values by the timeStamp which is present in the value.

          So basically I need to have all the sessionIds sorted and the related values for each sessionId sorted by timestamp.

          Thanks,
          Swetha

          Koert Kuipers added a comment -

          i assume you want to do some analysis on the values sorted by timeStamp
          within each session id?
          this is very similar to the stock price/quote example given in the
          spark-sorted readme.

          see:
          https://github.com/tresata/spark-sorted/blob/master/README.md

          also check out the unit test:
          https://github.com/tresata/spark-sorted/blob/master/src/test/scala/com/tresata/spark/sorted/GroupSortedSpec.scala

          Nick Xie added a comment -

          I need a sessionize example whereby all records are first grouped by the key (e.g. machine id), sorted on the timestamp, and then broken up into multiple sessions via a status-code change (e.g. from 1 to 0 and then 0 to 1). Any help on how to achieve this using groupsort would be greatly appreciated.

          thanks

          Koert Kuipers added a comment -

          hey nick,
          i believe your problem sounds like a good fit for GroupSorted. you can
          process the sessions per machine sorted by timestamp as an iterator using
          mapStreamByKey, keeping state as you iterate to detect status-code changes
          and then assigning some identifier to break up sessions accordingly.

          the only tricky part is keeping state as you iterate. i am not sure if my
          suggested way below is the best.

          pseudo scala code:

            rdd[(MachineId, Session)].groupSort(numPartitions, timestampOrdering).mapStreamByKey { iterator =>
              var uuid = UUID.randomUUID.toString
              iterator.map { session =>
                if (session.statusCode == 123) uuid = UUID.randomUUID.toString
                (uuid, session)
              }
            }

          the way to do this without mutable state would be something like
          Iterator.scanLeft; a sketch (with Session as a hypothetical stand-in for
          your record type and 123 as the boundary status code from the pseudo code
          above):
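
            import java.util.UUID

            def assignSessionIds(iterator: Iterator[Session]): Iterator[(String, Session)] =
              iterator.scanLeft((UUID.randomUUID.toString, null: Session)) { case ((uuid, _), session) =>
                // start a new session id whenever the status code marks a boundary
                val id = if (session.statusCode == 123) UUID.randomUUID.toString else uuid
                (id, session)
              }.drop(1) // drop the seed element that scanLeft emits first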

          Nick Xie added a comment -

          I wanted to add a session id to each detail record, but the only way I can do that with mapStreamByKey is to create a LinkedList of detail records and return the list's iterator, which takes up extra memory as opposed to just modifying the record. I ended up just creating a linked list of only session records. It seems to work on my test machine. I will test it on the cluster next week.

          Nick Xie added a comment -

          It worked really well on the cluster. I did notice that it has a dependency on Google guava classes. Is there any way to get rid of this dependency? guava dependency mismatch is a pain with spark and hadoop versions.

          Koert Kuipers added a comment -

          glad to hear it worked well.

          totally agree that guava dependency mismatch is a pain. spark-sorted does
          not have a dependency on guava. could it be that one of your other
          dependencies uses guava?

          Nick Xie added a comment -

          It is in your api/java/GroupSorted.scala

          line 8:  import com.google.common.collect.{ Ordering => GuavaOrdering }
          ...
          line 29: private implicit def ordering[K]: Ordering[K] = comparatorToOrdering(GuavaOrdering.natural.asInstanceOf[Comparator[K]])

          Nick Xie added a comment -

          For the record, the data file is 25 million rows and about 3000 unique keys, so that's about 8000 records on average to be sorted per key on the timestamp.

          Koert Kuipers added a comment -

          oh, that's no good, i am using guava without even declaring a dependency...

          let me see if there is an alternative to using guava for this

          Koert Kuipers added a comment -

          Great. We have stress tested it with millions of records per key (and only
          1.5g of ram per executor) to make sure there was no hidden assumption that
          data needs to fit in memory somehow, and it worked fine. Seems the
          shuffle-based sort keeps its promise...

          Koert Kuipers added a comment -

          i believe it's straightforward to get rid of the guava Ordering, but not so
          for guava Optional, since it's part of spark-sorted's public java api. it
          is also part of spark's public java api

          Nick Xie added a comment -

          Moving out guava Ordering is at least one less path to dependency hell. Also, does the comparator really need to be optional? It isn't much of an inconvenience to provide a comparator, until eventually Spark does away with its public API dependency in 2.x, hopefully anyway.

          swetha k added a comment -

          koert kuipers

          How do I include the dependency for this? Is this available as a jar somewhere?

          koert kuipers added a comment -

          yes, it's available on maven central:
          http://mvnrepository.com/artifact/com.tresata/spark-sorted_2.10/0.3.1

          Nick Xie added a comment -

          Thanks for the quick changes to get rid of the Ordering dependency. Since I am only using it in a specific way, through a few small hacks I was able to get rid of the entire runtime dependency on Guava.

          Koert Kuipers added a comment -

          Did you build a version that does not use Optional for java api?


          Nick Xie added a comment -

          I did exactly that. Since I will always provide a comparator, I also took the liberty of removing a few overloaded constructors. Less is more when it comes to code maintenance.

          swetha k added a comment -

          koert kuipers

          Does this use a custom partitioner to make sure that all the values pertaining to a key are placed in a particular node?

          Right now my code does something like the following and it seems to cause a lot of shuffling. I need to be able to group by sessionId and then sort by timeStamp in a tuple. What is the appropriate method for that?

          def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] = {
            val grpdRecs = rdd.groupByKey()
            val srtdRecs = grpdRecs.mapValues[List[(Long, String)]](iter => iter.toList.sortBy(_._1))
            srtdRecs
          }
          Koert Kuipers added a comment -

          spark-sorted (https://github.com/tresata/spark-sorted) allows you to
          process your data in a similar way to what you did above, but without
          materializing the sorted data as a list in memory. yes it uses a custom
          partitioner and yes it always shuffles the data.

          are you sure the shuffle is your issue?
          i assume your final output is not the sorted list? what do you do with the
          sorted list after the steps shown above?
          if your final output is RDD[(String, List[(Long, String)])] then there is
          no way around materializing the list in memory, and then spark-sorted will
          not give you any benefit over what you did above.

          swetha k added a comment -

          koert kuipers
          The final output for this RDD is RDD[(String, List[(Long, String)])]. But I call updateStateByKey on this RDD. Inside updateStateByKey, I process this list and put all the data in a single object which gets merged with the old state for this session. After the updateStateByKey, I will return objects for the session that represent the current batch and the merged batch.

          swetha k added a comment -

          koert kuipers

          If I don't put the list as a materialized view in memory, what is the appropriate way to use spark-sorted to just group and sort the batch of jsons based on the key (sessionId)?

          Koert Kuipers added a comment -

          say your input is sessionId|json and you have a way to extract the
          timestamp from the json for the custom ordering; then you could use
          spark-sorted to transform it into sessionId|rank|json where rank is the
          sorted rank of the json within the sessionId.

          this would be a typical example of usage of spark-sorted where the list of
          jsons per sessionId is never materialized in memory. this would work even
          if a single sessionId has millions of json objects.

          the output would also be sorted: first by sessionId and then by rank, with
          the guarantee that all json for the same sessionId ends up in the same part
          file.
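
          a sketch of that transform, following the groupSort / mapStreamByKey shape used earlier in this thread (numPartitions, extractTimestamp and the exact spark-sorted api are assumptions here):

            val timestampOrdering: Ordering[String] = Ordering.by(extractTimestamp) // order jsons by their timestamp
            val ranked: RDD[(String, (Int, String))] =
              rdd.groupSort(numPartitions, timestampOrdering)
                .mapStreamByKey { jsons: Iterator[String] =>
                  jsons.zipWithIndex.map { case (json, rank) => (rank, json) } // rank within the sessionId
                }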


            People

            • Assignee: Koert Kuipers
            • Reporter: koert kuipers
            • Votes: 7
            • Watchers: 40
