Apache Flink issue FLINK-13721

BroadcastState should support StateTTL


Details

    Description

      Hi everyone,

      Sorry if I'm doing anything wrong; this is my first issue in Apache Jira.

      I have a use case that requires two data streams to be cross-joined. To be exact, one stream carries location updates from clients and the other carries event data with location information. I'm trying to find events that happen within a certain radius of the client location(s).

      Since the streams cannot be related to each other through a common partitioning key, I have to broadcast client updates to all tasks and evaluate the radius check for each event.
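      To make the broadcast cross-join concrete, here is a minimal, Flink-free sketch of what each parallel task does: it keeps a full copy of the latest client locations (the role the broadcast state plays) and checks every incoming event against all of them. The class and method names are hypothetical, and the haversine great-circle distance is an assumption, since the issue does not say which distance metric the radius check uses.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, Flink-free model of the broadcast cross-join: every parallel
 * task keeps a full copy of the latest client locations (what the broadcast
 * state would hold) and checks each incoming event against all of them.
 */
public class BroadcastRadiusJoinSketch {

    // Mean Earth radius in kilometres, used by the haversine formula (assumed metric).
    static final double EARTH_RADIUS_KM = 6371.0;

    // Great-circle distance between two (lat, lon) points given in degrees.
    static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    // Stand-in for one task's copy of the broadcast state: clientId -> {lat, lon}.
    final Map<String, double[]> clientLocations = new HashMap<>();

    // Broadcast side: every task applies every client update to its own copy.
    void onClientUpdate(String clientId, double lat, double lon) {
        clientLocations.put(clientId, new double[] {lat, lon});
    }

    // Event side: there is no shared key, so the event is compared with every known client.
    List<String> clientsNear(double eventLat, double eventLon, double radiusKm) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, double[]> e : clientLocations.entrySet()) {
            double[] loc = e.getValue();
            if (haversineKm(loc[0], loc[1], eventLat, eventLon) <= radiusKm) {
                matches.add(e.getKey());
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        BroadcastRadiusJoinSketch task = new BroadcastRadiusJoinSketch();
        task.onClientUpdate("c0", 41.0082, 28.9784); // hypothetical coordinates
        task.onClientUpdate("c1", 39.9334, 32.8597);
        System.out.println(task.clientsNear(41.0100, 28.9800, 5.0)); // [c0]
    }
}
```

      The per-event cost grows linearly with the number of clients held in the state, which is one reason stale clients matter: every client that is "gone" but never removed is still scanned for every event.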

      The requirement is that if we don't receive any location update from a client for a certain amount of time, we should consider that client "gone" (similar to the rationale stated in the FLINK-3089 description).

      I have attached the sample application classes I used to debug BroadcastState and StateTTL together.

      The output (see flink_broadcast_state_ttl_debug.log) shows that client "c0" received its first event at 17:08:07.67, so it was expected to be evicted sometime after 17:08:37.xxx, but it is not evicted.

      For the operator (the result of BroadcastConnectedStream<IntHolder, StringHolder>.process): since the context in the onTimer method doesn't let the user change the contents of the broadcast state, the only way to deal with stale client data is as follows:

      • In the processElement method, compute results only for client data that is newer than the TTL.
      • In the processBroadcastElement method, remove client data older than the TTL from the broadcast state.
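      The workaround above can be modelled without Flink as a small sketch. A plain HashMap stands in for the broadcast state, timestamps are assumed to be epoch milliseconds supplied by the caller, and the class name is hypothetical; the two methods only mirror the roles of processBroadcastElement (where the state may be modified) and processElement (where it is read-only). This is an illustration of the approach, not the attached DebugBroadcastStateTTL code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, Flink-free model of the manual-expiry workaround.
 * A HashMap stands in for the broadcast state; timestamps are epoch millis.
 */
public class ManualBroadcastExpirySketch {

    // Last update time per client (locations omitted to keep the sketch small).
    final Map<String, Long> lastSeen = new HashMap<>();
    final long ttlMillis;

    ManualBroadcastExpirySketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Mirrors processBroadcastElement: the only place the state may be modified,
    // so stale clients are purged here, and only when a client update arrives.
    void processBroadcastElement(String clientId, long now) {
        lastSeen.put(clientId, now);
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            if (now - it.next().getValue() > ttlMillis) {
                it.remove();
            }
        }
    }

    // Mirrors processElement: the state is read-only here, so stale clients
    // can only be skipped, never removed.
    List<String> processElement(long now) {
        List<String> fresh = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastSeen.entrySet()) {
            if (now - e.getValue() <= ttlMillis) {
                fresh.add(e.getKey());
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        ManualBroadcastExpirySketch op = new ManualBroadcastExpirySketch(30_000);
        op.processBroadcastElement("c0", 0);
        // 40 s later with no broadcast traffic: c0 is filtered out but still stored.
        System.out.println(op.processElement(40_000)); // []
        System.out.println(op.lastSeen.keySet());      // [c0]
        // The next client update finally purges c0.
        op.processBroadcastElement("c1", 41_000);
        System.out.println(op.lastSeen.keySet());      // [c1]
    }
}
```

      The usage in main shows the weakness of this approach: between 30 s and 41 s, "c0" is already expired but still occupies the state, and every processElement call pays for filtering it out. A StateTTL-style eviction would remove it independently of broadcast traffic.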

      If the broadcast side of the connected streams receives no data for longer than the desired time-to-live, the BroadcastState keeps holding stale data and the processElement method has to filter out that client data on every call. This is the approach I am using in production right now.

      I don't know whether BroadcastState already supports StateTTL and I'm simply using it incorrectly. I am also not aware of any decision or limitation that makes it impossible to implement StateTTL for BroadcastState; I would be glad if someone could explain any that exist.

      Thanks and regards.

      Attachments

        1. StringHolder.java (0.8 kB, Kerem Ulutaş)
        2. IntHolder.java (0.5 kB, Kerem Ulutaş)
        3. flink_broadcast_state_ttl_debug.log (86 kB, Kerem Ulutaş)
        4. DebugBroadcastStateTTL.java (6 kB, Kerem Ulutaş)


          People

            Assignee: Unassigned
            Reporter: Kerem Ulutaş (keremulutas)
            Votes: 1
            Watchers: 5
