Kafka / KAFKA-1211

Hold the produce request with ack > 1 in purgatory until the replicas' HW is larger than the produce offset

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: 0.9.0
    • Component/s: None
    • Labels: None

      Description

      Today, during leader failover there is a window of weakness while the followers truncate their data before fetching from the new leader, i.e., the number of in-sync replicas is just 1. If during this time the leader also fails, then produce requests with ack > 1 that have already been acknowledged may still be lost. To avoid this scenario we would prefer to hold the produce request in purgatory until the replicas' HW is larger than the produce offset, instead of just their log end offsets.
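The proposed completion condition could be sketched as follows. This is an illustrative sketch only, not Kafka's actual purgatory code; the function and parameter names (`can_complete`, `replica_hws`) are assumptions made for this example.

```python
# Illustrative sketch of the proposed purgatory condition (names are
# hypothetical, not Kafka's actual API): a produce request with ack > 1
# completes only once enough replicas' high watermark (HW) has passed the
# produce offset, rather than just their log end offsets.

def can_complete(produce_offset, acks, replica_hws):
    """replica_hws: current HW of each replica in the ISR (leader included)."""
    satisfied = sum(1 for hw in replica_hws if hw > produce_offset)
    return satisfied >= acks

# A request at offset 10 with ack=2 and three replicas:
print(can_complete(10, 2, [12, 11, 9]))  # True: two replicas' HW is past 10
print(can_complete(10, 2, [12, 9, 9]))   # False: only one HW is past 10
```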


          Activity

          Jun Rao added a comment - edited

          Yes, this is a potential problem. Waiting for the HW to be propagated to the followers would introduce another round of network delay for every message to be committed, though. The following is another potential solution that avoids this overhead.

          Note that a follower in the ISR always has all committed messages. On follower startup, if we can figure out accurately which messages are committed and which ones are not, we won't unnecessarily truncate committed messages. Note that when a follower takes over as the new leader, it always tries to commit all existing messages that were obtained from the previous generation of the leader. After that, it will start committing new messages received in its own generation. If we can track the leader generation of each message, we can do the truncation accurately. To do that, in each replica we maintain a leader generation vector that contains each leader generation id and its starting offset (the offset of the first message written by the leader in that generation), and we persist that vector in a LeaderGeneration file locally.

          If a replica becomes a leader, before it accepts any new message, it first appends the current leader generation id and its current log end offset to the LeaderGeneration file. If a replica becomes a follower, it first gets the leader generation vector from the leader and then determines the offset where its highest leader generation ends in the leader. It then truncates its log to that offset (if there are messages beyond it). After that, the follower stores the leader generation vector obtained from the leader in its local LeaderGeneration file and starts fetching messages from the leader from its log end offset.
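The bookkeeping above can be sketched minimally as follows. The on-disk format (JSON here) and the class and method names are assumptions made for illustration; the comment only specifies what must be recorded.

```python
# Minimal sketch of the LeaderGeneration bookkeeping described above.
# The file format (JSON) and all names here are hypothetical; the comment
# only requires persisting (leader generation id, start offset) pairs.
import json
import os

class LeaderGenerationVector:
    def __init__(self, path):
        self.path = path
        self.entries = []  # list of (leader_gen_id, start_offset) pairs
        if os.path.exists(path):
            with open(path) as f:
                self.entries = [tuple(e) for e in json.load(f)]

    def _persist(self):
        with open(self.path, "w") as f:
            json.dump(self.entries, f)

    def on_become_leader(self, gen_id, log_end_offset):
        # Before accepting any new message, record the new generation and
        # the offset at which it starts (the current log end offset).
        self.entries.append((gen_id, log_end_offset))
        self._persist()

    def on_become_follower(self, leader_entries):
        # After truncating, adopt the leader's vector locally.
        self.entries = [tuple(e) for e in leader_entries]
        self._persist()
```

On restart, a replica reloads the persisted vector, so it can compare its highest generation against the current leader's vector before fetching.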

          Let's consider a couple of examples.

          Example 1. Suppose that we have two replicas A and B and A is the leader. At some point, we have the following messages in A and B.

          offset   A    B
          0        m1   m1
          1        m2
          

          Let's assume that message m1 is committed, but m2 is not. At this point, A dies and B takes over as the leader. Let's say B then commits 2 more messages m3 and m4.

          offset    A    B
          0        m1  m1
          1        m2  m3
          2            m4
          

          When replica A comes back, it's important for A to get rid of m2 from offset 1 since m2 is never committed. In this case, the leader generation vector in A and B will look like the following.

                           A                                        B
          leaderGenId   startOffset                leaderGenId   startOffset
          1                     0                    1           0
                                                     2           1
          

          By comparing A's leader generation vector with that of the current leader B, A knows that its latest messages were produced by the leader in generation 1, which ends at offset 0. So any message in its local log after offset 0 is not committed and can be truncated, while any message at or before offset 0 is guaranteed to be committed. Replica A will therefore remove m2 at offset 1 and fetch m3 and m4 from B afterward. At that point, A's log is consistent with B's: all committed messages are preserved and all uncommitted messages are removed.

          Example 2. Suppose that we have two replicas A and B and A is the leader. At some point, we have the following messages in A and B.

          offset   A    B
          0        m1   m1
          1        m2   m2
          

          Let's assume that both message m1 and m2 are committed. At this point, A dies and B takes over as the leader. Let's say B then commits 2 more messages m3 and m4.

          offset    A    B
          0          m1  m1
          1          m2  m2
          2              m3
          3              m4
          

          In this case, the leader generation vector in A and B will look like the following.

                           A                                       B
          leaderGenId   startOffset                leaderGenId   startOffset
          1                     0                    1                0
                                                     2                2
          

          When A comes back, by comparing its leader generation vector with that of the current leader B, A knows that its latest messages were produced by the leader in generation 1, which ends at offset 1. So it will keep m2 at offset 1 and get m3 and m4 from B. Again, this makes A's log consistent with B's.
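Both examples reduce to the same lookup: find where the follower's highest generation ends in the leader's vector, and truncate everything after that. A sketch of that decision follows; the function name and the vector representation are illustrative assumptions, not actual Kafka code.

```python
# Sketch of the follower-side truncation decision from the two examples
# above (illustrative only). Each vector is a sorted list of
# (leader_gen_id, start_offset) pairs; follower_leo is the follower's
# log end offset.

def truncation_offset(follower_vector, leader_vector, follower_leo):
    """Return the offset the follower truncates its log to, i.e., it keeps
    only messages at offsets strictly below the returned value."""
    latest_gen = follower_vector[-1][0]  # follower's highest generation
    # That generation ends in the leader where the next generation starts;
    # if the leader has no later generation, keep the whole log.
    for gen_id, start in leader_vector:
        if gen_id > latest_gen:
            return min(start, follower_leo)
    return follower_leo

# Example 1: A has [(1, 0)] with LEO 2 (m1, m2); B has [(1, 0), (2, 1)].
# Generation 1 ends at offset 0, so A truncates to offset 1, dropping m2.
assert truncation_offset([(1, 0)], [(1, 0), (2, 1)], 2) == 1

# Example 2: B has [(1, 0), (2, 2)] instead, so A keeps both m1 and m2.
assert truncation_offset([(1, 0)], [(1, 0), (2, 2)], 2) == 2
```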

          This approach doesn't pay the extra network round trip to commit a message. The process of becoming a follower will be a bit slower, since it now needs to issue a new request to get the leader generation vector before it can start fetching from the leader. However, since leader changes are rare, this probably provides a better tradeoff. There are also other details that need to be worked out.

          1. We need to deal with the case that the leader generation vector may have a gap, i.e., no messages are produced in a leader generation.
          2. We probably need to remove old leader generations from the LeaderGeneration file so that it won't grow forever. Perhaps we need to configure a max # of generations to keep.

          Since this problem is relatively rare and the fix is a bit involved, we can probably put it off until 0.9 or beyond.

          Guozhang Wang added a comment -

          Jun, I think in your review of KAFKA-1430's patch, you already suggest waiting for the leader HW to be larger than the produce offset, instead of just the log end offset, for ack=-1.

          As for ack > 1 but not equal to num.replicas: since data loss may happen anyway because the leader election logic may choose a follower that does not have all the committed data, this issue would only increase the potential data loss slightly in such scenarios. Given its complexity relative to the benefit, maybe it is not an optimization worth doing?

          Jun Rao added a comment -

          The issue is that this problem not only affects ack > 1, but also affects ack = -1. Suppose that you have 3 replicas A, B, and C, and A is the leader initially. If A fails and B takes over as the new leader, C will first truncate its log, which could include committed data. Now, if immediately after the truncation B fails, C has to become the new leader. We may then have lost previously committed messages, even though there were only 2 failures.

          Guozhang Wang added a comment -

          What I meant is that ack=-1 should already be handled in KAFKA-1430, as we now wait for the leader HW. Right?

          Jun Rao added a comment -

          Not quite. The case that I described above could happen with ack = -1 too.


            People

            • Assignee: Guozhang Wang
            • Reporter: Guozhang Wang
            • Votes: 0
            • Watchers: 5
