  Spark / SPARK-19067

mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)


Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.2.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      Right now the only way to do stateful operations is with an Aggregator or a UDAF. However, these do not give users control over the emission or expiration of state, which makes it hard to implement things like sessionization. We should add a more general construct (probably similar to DStream.mapWithState) to Structured Streaming. Here is the design.

      Requirements

      • Users should be able to specify a function that can do the following:
          • Access the input rows corresponding to a key
          • Access the previous state corresponding to a key
          • Optionally, update or remove the state
          • Output any number of new rows (or none at all)

      Proposed API

      // ------------ New methods on KeyValueGroupedDataset ------------
      class KeyValueGroupedDataset[K, V] {
        // Scala friendly
        def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
        def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]
        // Java friendly
        def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U]): Dataset[U]
        def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U]): Dataset[U]
      }
      
      // ------------------- New Java-friendly function classes -------------------
      import java.io.Serializable;
      import java.util.Iterator;

      public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
        R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
      }
      public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
        Iterator<R> call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
      }
      
      // ---------------------- Wrapper class for state data ----------------------
      trait GroupState[S] {
        def exists(): Boolean
        def get(): S                  // throws an exception if the state does not exist
        def getOption(): Option[S]
        def update(newState: S): Unit
        def remove(): Unit            // exists() will be false after this
      }
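The flatMapGroupsWithState variant is what covers the requirement that the function may output any number of rows, including none. The plain-Scala sketch below illustrates that shape without a Spark dependency. SketchState is a hypothetical stand-in for GroupState, and alertFunc, with its threshold of 3 clicks, is an invented example rather than part of the design.

```scala
// Hypothetical stand-in for the proposed GroupState[S]; not a Spark class.
final class SketchState[S](private var value: Option[S]) {
  def exists: Boolean = value.isDefined
  def getOption: Option[S] = value
  def update(newState: S): Unit = value = Some(newState)
  def remove(): Unit = value = None
}

// Hypothetical threshold used only for this illustration.
val Threshold = 3L

// flatMapGroupsWithState-style function: accumulates a per-user click count
// and emits one alert row only when the count reaches Threshold; otherwise
// it emits nothing and just keeps the updated count as state.
def alertFunc(user: String, clicks: Iterator[String], state: SketchState[Long]): Iterator[(String, Long)] = {
  val total = state.getOption.getOrElse(0L) + clicks.size
  if (total >= Threshold) {
    state.remove()              // start counting from zero again
    Iterator((user, total))     // exactly one output row
  } else {
    state.update(total)         // carry the running count forward
    Iterator.empty              // no output row for this key
  }
}
```

Returning an Iterator (rather than a single value as in mapGroupsWithState) is what lets the function stay silent for a key until it has something worth emitting.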
      

      Key Semantics of the GroupState Class

      • The state can be null.
      • If state.remove() is called, then state.exists() will return false and getOption will return None.
      • After state.update(newState) is called, state.exists() will return true and getOption will return Some(newState).
      • None of the operations are thread-safe. This is to avoid memory barriers.
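To make these semantics concrete, here is a small in-memory class that follows the GroupState contract listed above. It is a hypothetical illustration (InMemoryGroupState is not a Spark class); like the contract, it makes no attempt at thread-safety, and it models a missing state with Option.

```scala
// Hypothetical in-memory stand-in for GroupState[S], illustrating the
// documented exists/get/getOption/update/remove semantics only.
final class InMemoryGroupState[S](initial: Option[S]) {
  // Plain var, no synchronization: operations are not thread-safe.
  private var current: Option[S] = initial

  def exists(): Boolean = current.isDefined

  // Throws if the state does not exist, as the comment on get() describes.
  def get(): S =
    current.getOrElse(throw new NoSuchElementException("state does not exist"))

  def getOption(): Option[S] = current

  // After update, exists() is true and getOption() is Some(newState).
  def update(newState: S): Unit = current = Some(newState)

  // After remove, exists() is false and getOption() is None.
  def remove(): Unit = current = None
}
```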

      Usage

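      import org.apache.spark.sql.streaming.GroupState
      import spark.implicits._   // Encoders for String, Long and (String, Long); assumes a SparkSession named spark

      // Running count per word: add this batch's occurrences to the previous count
      val stateFunc = (word: String, words: Iterator[String], runningCount: GroupState[Long]) => {
        val newCount = words.size + runningCount.getOption.getOrElse(0L)
        runningCount.update(newCount)
        (word, newCount)
      }

      dataset                                                  // type is Dataset[String]
        .groupByKey[String](w => w)                            // generates KeyValueGroupedDataset[String, String]
        .mapGroupsWithState[Long, (String, Long)](stateFunc)   // returns Dataset[(String, Long)]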
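The usage above relies on Spark to carry state between micro-batches. As a rough mental model only, the sketch below simulates that bookkeeping in plain Scala: each batch is grouped by key, the function receives each key's values plus its previous state, and whatever state it leaves behind is stored for the next batch. KeyState, MapGroupsWithStateSim and runBatch are hypothetical names; Spark itself keeps this state in a distributed, checkpointed state store rather than a local map.

```scala
import scala.collection.mutable

// Hypothetical minimal state wrapper standing in for GroupState[S].
final class KeyState[S](var value: Option[S]) {
  def getOption: Option[S] = value
  def update(newState: S): Unit = value = Some(newState)
  def remove(): Unit = value = None
}

// Hypothetical simulator of the per-batch work mapGroupsWithState performs.
final class MapGroupsWithStateSim[K, V, S, U](func: (K, Iterator[V], KeyState[S]) => U) {
  private val store = mutable.Map.empty[K, S]

  // Only keys present in this batch are passed to func.
  def runBatch(batch: Seq[V], keyFunc: V => K): Seq[U] =
    batch.groupBy(keyFunc).toSeq.map { case (key, values) =>
      val state = new KeyState[S](store.get(key))    // previous state, if any
      val out = func(key, values.iterator, state)
      state.value match {
        case Some(s) => store(key) = s               // persist updated state
        case None    => store.remove(key)            // state was removed
      }
      out
    }

  def stateFor(key: K): Option[S] = store.get(key)
}

// Same logic as stateFunc above, written against the hypothetical KeyState.
val countFunc = (word: String, words: Iterator[String], runningCount: KeyState[Long]) => {
  val newCount = words.size + runningCount.getOption.getOrElse(0L)
  runningCount.update(newCount)
  (word, newCount)
}
```

Running two batches, ("a", "b", "a") and then ("a"), yields (a, 2), (b, 1) and then (a, 3). In the second batch the function is never called for "b"; its count simply stays in the store, which is why idle state needs a separate expiration mechanism.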
      

      Future Directions

      • Timeout-based expiration of state that has not received data for a while: Done
      • General expression-based expiration: TODO. Are there real use cases that cannot be handled with timeouts?
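Timeout-based expiration is what makes sessionization practical: a key that stops receiving data eventually has its state dropped instead of lingering forever. In the released API this surfaces as GroupStateTimeout and GroupState.hasTimedOut; the sketch below is only a plain-Scala model of processing-time expiration with an explicit clock, and TimeoutStore with its update, get and expire methods are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical model of processing-time timeouts: each key remembers when it
// last received data, and keys idle for longer than timeoutMs are expired.
final class TimeoutStore[K, S](timeoutMs: Long) {
  // key -> (state, time of last update in ms)
  private val state = mutable.Map.empty[K, (S, Long)]

  // Called when a key receives data: store new state and refresh its clock.
  def update(key: K, value: S, nowMs: Long): Unit = state(key) = (value, nowMs)

  def get(key: K): Option[S] = state.get(key).map(_._1)

  // Returns the (key, state) pairs idle for longer than timeoutMs at nowMs
  // and removes them, so a caller could emit e.g. a closed-session row.
  def expire(nowMs: Long): Seq[(K, S)] = {
    val expired = state.collect {
      case (k, (s, last)) if nowMs - last > timeoutMs => (k, s)
    }.toSeq
    expired.foreach { case (k, _) => state.remove(k) }
    expired
  }
}
```

With a 1000 ms timeout, a key last updated at t = 0 expires when checked at t = 1500, while a key updated at t = 800 survives the same check.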


          People

            tdas Tathagata Das
            marmbrus Michael Armbrust
            Votes: 0
            Watchers: 14
