Spark / SPARK-50302

TransformWithState secondary index sizes should be proportional to primary indexes


Details

    Description

      Currently, the TransformWithState (TWS) operator handles all TTL state variables in roughly the same way:

      1. Upsert the value into the primary index
      2. Upsert the expiration timestamp into the secondary index, keyed by `(expiration timestamp, state key)`, where the expiration timestamp is `batchTimestampMs` plus the TTL duration
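      A minimal Python sketch of this write path, assuming plain in-memory structures in place of the state store. `TTLStateModel`, `upsert`, `primary` and `secondary` are hypothetical names, not Spark APIs, and a Python set stands in for the ordered secondary index.

```python
# Hypothetical, simplified model of the TTL write path (not Spark code).
# primary:   key -> (value, expiration_ms)
# secondary: set of (expiration_ms, key) pairs, sorted when scanned
class TTLStateModel:
    def __init__(self, ttl_ms):
        self.ttl_ms = ttl_ms
        self.primary = {}
        self.secondary = set()

    def upsert(self, key, value, batch_timestamp_ms):
        expiration_ms = batch_timestamp_ms + self.ttl_ms
        # Step 1: upsert the value and its expiration into the primary index.
        self.primary[key] = (value, expiration_ms)
        # Step 2: upsert (expiration, key) into the secondary index.
        # Any older entry for the same key is left in place.
        self.secondary.add((expiration_ms, key))


state = TTLStateModel(ttl_ms=500)
state.upsert("foo", "v1", batch_timestamp_ms=100)
print(state.primary)            # {'foo': ('v1', 600)}
print(sorted(state.secondary))  # [(600, 'foo')]
```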

      The issue with this approach is that if the same state variable is updated in two different micro-batches, the primary index holds one entry while the secondary index holds two. Consider the following example for a state variable `foo` with a TTL duration of 500 ms:

      Batch 0: `batchTimestampMs = 100`, `foo` updates to `v1`:

      • Primary index: `[foo -> (v1, 600)]`
      • Secondary index: `[(600, foo) -> EMPTY]`

      Batch 1: `batchTimestampMs = 200`, `foo` updates to `v2`:

      • Primary index: `[foo -> (v2, 700)]`
      • Secondary index: `[(600, foo) -> EMPTY, (700, foo) -> EMPTY]`
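      The two batches can be replayed with a hypothetical in-memory model (the names are illustrative, not Spark APIs) to confirm the size mismatch:

```python
# Hypothetical model: primary maps key -> (value, expiration_ms),
# secondary holds (expiration_ms, key) pairs. TTL duration is 500 ms.
TTL_MS = 500
primary = {}
secondary = set()

def upsert(key, value, batch_timestamp_ms):
    expiration_ms = batch_timestamp_ms + TTL_MS
    primary[key] = (value, expiration_ms)
    secondary.add((expiration_ms, key))  # the old entry for `key` is not removed

upsert("foo", "v1", batch_timestamp_ms=100)  # Batch 0
upsert("foo", "v2", batch_timestamp_ms=200)  # Batch 1

print(primary)                       # {'foo': ('v2', 700)}
print(sorted(secondary))             # [(600, 'foo'), (700, 'foo')]
print(len(primary), len(secondary))  # 1 2
```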

      The secondary index now has two entries, even though the primary index has only one. When cleanup reaches `(600, foo)`, it does not actually evict `foo`: it first does another lookup in the primary index to determine whether this secondary index entry is stale, and here it is, because `foo` now expires at 700.

      To summarize: the write path always writes to both the primary and the secondary index, and the deletion path walks the secondary index, doing a primary-index lookup for each entry to decide whether to delete it.
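      The deletion path can be sketched as follows. This is a hypothetical model, not Spark's implementation: `clean_up` is an illustrative name, an entry counts as due when its expiration is at or before the batch timestamp, and the sketch assumes every scanned secondary entry is removed whether or not it was stale.

```python
# Hypothetical cleanup scan over an in-memory model (not Spark code).
def clean_up(primary, secondary, batch_timestamp_ms):
    evicted = []
    for expiration_ms, key in sorted(secondary):  # iterate over a sorted copy
        if expiration_ms > batch_timestamp_ms:
            break  # ordered by expiration, so no later entry is due yet
        secondary.discard((expiration_ms, key))   # assumption: always removed
        current = primary.get(key)
        # Evict only if the primary index still records this expiration;
        # otherwise the key was updated later and this entry is stale.
        if current is not None and current[1] == expiration_ms:
            del primary[key]
            evicted.append(key)
    return evicted

primary = {"foo": ("v2", 700)}
secondary = {(600, "foo"), (700, "foo")}
print(clean_up(primary, secondary, batch_timestamp_ms=650))  # []
print(primary, sorted(secondary))  # {'foo': ('v2', 700)} [(700, 'foo')]
print(clean_up(primary, secondary, batch_timestamp_ms=700))  # ['foo']
```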

      This may not seem like a serious issue, but it gets much worse for `ListState`, whose cleanup logic is as follows:

      1. Grab an iterator for the entire list
      2. Clear the entire list
      3. For each element in the iterator that isn't expired, merge it back into the list
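      A hypothetical sketch of these three steps for a single key, assuming each append adds its own `(expiration, key)` entry to the secondary index and that one full cleanup pass runs per due secondary entry. `clean_up_list` and the data layout are illustrative, not Spark APIs.

```python
# Hypothetical model of ListState TTL cleanup for one key (not Spark code).
# Elements are (value, expiration_ms); an element is expired when its
# expiration is at or before the batch timestamp (an assumption).
TTL_MS = 500

def clean_up_list(elements, batch_timestamp_ms):
    """One full pass over the list; returns how many elements were read."""
    snapshot = list(elements)              # 1. snapshot the whole list (the "iterator")
    elements.clear()                       # 2. clear the entire list
    for value, expiration_ms in snapshot:  # 3. merge unexpired elements back
        if expiration_ms > batch_timestamp_ms:
            elements.append((value, expiration_ms))
    return len(snapshot)

elements = []      # list state for key "foo"
secondary = set()  # (expiration_ms, key) entries, one per append
for value, batch_ts in [("a", 100), ("b", 200), ("c", 300)]:
    elements.append((value, batch_ts + TTL_MS))
    secondary.add((batch_ts + TTL_MS, "foo"))

# Cleanup at batch timestamp 750: (600, foo) and (700, foo) are both due,
# and each one triggers its own full pass over the list.
due = [entry for entry in sorted(secondary) if entry[0] <= 750]
reads = sum(clean_up_list(elements, 750) for _ in due)
print(elements, len(due), reads)  # [('c', 800)] 2 4
```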

      As a result, every stale entry in the secondary index triggers another full pass over the list (read, clear, and rewrite), which hurts performance. We should ensure that the secondary index holds exactly as many entries as the primary index, so that cleanup does no unnecessary work.
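      One possible way to maintain that invariant, sketched with a hypothetical in-memory model (an assumption, not necessarily the approach the PR will propose): on every update, read the key's old expiration from the primary index and delete the matching secondary entry before writing the new one.

```python
# Hypothetical model (not Spark code): keep exactly one secondary entry per key.
TTL_MS = 500
primary = {}       # key -> (value, expiration_ms)
secondary = set()  # {(expiration_ms, key)}

def upsert(key, value, batch_timestamp_ms):
    expiration_ms = batch_timestamp_ms + TTL_MS
    old = primary.get(key)
    if old is not None:
        # Remove the now-stale (old_expiration, key) entry.
        secondary.discard((old[1], key))
    primary[key] = (value, expiration_ms)
    secondary.add((expiration_ms, key))

upsert("foo", "v1", batch_timestamp_ms=100)  # Batch 0
upsert("foo", "v2", batch_timestamp_ms=200)  # Batch 1
print(primary, sorted(secondary))  # {'foo': ('v2', 700)} [(700, 'foo')]
assert len(primary) == len(secondary)
```

      The trade-off in this sketch is an extra primary-index read on every write, in exchange for cleanup never visiting stale secondary entries.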

      Solutions to this problem will be proposed in the PR.


            People

              Neil Ramaswamy (neilramaswamy)
              Votes: 0
              Watchers: 2
