Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Incomplete
Affects Version/s: 2.4.4
Fix Version/s: None
Description
Right now many methods of `KeyValueGroupedDataset` do not return a `KeyValueGroupedDataset`. In some cases this means we have to call `groupByKey` several times in order to express certain patterns.
Setup
```scala
// T, U, V and K are placeholder types; groupByKey also needs an implicit Encoder[K] in scope
def f: T => K = ???
def g: U => K = ???
def h: V => K = ???

val ds1: Dataset[T] = ???
val ds2: Dataset[U] = ???
val ds3: Dataset[V] = ???

val kvDs1: KeyValueGroupedDataset[K, T] = ds1.groupByKey(f)
val kvDs2: KeyValueGroupedDataset[K, U] = ds2.groupByKey(g)
val kvDs3: KeyValueGroupedDataset[K, V] = ds3.groupByKey(h)
```
Example one: Combining multiple cogrouped Datasets.
```scala
// Current: cogroup returns a Dataset, so we must groupByKey again before the next cogroup
kvDs1
  .cogroup(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: Iterator[X])
  .groupByKey((x: X) => ???: K)
  .cogroup(kvDs3)((k: K, it1: Iterator[X], it2: Iterator[V]) => ???: Iterator[Z])

// Wanted: a cogroup variant whose result is still keyed by K
trait KeyValueGroupedDataset[K, T] {
  def coGroupKeyValueGroupedDataset[U, X](r: KeyValueGroupedDataset[K, U])(
      f: (K, Iterator[T], Iterator[U]) => X): KeyValueGroupedDataset[K, X]
}

kvDs1
  .coGroupKeyValueGroupedDataset(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: X)
  .coGroupKeyValueGroupedDataset(kvDs3)((k: K, it1: Iterator[X], it2: Iterator[V]) => ???: Z)
```
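A minimal in-memory sketch of the proposed chaining, runnable without Spark. `Grouped` is a hypothetical stand-in for `KeyValueGroupedDataset`: it is just a `Map` from key to values, not the Spark class. The local `groupByKey` stands in for `Dataset.groupByKey`. `cogroupGrouped` is a hypothetical method shaped like the proposed `coGroupKeyValueGroupedDataset`: it calls the function once per key and keeps the result under that key.

```scala
// Toy stand-in for KeyValueGroupedDataset (NOT the Spark class): key -> grouped values.
final case class Grouped[K, V](groups: Map[K, Seq[V]]) {
  // Hypothetical keyed cogroup: calls f once per key present on either side
  // and keeps the single result under that key, so calls can be chained.
  def cogroupGrouped[U, X](other: Grouped[K, U])(
      f: (K, Iterator[V], Iterator[U]) => X): Grouped[K, X] = {
    val keys = groups.keySet ++ other.groups.keySet
    Grouped(keys.iterator.map { k =>
      val left = groups.getOrElse(k, Seq.empty[V]).iterator
      val right = other.groups.getOrElse(k, Seq.empty[U]).iterator
      k -> Seq(f(k, left, right))
    }.toMap)
  }
}

// Toy stand-in for Dataset.groupByKey.
def groupByKey[K, V](values: Seq[V])(key: V => K): Grouped[K, V] =
  Grouped(values.groupBy(key))

val kvDs1 = groupByKey(Seq(1, 2, 3, 4))(_ % 2)                  // T = Int
val kvDs2 = groupByKey(Seq("a", "bb", "ccc"))(_.length % 2)     // U = String
val kvDs3 = groupByKey(Seq(10L, 11L, 13L))(v => (v % 2).toInt) // V = Long

// No groupByKey between the two cogroups: the first result is still keyed by K.
val result: Grouped[Int, String] = kvDs1
  .cogroupGrouped(kvDs2)((_, ts, us) => ts.sum * 10 + us.size)      // X = Int
  .cogroupGrouped(kvDs3)((k, xs, vs) => s"$k:${xs.sum}:${vs.size}") // Z = String
```

Each result stays attached to its key. The second cogroup therefore needs no intermediate `groupByKey` to rediscover `K`.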
Example two: Combining `reduceGroups` with `cogroup`.
```scala
// Current: reduceGroups returns Dataset[(K, T)], so we regroup by the key we already had
val newDs1: Dataset[X] = kvDs1
  .reduceGroups((l: T, r: T) => ???: T)
  .groupByKey { case (k, _) => k }
  .mapValues { case (_, v) => v }
  .cogroup(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: Iterator[X])

// Wanted: a reduceGroups variant whose result is still keyed by K
trait KeyValueGroupedDataset[K, T] {
  def reduceGroupsKeyValueGroupedDataset(f: (T, T) => T): KeyValueGroupedDataset[K, T]
}

val newDs2: Dataset[X] = kvDs1
  .reduceGroupsKeyValueGroupedDataset((l: T, r: T) => ???: T)
  .cogroup(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: Iterator[X])
```
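A toy, in-memory sketch of example two. It assumes a hypothetical `Grouped` class in place of `KeyValueGroupedDataset`. On that class, `reduceGroups`, `mapValues` and `cogroup` only mimic the shapes of the existing Spark methods; they are not Spark code. `reduceGroupsGrouped` is a hypothetical keyed variant along the lines of the proposed `reduceGroupsKeyValueGroupedDataset`. Both pipelines should yield the same results. The proposed one just skips the regroup-and-strip detour.

```scala
// Toy stand-in for KeyValueGroupedDataset (NOT the Spark class): key -> grouped values.
final case class Grouped[K, V](groups: Map[K, Seq[V]]) {
  // Mimics the existing reduceGroups: one (key, reduced value) pair per key, no longer grouped.
  def reduceGroups(f: (V, V) => V): Seq[(K, V)] =
    groups.toSeq.map { case (k, vs) => (k, vs.reduce(f)) }

  // Hypothetical keyed variant: same reduction, but the result stays grouped by key.
  def reduceGroupsGrouped(f: (V, V) => V): Grouped[K, V] =
    Grouped(groups.map { case (k, vs) => k -> Seq(vs.reduce(f)) })

  // Mimics mapValues: transform every value, keep the grouping.
  def mapValues[W](f: V => W): Grouped[K, W] =
    Grouped(groups.map { case (k, vs) => k -> vs.map(f) })

  // Mimics the existing cogroup: f may emit several results per key; output is flat.
  def cogroup[U, R](other: Grouped[K, U])(
      f: (K, Iterator[V], Iterator[U]) => Iterator[R]): Seq[R] =
    (groups.keySet ++ other.groups.keySet).toSeq.flatMap { k =>
      f(k, groups.getOrElse(k, Seq.empty[V]).iterator,
        other.groups.getOrElse(k, Seq.empty[U]).iterator)
    }
}

// Toy stand-in for Dataset.groupByKey.
def groupByKey[K, V](values: Seq[V])(key: V => K): Grouped[K, V] =
  Grouped(values.groupBy(key))

val kvDs1 = groupByKey(Seq(1, 2, 3, 4, 5))(_ % 2)
val kvDs2 = groupByKey(Seq("a", "bb", "ccc"))(_.length % 2)
val combine = (k: Int, ts: Iterator[Int], us: Iterator[String]) =>
  Iterator(s"$k:${ts.sum}:${us.mkString(",")}")

// Current pattern: reduce, regroup by the key we already had, strip it, then cogroup.
val newDs1 = groupByKey(kvDs1.reduceGroups(_ max _))(_._1)
  .mapValues(_._2)
  .cogroup(kvDs2)(combine)

// Proposed pattern: the reduced result is still keyed, so cogroup follows directly.
val newDs2 = kvDs1.reduceGroupsGrouped(_ max _).cogroup(kvDs2)(combine)
```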
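In both cases the ergonomics are better, and Spark should also be better able to optimize the code. The extra `groupByKey` uses an opaque key function, so Spark presumably cannot tell that the data is already grouped by `K` and shuffles it again.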
For almost every method of `KeyValueGroupedDataset` we should have a matching method that returns a `KeyValueGroupedDataset`.
We can also add a `.toDs` method that converts a `KeyValueGroupedDataset[K, V]` into a `Dataset[(K, V)]`.
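A minimal sketch of what `toDs` would do: flatten each group into `(key, value)` pairs. It uses a hypothetical in-memory `Grouped` class rather than the Spark one. Regrouping those pairs by their first component and dropping it recovers the original groups. In Spark itself, such a helper could probably be built on the existing `flatMapGroups` by emitting `(k, v)` for every value, assuming an `Encoder[(K, V)]` is available.

```scala
// Toy stand-in for KeyValueGroupedDataset (NOT the Spark class): key -> grouped values.
final case class Grouped[K, V](groups: Map[K, Seq[V]]) {
  // Hypothetical toDs: flatten every group back into (key, value) pairs.
  def toDs: Seq[(K, V)] =
    groups.toSeq.flatMap { case (k, vs) => vs.map(v => (k, v)) }

  // Mimics mapValues: transform every value, keep the grouping.
  def mapValues[W](f: V => W): Grouped[K, W] =
    Grouped(groups.map { case (k, vs) => k -> vs.map(f) })
}

// Toy stand-in for Dataset.groupByKey.
def groupByKey[K, V](values: Seq[V])(key: V => K): Grouped[K, V] =
  Grouped(values.groupBy(key))

val kv = groupByKey(Seq("apple", "avocado", "banana"))(_.head)
val pairs = kv.toDs
// Regrouping the pairs by their first component recovers the original groups.
val roundTrip = groupByKey(pairs)(_._1).mapValues(_._2)
```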