Kafka: KAFKA-13008

Stream will stop processing data for a long time while waiting for the partition lag


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: streams
    • Labels: None

    Description

      In KIP-695, we improved the task idling mechanism by checking partition lag. It's a good improvement for timestamp synchronization. However, I found it can cause a stream to stop processing data for a long time while it waits for the partition metadata.

       

      I've been investigating this case for a while and found that the issue happens in the following situation (or similar ones):

      1. Start 2 streams (each with 1 thread) consuming from topicA (3 partitions: A-0, A-1, A-2).
      2. After both streams start, the partition assignment is (other processing-related partitions are omitted for simplicity):
        stream1-thread1: A-0, A-1
        stream2-thread1: A-2
      3. Some data is processed. Assume the positions and high watermarks are now:
        A-0: offset: 2, highWM: 2
        A-1: offset: 2, highWM: 2
        A-2: offset: 2, highWM: 2
      4. Now stream3 joins, triggering a rebalance with this assignment:
        stream1-thread1: A-0
        stream2-thread1: A-2
        stream3-thread1: A-1
      5. Then stream3 suddenly leaves, triggering another rebalance that restores the assignment from step 2:
        stream1-thread1: A-0, A-1
        stream2-thread1: A-2
        (note: after initialization, A-1 will have position: null, highWM: null)
      6. Note that partition A-1 was previously assigned to stream1-thread1 and has now come back. Also assume that A-1 has slow input (e.g. 1 record per 30 mins) and A-0 has fast input (e.g. 10K records/sec). Now stream1-thread1 won't process any data until it receives input from A-1, even if a lot of A-0 data is buffered and `max.task.idle.ms` is set to 0.
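      To make steps 2 to 5 concrete, here is a minimal Python model of the lag lookup from stream1-thread1's point of view. It assumes, as this issue describes, that the lag is computed only from cached metadata (high watermark minus position), that a partition's cached state is dropped when it is revoked, and that it starts out null when the partition is assigned again. `LagCache`, `assign`, `update` and `current_lag` are hypothetical names for illustration, not the real `KafkaConsumer` internals.

```python
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical model of a consumer's per-partition cached state: (position, highWM).
# None means "not known yet". Illustration only, not the real KafkaConsumer code.
State = Tuple[Optional[int], Optional[int]]


class LagCache:
    def __init__(self) -> None:
        self.state: Dict[str, State] = {}

    def assign(self, partitions: Iterable[str]) -> None:
        """Apply a new assignment: revoked partitions are dropped, new ones start unknown."""
        new_state: Dict[str, State] = {}
        for tp in partitions:
            # Partitions kept across the rebalance retain their cached values;
            # (re)added partitions start with position=None, highWM=None.
            new_state[tp] = self.state.get(tp, (None, None))
        self.state = new_state

    def update(self, tp: str, position: int, high_wm: int) -> None:
        """Record metadata returned by a fetch response."""
        self.state[tp] = (position, high_wm)

    def current_lag(self, tp: str) -> Optional[int]:
        """Lag from cached metadata only (no remote call); None if unknown."""
        position, high_wm = self.state.get(tp, (None, None))
        if position is None or high_wm is None:
            return None
        return high_wm - position


# Replay steps 2-5 for stream1-thread1.
cache = LagCache()
cache.assign(["A-0", "A-1"])     # step 2
cache.update("A-0", 2, 2)        # step 3
cache.update("A-1", 2, 2)
print(cache.current_lag("A-1"))  # 0
cache.assign(["A-0"])            # step 4: A-1 moves to stream3
cache.assign(["A-0", "A-1"])     # step 5: A-1 comes back
print(cache.current_lag("A-0"))  # 0
print(cache.current_lag("A-1"))  # None
```

      A-0 keeps its cached lag across both rebalances, while A-1 comes back with no lag at all, even though nothing changed on the broker.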

       

      stream1-thread1 doesn't process any data because it can't get the lag of partition A-1. The lag is unavailable because:

      1. In KIP-695, we use the consumer's cache to get the partition lag, to avoid a remote call.
      2. The cached lag for a partition is cleared if the partition is not part of the current round's assignment (check here). So in the above example, the metadata cache for partition A-1 is cleared in step 4 and re-initialized (to null) in step 5.
      3. In KIP-227, we introduced fetch sessions to support incremental fetch requests/responses. That is, once a session exists, the client (consumer) only gets an update for a fetched partition when that partition has changed (e.g. new data). So in the above case, since A-1 has slow input (e.g. 1 record per 30 mins), it won't get an update for up to 30 mins, or until the fetch session is evicted after being inactive for 2 mins (the default). Either way, the metadata won't be updated for a while.
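      The effect of point 3 can be illustrated with a simplified model. It follows the behaviour described above: within an existing session, a partition appears in an incremental response only when something about it has changed (modelled here as its high watermark changing), while a full fetch after session eviction reports every partition again. `incremental_response` is a hypothetical helper, and the starting session contents are an assumption; the real broker-side rules from KIP-227 consider more than the high watermark.

```python
from typing import Dict, Optional


def incremental_response(last_sent_hwm: Dict[str, int], current_hwm: Dict[str, int]) -> Dict[str, int]:
    """Return {partition: high watermark} only for partitions whose HWM changed (simplified)."""
    changed = {tp: hwm for tp, hwm in current_hwm.items() if last_sent_hwm.get(tp) != hwm}
    last_sent_hwm.update(changed)  # the session remembers what it has already reported
    return changed


# Assumption: the session had already reported A-1's high watermark (2) before the rebalances.
session: Dict[str, int] = {"A-0": 2, "A-1": 2}
# Client side after step 5: A-1's cached metadata is null.
client_hwm: Dict[str, Optional[int]] = {"A-0": 2, "A-1": None}

# A-0 keeps receiving data; A-1 gets nothing for 30 minutes, so it is never reported.
for a0_hwm in (10_002, 20_002, 30_002):
    client_hwm.update(incremental_response(session, {"A-0": a0_hwm, "A-1": 2}))
print(client_hwm)  # {'A-0': 30002, 'A-1': None}

# After the session is evicted, a full fetch starts from an empty session state.
client_hwm.update(incremental_response({}, {"A-0": 30_002, "A-1": 2}))
print(client_hwm)  # {'A-0': 30002, 'A-1': 2}
```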

       

      In KIP-695, if we can't get the partition lag, we can't determine the partition's data status for timestamp synchronization, so we keep waiting without processing any data. That's why this issue happens.
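      The blocking condition above can be sketched as a readiness check. This is a simplified reading of the KIP-695 behaviour as described in this issue, assuming `max.task.idle.ms` is 0 as in step 6: a partition with buffered records is fine, a partition whose lag is known to be 0 has nothing left to fetch, and a partition whose lag is unknown or positive blocks the whole task. `ready_to_process` is a hypothetical function, not the Kafka Streams implementation.

```python
from typing import Dict, Optional


def ready_to_process(buffered: Dict[str, int], lags: Dict[str, Optional[int]]) -> bool:
    """Simplified KIP-695 check (assumes max.task.idle.ms = 0); hypothetical, not Kafka code."""
    for tp, count in buffered.items():
        if count > 0:
            continue          # records are buffered for this partition
        lag = lags.get(tp)
        if lag is None:
            return False      # lag unknown: wait for metadata
        if lag > 0:
            return False      # data is on the broker: wait to fetch it
        # lag == 0: nothing is pending on the broker, so this partition doesn't block
    return True


# stream1-thread1 after step 5: A-0 has lots buffered, A-1 is empty with unknown lag.
print(ready_to_process({"A-0": 10_000, "A-1": 0}, {"A-0": 0, "A-1": None}))  # False
# Once A-1's lag is known to be 0, processing can resume.
print(ready_to_process({"A-0": 10_000, "A-1": 0}, {"A-0": 0, "A-1": 0}))     # True
```

      Because the unknown-lag branch has no deadline, the task stays blocked until A-1's metadata is refreshed by one of the two events in point 3.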

       

      Proposed solution:

      1. If we can't get the current lag for a partition, or the current lag is > 0, start waiting for max.task.idle.ms, and reset the deadline once we get the partition lag, as we did previously in KIP-353.
      2. Introduce a config for the waiting time when the partition lag is unavailable or stays > 0 (needs a KIP).
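      Proposed solution 1 can be sketched as a bounded wait: an empty partition, whatever its lag, is waited on for at most `max.task.idle.ms`, similar to the KIP-353 behaviour the proposal refers to. This is one possible reading, written for illustration only; `IdlingTracker` is hypothetical, the deadline is cleared once records arrive for the partition, and the "reset the deadline when we get the partition lag" detail is not modelled.

```python
from typing import Dict


class IdlingTracker:
    """Hypothetical sketch of proposal 1: never wait on an empty partition longer than max.task.idle.ms."""

    def __init__(self, max_task_idle_ms: int) -> None:
        self.max_task_idle_ms = max_task_idle_ms
        self.deadlines: Dict[str, int] = {}  # partition -> wall-clock time when waiting stops

    def ready_to_process(self, now_ms: int, buffered: Dict[str, int]) -> bool:
        ready = True
        for tp, count in buffered.items():
            if count > 0:
                # Records are available: stop idling on this partition.
                self.deadlines.pop(tp, None)
                continue
            # Empty partition (lag unknown, > 0 or 0): start its deadline on first sight.
            deadline = self.deadlines.setdefault(tp, now_ms + self.max_task_idle_ms)
            if now_ms < deadline:
                ready = False  # still within the idle window, keep waiting
        return ready


# With max.task.idle.ms = 0 (step 6), stream1-thread1 no longer blocks on A-1.
no_idle = IdlingTracker(max_task_idle_ms=0)
print(no_idle.ready_to_process(1_000, {"A-0": 10_000, "A-1": 0}))  # True

# With a 500 ms budget, the task waits at most 500 ms for A-1.
bounded = IdlingTracker(max_task_idle_ms=500)
print(bounded.ready_to_process(1_000, {"A-0": 10_000, "A-1": 0}))  # False
print(bounded.ready_to_process(1_499, {"A-0": 10_000, "A-1": 0}))  # False
print(bounded.ready_to_process(1_500, {"A-0": 10_000, "A-1": 0}))  # True
```

      The trade-off is that, once the deadline passes, the task processes A-0 without being sure that A-1 has no earlier-timestamped data, which is exactly the guarantee KIP-695 tried to strengthen; proposal 2 would make that wait configurable instead.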

      vvcephei guozhang, any suggestions?

       

      cc ableegoldman mjsax, this is the root cause of what we discussed in https://github.com/apache/kafka/pull/10736, where we thought there was a data loss situation. FYI.


    People

      Assignee: Guozhang Wang (guozhang)
      Reporter: Luke Chen (showuon)
      Votes: 0
      Watchers: 6
