Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.0
    • Fix Version/s: 2.2.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      When a key does not get any new data in mapGroupsWithState, the mapping function is never called on it. So we need a timeout feature that calls the function again in such cases, so that the user can decide whether to continue waiting or clean up (remove state, save stuff externally, etc.).

      Timeouts can be based on either processing time or event time. This JIRA covers processing time, but defines the high-level API design for both. The usage would look like this:

      def stateFunction(key: K, values: Iterator[V], state: KeyedState[S]): U = {
        ...
        state.setTimeoutDuration(10000)   // must be set again on every call
        ...
      }

      dataset                           // type is Dataset[T]
        .groupByKey[K](keyingFunc)      // generates KeyValueGroupedDataset[K, T]
        .mapGroupsWithState[S, U](
          func = stateFunction,
          timeout = KeyedStateTimeout.withProcessingTime)   // returns Dataset[U]
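      To make the calling convention concrete, here is a minimal, self-contained Scala sketch of a state function written against this design. KeyedState below is a hypothetical stand-in, not Spark's class: only setTimeoutDuration and isTimingOut come from this issue, while exists, get, update, remove, the millisecond unit, SimpleKeyedState, and the event-counting logic are assumptions for illustration.

```scala
// Hypothetical mirror of the design-time KeyedState interface (not Spark's class).
// Only setTimeoutDuration and isTimingOut come from this issue; the rest is assumed.
trait KeyedState[S] {
  def exists: Boolean
  def get: S
  def update(newState: S): Unit
  def remove(): Unit
  def setTimeoutDuration(durationMs: Long): Unit
  def isTimingOut(): Boolean
}

// Minimal in-memory implementation, only for exercising the function below.
final class SimpleKeyedState[S](initial: Option[S], timingOut: Boolean) extends KeyedState[S] {
  private var value: Option[S] = initial
  var timeoutMs: Option[Long] = None // None means this call set no timeout
  def exists: Boolean = value.isDefined
  def get: S = value.getOrElse(throw new NoSuchElementException("state not set"))
  def update(newState: S): Unit = value = Some(newState)
  def remove(): Unit = value = None
  def setTimeoutDuration(durationMs: Long): Unit = timeoutMs = Some(durationMs)
  def isTimingOut(): Boolean = timingOut
}

// Example state function: counts events per key. On new data it re-arms the timeout;
// on timeout it removes the state and reports the final count.
def stateFunction(key: String, values: Iterator[String], state: KeyedState[Long]): (String, Long, Boolean) =
  if (state.isTimingOut()) {
    val total = if (state.exists) state.get else 0L
    state.remove() // clean up; no new timeout is set
    (key, total, true)
  } else {
    val total = (if (state.exists) state.get else 0L) + values.size
    state.update(total)
    state.setTimeoutDuration(10000) // must be set again on every call
    (key, total, false)
  }
```

      On the timeout call the function removes the state and deliberately does not call setTimeoutDuration, so the key is not called again until new data arrives for it.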
      

      Note the following design aspects.

      • The timeout type is passed to mapGroupsWithState as a single parameter that applies to all keys. This way the planner knows it at planning time and can optimize execution accordingly, based on whether extra information (e.g. timeout durations or timestamps) must be saved in the state.
      • The exact timeout duration is set inside the function call, so that it can be customized per key.
      • When a key times out, the function is called with no values, and KeyedState.isTimingOut() returns true.
      • The timeout for a key is reset every time the function is called on that key, that is, when the key has new data or when it has timed out. So the user has to set the timeout duration every time the function is called; otherwise no timeout will be set.
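      The rules above can be modelled with a toy, single-process simulator. This is a hedged sketch under simplifying assumptions, not Spark's implementation: SimState, TimeoutSimulator, runBatch, and countWithTimeout are hypothetical names, processing time is passed in explicitly as nowMs, and durations are assumed to be in milliseconds. Each call clears the key's deadline first, so a deadline exists afterwards only if the function sets a duration again.

```scala
import scala.collection.mutable

// Hypothetical per-call state handle (not Spark's KeyedState): `value` holds the
// key's state, and the timeout duration starts unset on every call.
final class SimState[S](var value: Option[S], val isTimingOut: Boolean) {
  var timeoutDurationMs: Option[Long] = None
  def setTimeoutDuration(durationMs: Long): Unit = timeoutDurationMs = Some(durationMs)
}

// Toy driver applying the processing-time timeout rules to a sequence of triggers.
final class TimeoutSimulator[K, V, S, U](func: (K, Iterator[V], SimState[S]) => U) {
  private val states = mutable.Map.empty[K, S]
  private val deadlines = mutable.Map.empty[K, Long] // key -> processing-time deadline (ms)

  private def call(key: K, values: Iterator[V], timingOut: Boolean, nowMs: Long): U = {
    val st = new SimState[S](states.get(key), timingOut)
    deadlines.remove(key) // the timeout is reset on every call
    val out = func(key, values, st)
    st.value match {
      case Some(s) => states(key) = s
      case None    => states.remove(key)
    }
    // a new deadline exists only if the function set a duration during this call
    st.timeoutDurationMs.foreach(d => deadlines(key) = nowMs + d)
    out
  }

  // One trigger at processing time nowMs, with the new data grouped by key.
  def runBatch(nowMs: Long, data: Map[K, Seq[V]]): Seq[U] = {
    val withData = data.toSeq.map { case (k, vs) => call(k, vs.iterator, timingOut = false, nowMs) }
    // keys without new data whose deadline has passed are called with no values
    val expired = deadlines.toList.collect { case (k, d) if d <= nowMs && !data.contains(k) => k }
    withData ++ expired.map(k => call(k, Iterator.empty, timingOut = true, nowMs))
  }
}

// Example function: counts values per key, re-arms a 10000 ms timeout on every call,
// and drops the key's state once it times out.
def countWithTimeout(key: String, values: Iterator[String], state: SimState[Int]): String =
  if (state.isTimingOut) {
    state.value = None // clean up; no timeout is set, so the key stays idle
    s"$key: expired"
  } else {
    val n = state.value.getOrElse(0) + values.size
    state.value = Some(n)
    state.setTimeoutDuration(10000)
    s"$key: $n"
  }
```

      For example, with a 10000 ms duration, new data for a key at 0 ms and again at 8000 ms moves its deadline from 10000 ms to 18000 ms; a trigger at 18000 ms with no new data for that key then calls the function with an empty iterator and isTimingOut set to true.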


          People

            Assignee: Tathagata Das (tdas)
            Reporter: Tathagata Das (tdas)
            Votes: 0
            Watchers: 2

            Dates

              Created:
              Updated:
              Resolved: