KAFKA-45

Broker startup, leader election, becoming a leader/follower for intra-cluster replication

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      We need to implement the logic for starting a broker with replicated partitions, the leader election logic and how to become a leader and a follower.
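
      To make the scope concrete, here is a rough sketch of the startup decision this ticket asks for; all names (ReplicaManager, ZkClient.readLeader, becomeLeader, becomeFollower, tryBecomeLeader) are hypothetical placeholders and not the actual Kafka implementation.

        // Sketch only: hypothetical names, not the real Kafka classes.
        case class TopicPartition(topic: String, partition: Int)

        trait ZkClient {
          // Returns the broker id of the current leader for the partition, if any.
          def readLeader(tp: TopicPartition): Option[Int]
        }

        class ReplicaManager(brokerId: Int, zk: ZkClient) {

          // On broker startup, decide this broker's role for each assigned partition.
          def startup(assignedPartitions: Seq[TopicPartition]): Unit = {
            for (tp <- assignedPartitions) {
              zk.readLeader(tp) match {
                case Some(leader) if leader == brokerId => becomeLeader(tp)          // already the leader
                case Some(leader)                       => becomeFollower(tp, leader) // follow the existing leader
                case None                               => tryBecomeLeader(tp)       // no leader yet: run election
              }
            }
          }

          private def becomeLeader(tp: TopicPartition): Unit = {
            // start serving produce/fetch requests and tracking follower progress (ISR)
          }

          private def becomeFollower(tp: TopicPartition, leaderId: Int): Unit = {
            // truncate to the last checkpointed HW and start fetching from the leader
          }

          private def tryBecomeLeader(tp: TopicPartition): Unit = {
            // attempt to create the ephemeral leader path in ZK; if it already exists, follow
          }
        }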

        Issue Links

        There are no Sub-Tasks for this issue.

          Activity

          Prashanth Menon added a comment -

          Hey everyone. Since the ZK structures are effectively done as part of KAFKA-47, I thought I'd start on this ticket. Something that came up is that the log cleanup functionality within LogManager will need to be tweaked. My thinking is that Replicas should manage the cleanup of their local logs (moving that functionality out of LogManager) and need to be in sync with the leader with respect to the hw, logEndOffset, and also the logMinOffset; essentially, the amount of data available on all ISRs for replica R must be the same regardless of each individual broker's log cleanup configuration. I'm not sure how that information should be propagated, whether through the follower's fetch request or somewhere in ZK; that should be left up to discussion. Regardless, non-leader replicas can use this minOffset to perform local log cleanups, I suppose. Please do let me know if I'm missing a piece of the puzzle here or if there's a simpler solution.
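
          A minimal sketch of this idea, assuming a hypothetical leader-advertised minimum offset; the names LocalLog, FollowerReplica, and updateLeaderMinOffset are made up for illustration:

            // Sketch only: a follower trims its local log no further than the minimum
            // offset the leader still retains, so all in-sync replicas keep the same range.
            case class LocalLog(var segments: Vector[(Long, Long)]) { // (baseOffset, endOffset) pairs
              def deleteSegmentsBefore(offset: Long): Unit =
                segments = segments.filter { case (_, endOffset) => endOffset > offset }
            }

            class FollowerReplica(log: LocalLog) {
              @volatile private var leaderMinOffset: Long = 0L

              // Invoked whenever the leader's advertised minimum offset is learned,
              // e.g. via the fetch path or ZK as discussed in the comment above.
              def updateLeaderMinOffset(offset: Long): Unit = {
                leaderMinOffset = offset
                log.deleteSegmentsBefore(leaderMinOffset) // local cleanup never outruns the leader
              }
            }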

          Neha Narkhede added a comment -

          Prashanth,

          Thanks for helping out on replication and getting in the format changes. Before starting on this, note that we are blocked on KAFKA-49 and KAFKA-44. It seems like it will be most effective to get some help on those JIRAs first. Just a suggestion; comments are welcome!

          Prashanth Menon added a comment -

          No worries. The reason I chose to start here was that I felt it required creating some of the base entities needed for KAFKA-49 and KAFKA-44. Since 49 is still semi-blocked by 240 on the produce side, I didn't want to make too many changes there. 44, on the other hand, makes use of some of the algorithms used as part of startup. Thinking about it, 44 is probably a better place to start in terms of complexity. Thanks!

          Jun Rao added a comment -

          Prashanth,

          That's a good question. We'd like to make replicas identical to each other. This is easy for size-based log retention, but harder for time-based log retention. What you proposed is to have only the leader delete old log segments and propagate this information to the followers. This seems like a reasonable approach. The question is how the leader should communicate such information to the followers. One possibility is to piggyback on the FetchResponse returned to the followers. This will mean some extra optional fields in FetchResponse.
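
          For illustration, the piggybacking idea could look roughly like the following; the field name leaderLogStartOffset and the case class layout are assumptions, not the actual FetchResponse format:

            // Sketch of one partition's entry in a fetch response, with an optional
            // extra field for follower fetches only (names are hypothetical).
            case class PartitionFetchData(
              topic: String,
              partition: Int,
              highWatermark: Long,
              // oldest offset the leader still retains, so followers can align local cleanup
              leaderLogStartOffset: Option[Long],
              messages: Array[Byte]
            )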

          Jay Kreps added a comment -

          I think we are overthinking this. Currently, cleanup is not a precise SLA; it is just a guarantee of the form "we will never delete anything younger than X OR we will always maintain at least Y bytes of messages". Trying to maintain this synchronously across nodes is overkill, I think. It is fine if every node acts independently as long as each of them respects the SLA. I think this should be much simpler and more likely to work.
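
          One conservative reading of that SLA, sketched below, is that each broker independently deletes a segment only when it is older than the time bound and enough bytes would still remain afterwards; the names and parameters here are illustrative, not LogManager's actual code:

            // Sketch only: per-broker, independent retention check. Segments are assumed
            // ordered oldest first; retentionMs / retentionBytes are illustrative configs.
            case class LogSegment(baseOffset: Long, sizeInBytes: Long, lastModifiedMs: Long)

            def segmentsToDelete(segments: Seq[LogSegment],
                                 retentionMs: Long,
                                 retentionBytes: Long,
                                 nowMs: Long): Seq[LogSegment] = {
              var remainingBytes = segments.map(_.sizeInBytes).sum
              segments.takeWhile { seg =>
                val oldEnough  = nowMs - seg.lastModifiedMs > retentionMs
                val enoughLeft = remainingBytes - seg.sizeInBytes >= retentionBytes
                val deletable  = oldEnough && enoughLeft
                if (deletable) remainingBytes -= seg.sizeInBytes
                deletable
              }
            }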

          Neha Narkhede added a comment -

          Here is something to think about with respect to leader election and replica failures:

          Suppose there are 3 replicas for a partition, and the leader acks the produce request once the request is acked by the 2 followers. The produce request doesn't care about the replication factor, so if one of the followers is slow, the leader will receive fewer than 2 acks from the followers and will still go ahead and send a success ACK to the producer. The replicas update their HW only on the next replica fetch response, and since the HW committer thread runs independently, it is possible that the checkpointed HW of one of the 3 replicas is lower than the others'.

          If the leader fails at this point, it will trigger the leader election procedure. According to the current design proposal, any replica in the ISR can become the leader. If the replica with the lower HW becomes the leader, it will truncate its log up to its last checkpointed HW and start taking produce requests from there. The other 2 replicas will then send ReplicaFetchRequests with an offset that doesn't exist on the leader.

          Effectively, it seems that we will end up losing some successfully acknowledged produce requests. Perhaps the leader election procedure should check the HW of the participating replicas and give preference to the replica with the highest HW?
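
          A minimal sketch of that suggestion (not the design that was ultimately adopted): among the ISR, pick the replica with the highest checkpointed HW. ReplicaState and electLeader are made-up names for illustration.

            // Sketch only: prefer the in-sync replica with the largest checkpointed HW,
            // so no successfully acknowledged message is truncated by the new leader.
            case class ReplicaState(brokerId: Int, checkpointedHw: Long)

            def electLeader(isr: Seq[ReplicaState]): Option[Int] =
              if (isr.isEmpty) None
              else Some(isr.maxBy(_.checkpointedHw).brokerId)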

          Jun Rao added a comment -

          If there are 2 followers and the leader receives an ack from only follower 1, but not follower 2 (within the timeout), the leader will kick follower 2 out of the ISR before it can commit the message and ack the producer. So, follower 2 will never get a chance to become the new leader should the current leader fail.
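
          Sketched below is the invariant this describes, with made-up names: the leader advances the HW only to offsets acked by every follower currently in the ISR, and a follower that times out is dropped from the ISR before the message is committed.

            // Sketch only: LeaderState and its methods are illustrative, not Kafka code.
            class LeaderState(var isr: Set[Int], var hw: Long) {
              private var ackedOffsets = Map.empty[Int, Long] // follower id -> last acked offset

              def onFollowerAck(followerId: Int, offset: Long): Unit = {
                ackedOffsets += (followerId -> offset)
                maybeAdvanceHw()
              }

              def onFollowerTimeout(followerId: Int): Unit = {
                isr -= followerId // shrink the ISR first ...
                maybeAdvanceHw()  // ... so the HW can advance without the slow follower
              }

              // The HW is the smallest offset acked by all followers still in the ISR.
              private def maybeAdvanceHw(): Unit = {
                val acks = isr.toSeq.map(id => ackedOffsets.getOrElse(id, 0L))
                if (acks.nonEmpty) hw = math.max(hw, acks.min)
              }
            }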

          Neha Narkhede added a comment -

          Resolved as part of KAFKA-46


            People

            • Assignee: Neha Narkhede
            • Reporter: Jun Rao
            • Votes: 0
            • Watchers: 2

            Dates

            • Created:
            • Updated:
            • Resolved: