Details
Type: Improvement
Status: Resolved
Priority: Major
Resolution: Fixed
3.1.2
None
Description
Structured Streaming supports arbitrary stateful processing through the mapGroupsWithState and flatMapGroupsWithState operators. Currently, the per-key state can only be built up from the data that arrives in each batch. This API improvement lets users specify an initial state, which is applied when the first batch executes.
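To make the intended semantics concrete, here is a minimal plain-Scala model with no Spark dependency. It assumes a simplified setting: GroupState is replaced by an Option, there are no timeouts, and the names InitialStateModel, runBatch and countFunc are hypothetical. The initial state seeds the per-key store before the first batch runs, and each batch then groups its records by key and updates that key's state.

```scala
// Plain-Scala model (no Spark) of seeding per-key state before the first batch.
object InitialStateModel {
  // Hypothetical state type, mirroring RunningCount in the example usage
  final case class RunningCount(count: Long)

  // Hypothetical user function; GroupState is simplified to an Option
  // holding the key's current state (None if the key has no state yet)
  type StateFunc =
    (String, Iterator[String], Option[RunningCount]) => (RunningCount, (String, Long))

  // Processes one micro-batch against the state store and returns the
  // updated store plus one output row per key seen in the batch
  def runBatch(
      store: Map[String, RunningCount],
      batch: Seq[String],
      func: StateFunc): (Map[String, RunningCount], Seq[(String, Long)]) = {
    // Keys are visited in sorted order only to make the output deterministic
    val groups = batch.groupBy(identity).toSeq.sortBy(_._1)
    groups.foldLeft((store, Vector.empty[(String, Long)])) {
      case ((st, out), (key, values)) =>
        val (newState, result) = func(key, values.iterator, st.get(key))
        (st.updated(key, newState), out :+ result)
    }
  }

  // Counts records per key, continuing from the existing state if there is one
  val countFunc: StateFunc = (key, values, state) => {
    val total = state.map(_.count).getOrElse(0L) + values.size
    (RunningCount(total), (key, total))
  }

  def main(args: Array[String]): Unit = {
    // The initial state is applied once, before the first batch runs
    val initialState = Map("a" -> RunningCount(1), "b" -> RunningCount(1))
    val (afterFirst, out1) = runBatch(initialState, Seq("a", "c"), countFunc)
    val (_, out2) = runBatch(afterFirst, Seq("a", "b"), countFunc)
    println(out1) // Vector((a,2), (c,1))
    println(out2) // Vector((a,3), (b,2))
  }
}
```

Key "a" continues from its seeded count of 1 rather than starting at 0, while key "c", which has no initial state, starts from scratch.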
Proposed new APIs (Scala)
// Methods on KeyValueGroupedDataset[K, V]
def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: Dataset[(K, S)])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: Dataset[(K, S)])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]
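In the flatMapGroupsWithState variant the function returns Iterator[U], so a key may emit zero or more rows per batch. The sketch below models that under the same simplifying assumptions (no Spark, no timeouts, Option in place of GroupState); FlatMapInitialStateModel, alertFunc and the threshold of 3 are hypothetical. It shows how a seeded state changes when output fires: key "a" starts at 2, so a single new record pushes it to the threshold.

```scala
// Plain-Scala model (no Spark) of the flatMap variant with an initial state.
object FlatMapInitialStateModel {
  // Hypothetical state type, mirroring RunningCount in the example usage
  final case class RunningCount(count: Long)

  // Hypothetical user function: returns the new state and zero or more rows
  type FlatStateFunc =
    (String, Iterator[String], Option[RunningCount]) => (RunningCount, Iterator[String])

  def runBatch(
      store: Map[String, RunningCount],
      batch: Seq[String],
      func: FlatStateFunc): (Map[String, RunningCount], Seq[String]) = {
    // Sorted only to make the output order deterministic
    val groups = batch.groupBy(identity).toSeq.sortBy(_._1)
    groups.foldLeft((store, Vector.empty[String])) {
      case ((st, out), (key, values)) =>
        val (newState, rows) = func(key, values.iterator, st.get(key))
        (st.updated(key, newState), out ++ rows)
    }
  }

  // Emits one row when a key's running count first reaches the threshold
  val threshold = 3L
  val alertFunc: FlatStateFunc = (key, values, state) => {
    val before = state.map(_.count).getOrElse(0L)
    val after = before + values.size
    val rows =
      if (before < threshold && after >= threshold) Iterator(s"$key reached $after")
      else Iterator.empty
    (RunningCount(after), rows)
  }

  def main(args: Array[String]): Unit = {
    // Key "a" starts at 2 from the initial state; "b" has no initial state
    val initialState = Map("a" -> RunningCount(2))
    val (_, out) = runBatch(initialState, Seq("a", "b"), alertFunc)
    println(out) // Vector(a reached 3)
  }
}
```

Without the seeded state, the same batch would emit nothing, because "a" would only reach a count of 1.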
Proposed new APIs (Java)
def mapGroupsWithState[S, U](
    func: MapGroupsWithStateFunction[K, V, S, U],
    stateEncoder: Encoder[S],
    outputEncoder: Encoder[U],
    timeoutConf: GroupStateTimeout,
    initialState: Dataset[(K, S)]): Dataset[U]

def flatMapGroupsWithState[S, U](
    func: FlatMapGroupsWithStateFunction[K, V, S, U],
    outputMode: OutputMode,
    stateEncoder: Encoder[S],
    outputEncoder: Encoder[U],
    timeoutConf: GroupStateTimeout,
    initialState: Dataset[(K, S)]): Dataset[U]
Example Usage
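import org.apache.spark.sql.{Dataset, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream
// Assumes a SparkSession named spark, as in spark-shell
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext // needed by MemoryStream

// RunningCount, timeoutConf and stateFunc are assumed to be defined elsewhere
val initialState: Dataset[(String, RunningCount)] = Seq(
  ("a", new RunningCount(1)),
  ("b", new RunningCount(1))
).toDS()
val inputData = MemoryStream[String]
val result =
  inputData.toDS()
    .groupByKey(x => x)
    // Proposed argument order: timeoutConf first, then initialState
    .mapGroupsWithState(timeoutConf, initialState)(stateFunc)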