KAFKA-763

Add an option to replicate from the largest offset during unclean leader election

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels:

      Description

      If there is an unclean leader election, a follower may have an offset out of the range of the leader. Currently, the follower will delete all its data and refetch from the smallest offset of the leader. It would be useful to add an option to let the follower refetch from the largest offset of the leader since refetching from the smallest offset may take some time.
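
      In other words, when the follower's fetch offset falls outside the leader's offset range, it can reset either to the leader's smallest offset (the current behavior) or to its largest offset (the proposed option). A minimal standalone sketch of that choice, with illustrative names that are not Kafka APIs:

      // Illustrative only: the two reset choices a follower has when its fetch offset
      // falls outside the leader's [leaderStart, leaderEnd] offset range.
      object OffsetResetSketch {
        sealed trait ResetPolicy
        case object Smallest extends ResetPolicy // refetch everything from the leader's first offset
        case object Largest  extends ResetPolicy // skip ahead to the leader's last offset

        def nextFetchOffset(leaderStart: Long, leaderEnd: Long,
                            followerFetchOffset: Long, policy: ResetPolicy): Long =
          if (followerFetchOffset >= leaderStart && followerFetchOffset <= leaderEnd)
            followerFetchOffset          // still in range: keep fetching from where we are
          else policy match {
            case Smallest => leaderStart // current behavior described above
            case Largest  => leaderEnd   // the proposed option
          }
      }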

      1. kafka-763_v1.patch
        8 kB
        Jun Rao
      2. kafka-763-new-v1.patch
        6 kB
        Swapnil Ghike
      3. kafka-763-new-v2.patch
        10 kB
        Swapnil Ghike
      4. kafka-763-new-v3.patch
        5 kB
        Swapnil Ghike
      5. kafka-763-new-v4.patch
        2 kB
        Swapnil Ghike

        Activity

        Jun Rao created issue -
        Jun Rao added a comment -

        Attached a patch.

        Jun Rao made changes -
        Attachment kafka-763_v1.patch [ 12569902 ]
        Jun Rao made changes -
        Status: Open [ 1 ] → Patch Available [ 10002 ]
        Guozhang Wang added a comment -

        Just a quick question: what is an "unclean leader election"?

        Neha Narkhede added a comment -

        Why is this option useful? The reason we have the follower fetch from the beginning is that we want the follower to be an exact replica of the leader. If it fetches from the tail, it may not necessarily be identical.

        Neha Narkhede made changes -
        Labels: p2
        Jun Rao added a comment -

        Unclean leader election happens when we have to select a replica not in ISR as the new leader.

        Thinking a bit more about this. We can patch this better. The follower can issue a getOffset request to the leader to get the offset of the first message in every segment. The follower can then pick the largest offset that is in the range of its local log. That offset will be used to truncate the follower's log and for refetching from the leader. This way, in the common case, the follower only has to refetch 1 segment worth of data during unclean leader election.
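
        A minimal standalone sketch of the selection step described above (hypothetical names, not the eventual patch): given the start offset of every leader segment, pick the largest one that falls inside the follower's local log range and use it as the truncation/refetch point.

        // Returns None if no leader segment start offset lies within the follower's range.
        def pickTruncationOffset(leaderSegmentStartOffsets: Seq[Long],
                                 followerStartOffset: Long,
                                 followerEndOffset: Long): Option[Long] =
          leaderSegmentStartOffsets
            .filter(o => o >= followerStartOffset && o <= followerEndOffset)
            .reduceOption(_ max _)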

        Neha Narkhede added a comment -

        Not sure if I understood that. But what needs to happen is that the new follower should fetch the earliest and latest offsets from the new leader. If its latest offset < the leader's latest offset after truncating to the last checkpointed high watermark, it can just start fetching from the leader. If its latest offset > the leader's latest offset, then it should truncate to the leader's latest offset and begin fetching from there. But there is a problem if the leader's latest offset has already been garbage collected on the follower, since that would truncate all of the follower's data. So we might also have to check that the follower's earliest offset == the leader's earliest offset.

        But basically, it seems very tricky to do this correctly, and I'm wondering what data backs up the claim that this fix is required? Is there one particular case that we are trying to solve?

        Guozhang Wang added a comment - edited

        Can these cover all the cases (the bars represent offset ranges)?

        1. Follower can just remove all its data and start fetching from the leader's earliest offset.

        new leader .................|------------------|

        follower ....|------|

        2. Follower truncates its log head according to the leader's earliest offset (is this necessary?), and starts fetching from the point that is closest to its latest offset.

        new leader .................|------------------|

        follower ..................|------|

        3. Same as 2.

        new leader .................|------------------|

        follower ..........................|------|

        4. Follower truncates its log tail, and starts fetching from the follower's latest offset.

        new leader .................|------------------|

        follower .....................................|------|

        5. Follower removes all its data, and starts fetching from the follower's earliest offset.

        new leader .................|------------------|

        follower ...............................................|------|

        In order to support all these cases, the follower needs to know 1) the earliest and latest offsets of the leader, and 2) the start offset of each leader segment, in order to find the closest offset to its earliest/latest offset.
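
        One way to read the five diagrams is as a classification over the two offset ranges. A standalone sketch of that reading (illustrative only; it assumes cases 2 and 4 straddle the leader's start and end offsets respectively):

        // Classify the follower's [fStart, fEnd] range against the new leader's
        // [lStart, lEnd] range into the five cases drawn above.
        def overlapCase(lStart: Long, lEnd: Long, fStart: Long, fEnd: Long): Int =
          if (fEnd < lStart) 1                         // follower entirely before the leader's range
          else if (fStart < lStart && fEnd <= lEnd) 2  // follower straddles the leader's start offset
          else if (fStart >= lStart && fEnd <= lEnd) 3 // follower fully inside the leader's range
          else if (fStart <= lEnd && fEnd > lEnd) 4    // follower straddles the leader's end offset
          else 5                                       // follower entirely after the leader's range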

        Neha Narkhede added a comment -

        Yes, although I'm questioning the need to cover these complex cases, since if we don't do it correctly there is a much bigger impact and the performance gain will not be worth it.

        Neha Narkhede made changes -
        Assignee: Jun Rao [ junrao ] → Sriram Subramanian [ sriramsub ]
        Neha Narkhede made changes -
        Assignee: Sriram Subramanian [ sriramsub ] → Swapnil Ghike [ swapnilghike ]
        Jun Rao added a comment -

        The intention of this jira is just to avoid fetching more data than necessary when the follower hits OffsetOutOfRangeException. There are 2 common cases when this can happen.

        1. There is an unclean leader election and the follower's offset is larger than leader's last offset. In this case, the follower should truncate its log to the leader's last offset and fetch from that offset.

        2. A follower has been down for a long time and when it starts up, its offset is smaller than the smallest offset in the leader (because old logs have been deleted). In this case, the follower should truncate its log to the leader's first offset and fetch from that offset.

        So, when a follower receives an OffsetOutOfRangeException, it probably should do the following:
        1. get the last offset LO from the leader.
        2. if LO is within the offset range of the follower, the follower truncates its log to LO and fetches from LO.
        3. otherwise, the follower gets the first offset FO from the leader, deletes all data in its log, starts with an empty log at offset FO and fetches from FO.

        There is a separate issue: when there is an unclean leader election, after the follower recovers, the replicas may not be identical for a chunk of offsets. It's possible, but not easy, to fix this issue. We can follow up on this one post 0.8.
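
        A compact standalone sketch of the three steps above (the names are hypothetical; per the later comments, the actual fix went into ReplicaFetcherThread.handleOffsetOutOfRange):

        // On OffsetOutOfRangeException: ask the leader for its last offset (LO); if LO lies
        // within the follower's log range, truncate to LO and fetch from there. Otherwise ask
        // for the first offset (FO), drop the local log and restart it at FO.
        // Returns the next offset to fetch from.
        def resolveOutOfRange(followerStartOffset: Long, followerEndOffset: Long,
                              leaderFirstOffset: Long, leaderLastOffset: Long): Long =
          if (leaderLastOffset >= followerStartOffset && leaderLastOffset <= followerEndOffset) {
            // step 2: LO is inside the follower's range -> log.truncateTo(LO), fetch from LO
            leaderLastOffset
          } else {
            // step 3: otherwise start an empty log at FO (truncateAndStartWithNewOffset in the
            // later patch discussion) and fetch from FO
            leaderFirstOffset
          }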

        Guozhang Wang added a comment -

        About the "the replicas may not be identical for a chunk of offsets" issue, if we do not want to have any non-resolvable divergence, the following may fix it:

        When a non-ISR replica is elected as the leader (assuming it could know that from ZK, but I'm not sure if that is true), the new leader records its current earliest/latest offsets, whose messages are still in sync with the old leader. When a follower wakes up and requests LO and FO, the leader returns the recorded LO and FO instead of its current earliest/latest offsets.

        But this could result in lots of fetching also. If we choose availability over consistency in this case, we can just live with it.

        Neha Narkhede made changes -
        Status: Patch Available [ 10002 ] → Open [ 1 ]
        Swapnil Ghike made changes -
        Labels: p2 → kafka-0.8 p2
        Swapnil Ghike added a comment -

        Copy-pasting the comments from patch new-v1:

        1. The leader's log could be partially overlapping with the follower's log. The only way to get an OffsetOutOfRangeException in such a situation is when the follower's end offset is ahead of the leader's end offset. This is possible if there is an unclean leader election:
        A follower goes down; meanwhile, the leader keeps appending messages. The follower comes back up and, before it has completely caught up with the leader's logs, the ISR goes down. The follower is now uncleanly elected as the new leader, and it appends messages. The old leader comes back up, becomes a follower and may find that the current leader's end offset falls between its own start offset and its own end offset.

        In such a case, truncate the follower's log to the current leader's end offset and continue fetching.

        There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.

        2. Otherwise, the leader's log could be completely non-overlapping with the follower's log:
        i. The follower could have been down for a long time and when it starts up, its end offset could be smaller than or equal to the leader's start offset because the leader has deleted old logs (log.logEndOffset <= leaderStartOffset). OR
        ii. Unclean leader election: A follower could be down for a long time. When it starts up, the ISR goes down before the follower has the opportunity to even start catching up with the leader's logs. The follower is now uncleanly elected as the new leader. The old leader comes back up, becomes a follower and it may find that the current leader's end offset is smaller than or equal to its own start offset (log.logStartOffset >= leaderEndOffset).

        In both these cases, roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.

        Other changes:
        1. Fixed the error message for autoOffsetReset in ConsumerConfig.
        2. Added a method logStartOffset in Log.

        Swapnil Ghike made changes -
        Attachment kafka-763-new-v1.patch [ 12573258 ]
        Swapnil Ghike made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Jun Rao added a comment -

        Thanks for the patch. Some comments:

        1. We should try to reuse SimpleConsumer.earliestOrLatestOffset(). Currently, there is one version using broker and another using zkclient. The latter is actually not used. We can remove the latter and modify the former to take a SimpleConsumer, instead of Broker. Then we can reuse this method in SimpleConsumerShell, ReplicaFetcherThread, and ConsumerFetcherThread. The logic in SimpleConsumer.earliestOrLatestOffset() actually has a bug. It doesn't check the error code in the fetch offset response. We should fix that.

        2. ReplicaFetcherThread:
        2.1 In the following line, the test should be <= and >=, respectively.
        if (log.logStartOffset < leaderEndOffset && log.logEndOffset > leaderEndOffset) {
        In trunk, we probably don't need to check the first condition log.logStartOffset < leaderEndOffset. This is because in trunk, log.truncateTo() handles the case when targetOffset < log.logStartOffset already. It starts a new log with the targetOffset, which is what we want.
        2.2 There are two places in the comment that have "the ISR goes down". It should be "the leader goes down".
        2.3 The comment for "2. Unclean leader election" is redundant since it's covered by other comments already.
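
        For reference, the condition from item 2.1 written out as a standalone predicate with the boundary cases included (a sketch of the check only, not the patch):

        // The partial-overlap test with <= and >= rather than < and >.
        def leaderEndOffsetWithinFollowerLog(logStartOffset: Long, logEndOffset: Long,
                                             leaderEndOffset: Long): Boolean =
          logStartOffset <= leaderEndOffset && logEndOffset >= leaderEndOffset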

        Swapnil Ghike added a comment -

        1. Moved earliestOrLatestOffset to SimpleConsumer class. Checking for error code in the fetch offset response.

        2.1 Removed the first condition, since we roll a new log starting from the targetOffset if the targetOffset is less than the startOffset. The patch will be applicable to trunk as well.
        2.2 Changed comments to "all replicas in the ISR go down"
        2.3 Removed this comment as it is no longer needed due to 2.1.

        Swapnil Ghike made changes -
        Attachment kafka-763-new-v2.patch [ 12573455 ]
        Jun Rao added a comment -

        Thanks for patch v2. +1 Committed to 0.8 with the following minor change.

        In SimpleConsumerShell, if we get an exception when getting the earliest/latest offset, print to error and exit.

        Jun Rao made changes -
        Status: Patch Available [ 10002 ] → Resolved [ 5 ]
        Fix Version/s: 0.8 [ 12317244 ]
        Resolution: Fixed [ 1 ]
        Neha Narkhede added a comment -

        Sorry for coming late to the review. Two suggestions -

        1. In handleOffsetOutOfRange(), we are using an ordinary consumer id to fetch the log end offset from the leader. That will just fetch the high watermark (HW), but we should be fetching the log end offset.
        2. It is a good idea to avoid using return statements to exit from Scala functions, if possible. It breaks the functional programming paradigm. What you can do here is:
        if (leaderEndOffset < log.logEndOffset) {
          log.truncateTo(leaderEndOffset)
          leaderEndOffset
        } else {
          val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
          log.truncateAndStartWithNewOffset(leaderStartOffset)
          leaderStartOffset
        }
        Jun Rao added a comment -

        For 1, I think the logic in the patch is correct, but confusing. ReplicaId in OffsetRequest can be of three forms: (1) a value >= 0, which indicates that the request is from a follower and therefore logEndOffset can be returned as the latest offset if needed; (2) -2, which indicates that the request is from SimpleConsumerShell and thus allows the offset request to be served from non-leader; (3) -1, for other consumers and HW is returned as the latest offset, if needed. The confusing part is that in simpleConsumer.earliestOrLatestOffset(), we have both consumerId and isFromOrdinaryConsumer to set replicaId in the OffsetRequest. It would probably be simpler if the method only takes one parameter replicaId.
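
        A small standalone sketch of that convention (the constant names are illustrative; only the meanings of >= 0, -1 and -2 come from the comment above):

        // replicaId values in an OffsetRequest, per the comment above.
        object ReplicaIdSketch {
          val OrdinaryConsumer  = -1 // latest offset is served as the high watermark
          val DebuggingConsumer = -2 // e.g. SimpleConsumerShell; may be served by a non-leader

          def isFollower(replicaId: Int): Boolean = replicaId >= 0 // latest offset is served as logEndOffset
        }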

        Neha Narkhede added a comment -

        Yes, that method should only take replicaId. Let's change it, it's very confusing.

        Swapnil Ghike added a comment -

        Since patch new-v2 has been committed, this patch contains diff on top of 0.8 HEAD. Addressed the comments above.

        Swapnil Ghike made changes -
        Attachment kafka-763-new-v3.patch [ 12573561 ]
        Neha Narkhede added a comment -

        Thanks for the follow up patch, Swapnil! LGTM, +1

        Neha Narkhede made changes -
        Status: Resolved [ 5 ] → Closed [ 6 ]
        Jun Rao added a comment -

        Thanks for patch v3. Looks good. However, I overlooked one thing in my previous review. Could you patch ConsumerFetcherThread.handleOffsetOutOfRange() to use SimpleConsumer.earliestOrLatestOffset() too?

        Swapnil Ghike added a comment -

        Modified ConsumerFetcherThread to use SimpleConsumer.earliestOrLatestOffset(). Also added clientId while creating OffsetRequest in earliestOrLatestOffset().

        Swapnil Ghike made changes -
        Attachment kafka-763-new-v4.patch [ 12573587 ]
        Jun Rao added a comment -

        Thanks for patch v4. Committed to 0.8.


          People

          • Assignee: Swapnil Ghike
          • Reporter: Jun Rao
          • Votes: 0
          • Watchers: 4
