Spark / SPARK-30477

More KeyValueGroupedDataset methods should be composable


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.4.4
    • Fix Version/s: None
    • Component/s: Spark Core

    Description

      Right now many `KeyValueGroupedDataset` methods do not return a `KeyValueGroupedDataset`. In some cases this means we have to call `groupByKey` several times in order to express certain patterns.

      Setup

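      import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset}

      // K, T, U, V, X, Z are placeholder types; implicit Encoders for the
      // element types used below are assumed to be in scope.
      def f: T => K = ???
      def g: U => K = ???
      def h: V => K = ???
      val ds1: Dataset[T] = ???
      val ds2: Dataset[U] = ???
      val ds3: Dataset[V] = ???
      val kvDs1: KeyValueGroupedDataset[K, T] = ds1.groupByKey(f)
      val kvDs2: KeyValueGroupedDataset[K, U] = ds2.groupByKey(g)
      val kvDs3: KeyValueGroupedDataset[K, V] = ds3.groupByKey(h)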
      

      Example one: Combining multiple co-grouped Datasets

      // Current: cogroup returns a Dataset, so we must regroup before the next cogroup
      kvDs1
        .cogroup(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: Iterator[X])
        .groupByKey((x: X) => ???: K)
        .cogroup(kvDs3)((k: K, it1: Iterator[X], it2: Iterator[V]) => ???: Iterator[Z])
      
      // Wanted
      trait KeyValueGroupedDataset[K, T] {
        def coGroupKeyValueGroupedDataset[U, X](r: KeyValueGroupedDataset[K, U])(
            f: (K, Iterator[T], Iterator[U]) => X): KeyValueGroupedDataset[K, X]
      }
      
      kvDs1
        .coGroupKeyValueGroupedDataset(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: X)
        .coGroupKeyValueGroupedDataset(kvDs3)((k: K, it1: Iterator[X], it2: Iterator[V]) => ???: Z)
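
To make the proposed shape concrete without a cluster, here is a minimal sketch in plain Scala. `Grouped` is a hypothetical in-memory stand-in for `KeyValueGroupedDataset` (a `Map` from each key to its values), not Spark code, and it assumes the combining function yields exactly one value per key, as in the wanted signature.

```scala
// Hypothetical in-memory stand-in for KeyValueGroupedDataset[K, V]:
// each key maps to the values grouped under it. Not Spark code.
final case class Grouped[K, V](groups: Map[K, Vector[V]]) {
  // Proposed shape: co-group with another grouping that shares the key type,
  // produce one X per key, and stay grouped by K so calls can be chained.
  def coGroupKeyValueGroupedDataset[U, X](r: Grouped[K, U])(
      f: (K, Iterator[V], Iterator[U]) => X): Grouped[K, X] = {
    val keys = groups.keySet ++ r.groups.keySet
    Grouped(keys.iterator.map { k =>
      val left = groups.getOrElse(k, Vector.empty[V])
      val right = r.groups.getOrElse(k, Vector.empty[U])
      k -> Vector(f(k, left.iterator, right.iterator))
    }.toMap)
  }
}

object Grouped {
  // Stand-in for Dataset.groupByKey.
  def groupByKey[K, V](xs: Seq[V])(key: V => K): Grouped[K, V] =
    Grouped(xs.groupBy(key).map { case (k, vs) => k -> vs.toVector })
}

val kvDs1 = Grouped.groupByKey(Seq(1, 2, 3, 4))(_ % 2)             // 0 -> [2, 4], 1 -> [1, 3]
val kvDs2 = Grouped.groupByKey(Seq("a", "bb", "ccc"))(_.length % 2) // 0 -> [bb], 1 -> [a, ccc]
val kvDs3 = Grouped.groupByKey(Seq(10, 11))(_ % 2)                  // 0 -> [10], 1 -> [11]

// Two co-groups chained without an intermediate groupByKey.
val result: Grouped[Int, Int] = kvDs1
  .coGroupKeyValueGroupedDataset(kvDs2)((_, ts, us) => ts.sum + us.map(_.length).sum)
  .coGroupKeyValueGroupedDataset(kvDs3)((_, xs, vs) => xs.sum + vs.sum)
```

A Spark version would presumably also need an `Encoder[X]` for each result type, as the existing `cogroup` does.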
      

      Example two: Combining a `reduceGroups` with a `cogroup`

      // Current: reduceGroups returns Dataset[(K, T)], which must be regrouped by K
      val newDs1: Dataset[X] = kvDs1
        .reduceGroups((l: T, r: T) => ???: T)
        .groupByKey { case (k, _) => k }
        .mapValues { case (_, v) => v }
        .cogroup(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: Iterator[X])
      
      // wanted
      trait KeyValueGroupedDataset[K, T] {
        def reduceGroupsKeyValueGroupedDataset(v: (V, V) => V): KeyValueGroupedDataset[K, V]
      }
      
      val newDs2: Dataset[X] = kvDs1
        .reduceGroupsKeyValueGroupedDataset((l: T, r: T) => ???: T))
        .coGroup(kvDs2)(k: K, it1: Iterator[T], it2: Iterator[U]) => ???: X)
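
A small in-memory model can check that the wanted `reduceGroupsKeyValueGroupedDataset` yields the same grouping as the current reduce, regroup and `mapValues` chain, in a single call. `Grouped` and its methods are hypothetical stand-ins assumed for illustration only; they mimic the shape of the Spark methods, not their implementation.

```scala
// Hypothetical in-memory stand-in for KeyValueGroupedDataset[K, V]. Not Spark code.
final case class Grouped[K, V](groups: Map[K, Vector[V]]) {
  // Like today's reduceGroups: the result is flattened into (key, value) pairs.
  def reduceGroups(f: (V, V) => V): Vector[(K, V)] =
    groups.toVector.map { case (k, vs) => k -> vs.reduce(f) }

  // Like today's mapValues: transform the values, keep the grouping.
  def mapValues[W](g: V => W): Grouped[K, W] =
    Grouped(groups.map { case (k, vs) => k -> vs.map(g) })

  // Proposed: reduce each group but stay grouped by K.
  def reduceGroupsKeyValueGroupedDataset(f: (V, V) => V): Grouped[K, V] =
    Grouped(groups.map { case (k, vs) => k -> Vector(vs.reduce(f)) })
}

object Grouped {
  // Stand-in for Dataset.groupByKey.
  def groupByKey[K, V](xs: Seq[V])(key: V => K): Grouped[K, V] =
    Grouped(xs.groupBy(key).map { case (k, vs) => k -> vs.toVector })
}

val kvDs1 = Grouped.groupByKey(Seq(1, 2, 3, 4, 5))(_ % 2) // 0 -> [2, 4], 1 -> [1, 3, 5]

// Current pattern: reduce, then regroup the pairs by the key they already carry.
val current: Grouped[Int, Int] =
  Grouped.groupByKey(kvDs1.reduceGroups(_ + _))(_._1).mapValues(_._2)

// Proposed pattern: one call, no regrouping step.
val proposed: Grouped[Int, Int] = kvDs1.reduceGroupsKeyValueGroupedDataset(_ + _)
```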
      

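      In both cases, not only are the ergonomics better, but Spark would also be better able to optimize the code, because it would no longer have to regroup data that is already grouped by the same key.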

      For almost every method of `KeyValueGroupedDataset` we should have a matching method that returns a `KeyValueGroupedDataset`.

      We can also add a `.toDs` method that converts a `KeyValueGroupedDataset[K, V]` into a `Dataset[(K, V)]`.
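
A `.toDs` method would flatten the grouping back into one `(key, value)` pair per element. The sketch below uses a hypothetical in-memory `Grouped` stand-in (not Spark code) and returns a `Vector[(K, V)]` in place of a `Dataset[(K, V)]`. With the existing Spark API, a similar `Dataset[(K, V)]` can already be produced with `flatMapGroups((k, it) => it.map(v => (k, v)))`, given an implicit `Encoder[(K, V)]`.

```scala
// Hypothetical in-memory stand-in for KeyValueGroupedDataset[K, V]. Not Spark code.
final case class Grouped[K, V](groups: Map[K, Vector[V]]) {
  // Proposed .toDs: one (key, value) pair per grouped element.
  // A Vector stands in for Dataset[(K, V)] here.
  def toDs: Vector[(K, V)] =
    groups.toVector.flatMap { case (k, vs) => vs.map(v => (k, v)) }
}

object Grouped {
  // Stand-in for Dataset.groupByKey.
  def groupByKey[K, V](xs: Seq[V])(key: V => K): Grouped[K, V] =
    Grouped(xs.groupBy(key).map { case (k, vs) => k -> vs.toVector })
}

// Group words by their first letter, then flatten back to pairs.
val grouped = Grouped.groupByKey(Seq("apple", "avocado", "banana"))(_.head)
val pairs: Vector[(Char, String)] = grouped.toDs
```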


          People

            Assignee: Unassigned
            Reporter: Paul Jones (pjoneswork)
            Votes: 0
            Watchers: 1
