Kafka / KAFKA-46

Commit thread, ReplicaFetcherThread for intra-cluster replication

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels:
      None

      Description

      We need to implement the commit thread at the leader and the fetcher thread at the follower for replicating the data from the leader.

      1. kafka-46-draft.patch
        77 kB
        Neha Narkhede
      2. kafka-46-v1.patch
        136 kB
        Neha Narkhede
      3. kafka-46-v2.patch
        139 kB
        Neha Narkhede
      4. kafka-46-v3.patch
        124 kB
        Neha Narkhede
      5. kafka-46-v4.patch
        130 kB
        Neha Narkhede

        Issue Links

          Activity

          Jun Rao added a comment -

          +1 for patch v4.

          Neha Narkhede added a comment -

          Attaching an updated patch that includes the rebase changes from KAFKA-348

          Also, updated the follow up JIRAs - KAFKA-350 and KAFKA-351

          Jun Rao added a comment -

          +1 from me for patch v3. Let's see if there are more comments from others.

          For 27, it's ok to resolve this in kafka-350. Could you update the jira so that we remember all the changes that need to be made? Ditto for kafka-351.

          Neha Narkhede added a comment -

          Updated patch to address Jun's suggestions -

          1. Fixed the ISR expiration for stuck followers case
          2. HW maintenance work is postponed to KAFKA-350
          3. System test (KAFKA-341), that tests message replication without failures, works on this patch

          More detailed comments -

          21. Removed it

          22. Partition.getOutOfSyncReplicas(): Good catch ! Fixed the logic and added another test case for this.

          23. ReplicaManager: Done

          24. Replica: Changed the name to logEndOffsetUpdateTime()

          25. KafkaConfig: Changed the variable and config names to replica.*

          26. KafkaServer: Well, become* makes sense on the entity that is changing its state (Replica), make*, I thought made sense on the actor (KafkaServer). But that is just a matter of personal taste

          27. There are some nitty gritty details about HW maintenance that I would like to fix as part of KAFKA-350

          28. ISRExpirationTest: Done

          29. PrimitiveApiTest.testConsumerNotExistTopic(): I think the right fix is to throw a descriptive exception like UnknownTopicException when a client makes a produce/consume request for a topic that has never been created. Filed KAFKA-351 to fix it.

          30. TestUtils: Fixed it

          Jun Rao added a comment -

          Thanks for patch v2. To help people review the code, I summarized the logic of handling the produce and fetch request on the server in this wiki: https://cwiki.apache.org/confluence/display/KAFKA/Handling+Produce+and+Fetch+Requests+in+KafkaApi

          Some new comments:
          21. ISRExpirationThread: It seems that this class is no longer used. Let's remove it.

          22. Partition.getOutOfSyncReplicas(): The first condition doesn't seem to implement what's in the comment. It doesn't check the leader's leo update time. Also, the condition specified in the comment doesn't seem sufficient. Suppose that the leader gets 100 more bytes of data, after which no more data is coming. A follower gets the first 50 bytes and then stops. The follower's leo has been updated after the leader's leo was last updated. However, we still need to take the follower out of ISR. How about changing the condition to: select replicas whose leo is less than the leo of the leader and whose leo hasn't been updated for keepInsyncTime.

          23. ReplicaManager:
          23.1 makeLeader(): remove comment "also add this partition to the ISR expiration priority queue"
          23.2 makeFollower(): If a follower switches leader, we should stop the old FetchThread before starting the new one.

          24. Replica.leoUpdateTime(): use logEndOffsetUpdateTime to be consistent.

          25. KafkaConfig: Let's keep the variable name and property name consistent. If we choose to use replication as the prefix for property name, use the same prefix for variable names.

          26. KafkaServer: To be consistent, we should probably name becomeLeader and becomeFollower as makeLeader and makeFollower, respectively.

          27. Log.recoverUptoLastCheckpointedHW(): not sure if comment 16.2 is addressed. Removed segments are not physically deleted.

          28. ISRExpirationTest:
          28.1 testISRExpirationForSlowFollowers(): the comment says set leo of remote replica to something like 2, but the code sets it to 4.
          28.2 testISRExpirationForStuckFollowers() and testISRExpirationForSlowFollowers(): is Thread.sleep() really needed? testISRExpirationForMultiplePartitions() didn't seem to use Thread.sleep().

          29. PrimitiveApiTest.testConsumerNotExistTopic(): I think this test is just to make sure that the client can get the error code on a non-existing topic.

          30. TestUtils: follower.socket.timeout.ms is now renamed to replication.socket.timeout.ms
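
The condition proposed in comment 22 above can be sketched roughly as follows. This is only an illustration of the rule "leo is less than the leader's leo and hasn't been updated for keepInSyncTime"; the ReplicaState and IsrCheck classes, their fields, and the parameter names are hypothetical and are not taken from the patch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a replica's state; not the Replica class from the patch.
final class ReplicaState {
    final int brokerId;
    final long logEndOffset;
    final long logEndOffsetUpdateTimeMs;

    ReplicaState(int brokerId, long logEndOffset, long logEndOffsetUpdateTimeMs) {
        this.brokerId = brokerId;
        this.logEndOffset = logEndOffset;
        this.logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMs;
    }
}

// Hypothetical helper illustrating the proposed out-of-sync condition.
final class IsrCheck {
    static List<ReplicaState> outOfSyncReplicas(ReplicaState leader,
                                                List<ReplicaState> followers,
                                                long nowMs,
                                                long keepInSyncTimeMs) {
        List<ReplicaState> outOfSync = new ArrayList<>();
        for (ReplicaState follower : followers) {
            // The follower has not caught up to the leader's log end offset...
            boolean behind = follower.logEndOffset < leader.logEndOffset;
            // ...and its log end offset has not moved for longer than the allowed window.
            boolean stale = nowMs - follower.logEndOffsetUpdateTimeMs > keepInSyncTimeMs;
            if (behind && stale) {
                outOfSync.add(follower);
            }
        }
        return outOfSync;
    }
}
```

In the scenario from comment 22 (leader at 100 bytes, follower stuck at 50), the follower is selected once keepInSyncTimeMs has elapsed since its last leo update, even though that update happened after the leader's last update.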

          Neha Narkhede added a comment -

          Filed KAFKA-350 for improving the high watermark maintenance
          Filed KAFKA-351 to cover the refactoring suggestions.

          Now, we need some serious system testing for all this code !

          Neha Narkhede added a comment - - edited

          Jay,

          Thanks for thinking through the code structure, I've included more refactoring changes in this patch. Some of the suggestions are orthogonal to this patch and I'd prefer to fix it in another JIRA, given the complexity of this patch. Maybe I can create a 'refactoring' JIRA after this one to cover some of these -

          2. Makes sense. I guess that was an over optimization.
          3. This is a good suggestion, although I would prefer keeping it to the refactoring JIRA
          4. Picked descriptive names
          5. Somehow I like the idea of wrapping up enough logic inside Replica to figure out if it is a follower or leader. ReplicaFetcherThread inside Replica allows that. Although, I'm not sure that is the best way to achieve it.
          6. Yeah, probably something to think about. Will move it to the refactoring JIRA
          7. I like Option 4 there, hoping that can be fixed in a separate JIRA
          8. Yeah, I moved some zookeeper client access to ReplicaManager so that all replication specific logic can be moved there.
          9. Changed configs to replication.*
          11. Simplified the ISR expiration. Looks better now.
          12. Hmm, Utils.newThread returns Thread, but I think it is useful to use some APIs specific to ReplicaFetcherThread like getIfFollowerAndLeader(). But I see your point here. Given a choice, it is always better to use a helper method. I set the daemon property and the thread handles all Throwables.
          13. Yeah, this is a good suggestion. This also fits in generic refactoring category that can be fixed separately.
          14. This is another great suggestion. Please see the included patch if you like it.
          15. Fixed it
          16. Fixed it
          17. Yeah, this will keep changing with the v3 code. Will be good to keep this in mind though.

          Overall, I liked your refactoring suggestions, and I might have been lazy to describe all of the changes I made here. Will really appreciate it if you can read through the new patch and suggest improvements. I'm fine with working through more in this patch itself, if you feel that works better.

          Jay Kreps added a comment -

          Indeed, this is replication! The rest of it is just a simple matter of handling failures and a little tooling. Very nicely done.

          Neha Narkhede added a comment -

          Regarding Jun's comments -

          4, 6: Done

          New review comments:
          11. KafkaApis:
          11.1 Makes sense
          11.2 Added a new exception class NotLeaderForPartitionException. We can improve the naming going forward.
          11.3 Done

          12. ReplicaManager:
          12.1 Done
          12.2 Good catch
          12.3 Ideally, I would like to get rid of allReplicas, maybe do it differently. I'm thinking of fixing this in another JIRA. Let me know if you prefer fixing it in this one.

          13. Replica:
          13.1 Done

          14. KafkaServer:
          14.1 Removed the TODOs. They are addressed.

          15. ISRExpirationThread:
          15.1 Done
          15.2 I've included logic for handling slow and stuck followers, and unit tested it.
          15.3 It has completely disappeared now.
          15.4 Agreed

          16. LogDisk: recoverUptoLastCheckpointedHW():
          16.1 That's a good point.
          16.2 Done

          17. LogOffsetTest:
          17.1 Deleted it

          18. PrimitiveApiTest:
          18.1 testConsumerNotExistTopic() Actually I'm not too sure this test makes sense in the replication branch. Is this testing that the server returns some meaningful error code if it receives a request for an unknown topic ? If yes, maybe we don't need a consumer to test that logic. I haven't fixed this test, maybe we can think more on what exactly we want to test here.

          19. ProducerTest:
          19.1 testZKSendToNewTopic(): Done

          20. ReplicaFetchTest:
          20.1 Right
          20.2 testReplicaFetcherThreadI(): That is a good suggestion. I'd like to clean up unit tests and add related helper APIs, maybe in another JIRA.

          Neha Narkhede added a comment -

          Thanks for the great feedback ! This is probably the largest jira for KAFKA-50, I've done my best to include the review changes in this JIRA. I will file separate JIRAs to include other review suggestions. Let me attempt to describe the changes made in this patch -

          1. Simplified the ISR maintenance logic to iterate through the partitions every keepInSyncTimeMs ms. I guess the overhead is O for isr expiration and O(1) for replica fetch requests. This seems reasonable since keepInSyncTimeMs is expected to be in the order of several seconds and replica fetch requests are easily more frequent than that.
          2. Fixed ISR expiration logic to remove a slow follower as well as a stuck follower from the ISR
          2. Moved replication specific logic inside ReplicaManager and Partition. So KafkaApis and KafkaServer have minimum replication specific code
          3. Removed InMemoryLog, I guess that was an over optimization
          4. Kept the high watermarks in a separate file, will fix it in a separate JIRA to contain the changes in this JIRA.

          Jun Rao added a comment -

          Jay's comments remind me of another thing:

          21. The follower's HW should be min(follower LEO, leader HW). This is to handle the case that a follower is still catching up.
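
As a small worked illustration of the rule in comment 21, assuming offsets are plain long values (the HighWatermarks class and forFollower method are hypothetical names, not from the patch):

```java
// Hypothetical helper illustrating the rule from comment 21.
final class HighWatermarks {
    // A follower must never advertise a high watermark beyond what it has
    // actually written, so its HW is capped by its own log end offset.
    static long forFollower(long followerLogEndOffset, long leaderHighWatermark) {
        return Math.min(followerLogEndOffset, leaderHighWatermark);
    }
}
```

For a follower that is still catching up (LEO 40, leader HW 100) this gives 40; once the follower's LEO reaches or passes the leader's HW, the leader's HW is used.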

          Jay Kreps added a comment -

          Comments
          Some of these we discussed in person but I wanted to post them here for anyone else following along. A number of these are really just structural or naming comments. When we get closer to final form I will do a pass on trying to understand all the logic and see if I can find any corner cases, but I haven't done that yet. As a result I am not really sure if I understand everything I am commenting on, so take it all with a grain of salt.

          1. This patch pays really good attention to code structure and testability which is awesome since we are adding gobs of hard logic. Nice to see things getting cleaner as we do this.
          2. For some reason I preferred just having Log instead of DiskLog and MemoryLog. I feel like doing it twice tends to lead to similar logic in both. I do like the idea of lightening unit tests. I wonder if a helper method to set up a log wouldn't be good enough, though? Not sure if this is a rational preference or just inertia on my part, though, so feel free to ignore. If we do separate out a Log trait I feel we should clean up that interface a bit; it is kind of a disaster right now (probably we should do that regardless).
          3. Maybe HW mark should move out of Log since now it really indicates something about the state of other servers and Log is meant to be just a simple stand alone log implementation.
          4. I think expanding out some of the acronyms in public methods would be nice: i.e. highWatermark and logEndOffset. Having a concise local variable name is helpful but for the poor person trying to learn the code I think the slightly more verbose name is helpful. If you prefer the more concise naming then just having good javadoc that explains the abbreviations would be good.
          5. Consider making Replica just be a dumb POJO (POSO?) and move the ReplicaFetcherThread elsewhere.
          6. We currently have Partition and BrokerPartitionInfo. We should clarify why both of these and make the naming make sense. To me a partition is logically just (topic, partId). I think BrokerPartitionInfo is really a bit hard to understand, though that is unrelated to this patch. Partition is more like the broker's information about replicas of that partition. Also both Partition and Replica are in the cluster package which was originally shared by client and server. Now with all the additional stuff this is really part of the server only, right? Probably we should change the package name...?
          7. I sent a separate email on setters/getters. I think overall there are a lot of setter/getter methods. We should pick a style for these and go with that uniformly (we haven't been consistent so far).
          8. I think we should figure out a general strategy for managing the zookeeper interactions. I think it is wise to wrap up the zookeeper dependency but having everything in one class is too much. Maybe the way to go is to have generic ZkUtils and reusable infra like ZkQueue and then split KafkaZookeeper into the logical functions it covers.
          9. For the config I recommend the prefix "replication" or "replica" instead of "follower" (e.g. replication.socket.timeout.ms), I think this is more clear to someone who hasn't read about the internals of our replication design and doesn't know the terminology.
          10. A bit of internals have spilled into KafkaServer, such as locking, ISR management, etc. I think KafkaServer should just interact with the main subsystems of kafka in a very abstract way. I think the core problem is that we need to think more about the functionality and API of ReplicaManager. To my mind the replica manager should be the one running the ISR maintenance, doing its own locking, etc. To me the main subsystems are (1) logs managed by the LogManager, (2) the network layer wrapped up by SocketServer, (3) the request processor pool (4) and now the ReplicaManager or ReplicationManager or whatever we want to call it.
          11. ISRMaintenceThread--I think you are right that we will need to be very prompt about handling ISR expiration since this is effectively part of our failure detection time. It might be good, though, to just stick in the polling loop Jun suggested for now, and then come back to optimize it later (even though we almost certainly will have to), just to reduce the scope in this iteration.
          12. Also make sure the ISR thread either uses the Utils.newThread helper or handles the common gotchas (thread name, set daemon properly, set uncaught exception handler). Also think through the details of the lifecycle.
          13. We have a lot of threads that basically run in a loop and use an isRunning atomic boolean and count down latch. You added two but I think we had a few others. Consider factoring this out into a helper runnable that these can extend. Verifying the lifecycle details for each is kind of a pain and it is pretty easy to either not cleanly shutdown all the threads or block indefinitely or whatever.
          14. The changes in KafkaApis seem kind of brute forced. ensureLeaderOnThisBroker and the massive expansion of logic in readMessageSets seems like we are just brute forcing through this problem. We need to find a way to structure this into methods that make sense and don't reach into the internals of other parts of the system. readMessageSet is already doing crazy funky stuff that needs to be fixed. I think restructuring readMessageSet will help with some of the problems, and the rest can maybe be solved by pushing all the replica/leo/isr logic here into ReplicaManager. Basically the API level should just say ReplicaManager.leaderForPartition(id) as part of the request validation and ReplicaManager.recordFollowerPosition(...) and move all the other details out of the KafkaApis. Not sure if I understand this well enough for that to make sense...
          15. We should always return the hw mark in the PartitionData, right? This way we can do monitoring on the consumers. Currently it looks like we only do this for replicas.
          16. Name for ReplicaManager.makeSurePartitionExists and ReplicaManager.assurePartitionExists doesn't really call out the difference. I would recommend calling them getOrCreatePartition() and ensureExists()
          17. Overall I would think through the public API for ReplicaManager. I think it may be possible to move much more replica/replication/partitioning logic under this class's wrapper and out of other parts of the system which would be good.
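
The helper runnable suggested in comments 12 and 13 above might look something like the sketch below. It is only one possible shape, written in Java for illustration; the LoopingWorker name and its methods are hypothetical and do not come from the patch or from Utils.newThread. It assumes shutdown() is only called after start(), since otherwise the latch would never be released.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical base class for threads that run a loop until shut down.
abstract class LoopingWorker implements Runnable {
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final Thread thread;

    LoopingWorker(String name) {
        // Handle the common gotchas in one place: thread name and daemon flag.
        thread = new Thread(this, name);
        thread.setDaemon(true);
    }

    // One iteration of the loop; subclasses put their logic here.
    protected abstract void doWork() throws InterruptedException;

    void start() {
        thread.start();
    }

    @Override
    public final void run() {
        try {
            while (isRunning.get()) {
                doWork();
            }
        } catch (InterruptedException e) {
            // Expected when shutdown() interrupts a blocked iteration.
        } catch (Throwable t) {
            // Catch everything so a failure is reported instead of silently killing the thread.
            System.err.println("Error in " + thread.getName() + ": " + t);
        } finally {
            // Always release anyone waiting in shutdown(), even after a failure.
            shutdownLatch.countDown();
        }
    }

    // Signals the loop to stop and blocks until the thread has exited.
    // Assumes start() has already been called.
    void shutdown() throws InterruptedException {
        isRunning.set(false);
        thread.interrupt();
        shutdownLatch.await();
    }

    boolean isRunning() {
        return isRunning.get();
    }
}
```

A fetcher or ISR expiration thread would then only implement doWork(), and the lifecycle details (clean shutdown, no indefinite blocking) would need to be verified once rather than per thread.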

          Show
          Jay Kreps added a comment - Comments Some of these we discussed in person but I wanted to post them here for anyone else following along. A number of these are really just structural or naming comments. When we get closer to final form I will do a pass on trying to understand all the logic and see if I can find any corner cases, but I haven't done that yet. As a result I am not really sure if I understand everything I am commenting on, so take it all with a grain of salt. 1. This patch pays really good attention to code structure and testability which is awesome since we are adding gobs of hard logic. Nice to see things getting cleaner as we do this. 2. For some reason I preferred just having Log instead of DiskLog and MemoryLog. I feel like doing it twice tends to lead to similar logic in both. I do like the idea of lightening unit tests. I wonder if a helper method to set up a log wouldn't be good enough, though? Not sure if this is a rational preference or just inertia on my part, though, so feel free to ignore. If we do separate out a Log trait I feel we should clean up that interface a bit it is kind of a disaster right now (probably we should do that regardless). 3. Maybe HW mark should move out of Log since now it really indicates something about the state of other servers and Log is meant to be just a simple stand alone log implementation. 4. I think expanding out some of the acronyms in public methods would be nice: i.e. highWatermark and logEndOffset. Having a concise local variable name is helpful but for the poor person trying to learn the code i think the slightly more verbose name is helpful. If you prefer the more concise naming then just having good javadoc that explains the abbreviations would be good. 5. Consider making Replica just be a dumb pojos (posos?) and move the ReplicaFetcherThread elsewhere. 6. We currently have Partition and BrokerPartitionInfo. We should clarify why both of these and make the naming make sense. 
          Jun Rao added a comment -

          Thanks for the patch. Overall, a very encouraging patch given the complexity of this jira. Some comments:

          From previous reviews:
          Were 4.1 and 4.2 addressed in the patch? I still see CUR and reassignedReplicas.
          For 4.4, I think highWatermarkUpdateTime can be used as described in 15.2 below.
          For 6, I meant that all local variable names should also be prefixed with follower.

          New review comments:
          11. KafkaApis:
          11.1 handleFetchRequest(): if the leader of one partition is not on this broker, we reject the whole request. Ideally, we should just send the error code for that partition in the response and fulfill the rest of the request.
          11.2 handleFetchRequest() and readMessageSets(): If the leader is not on this broker, we should probably return a new type of error like NotLeaderException, instead of using InvalidPartitionException or throwing IllegalStateException.
          11.3 readMessageSets(): add a comment of what -1 means for replicaId

          12. ReplicaManager:
          12.1 remove unused imports
          12.2 maybeIncrementLeaderHW(): if(newHw < oldHw) should be if(newHw > oldHw)
          12.3 We will need to either synchronize the methods in this class or use ConcurrentHashMap for allReplicas since allReplicas can be read and updated concurrently.
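
          A minimal sketch of the corrected comparison in 12.2, written so that it is also safe under the concurrent access raised in 12.3. LeaderHighWatermark is a hypothetical class, not one from the patch, and it uses an AtomicLong for a single partition's HW rather than the ConcurrentHashMap suggested above for allReplicas.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.annotation.tailrec

// Hypothetical holder for one partition's leader high watermark (not a class from the patch).
final class LeaderHighWatermark(initial: Long) {
  private val hw = new AtomicLong(initial)

  def value: Long = hw.get()

  /** Moves the HW forward only if newHw is larger; returns true when it moved. */
  @tailrec
  final def maybeIncrement(newHw: Long): Boolean = {
    val oldHw = hw.get()
    if (newHw <= oldHw) false                      // never move the HW backwards
    else if (hw.compareAndSet(oldHw, newHw)) true  // no concurrent update, HW advanced
    else maybeIncrement(newHw)                     // another thread changed it, retry
  }
}
```

          The compare-and-set loop means two threads calling maybeIncrement at once can never leave the HW lower than either candidate.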

          13. Replica:
          13.1 hw(): to be consistent, we should probably throw IllegalStateException, instead of InvalidPartitionException.

          14. KafkaServer:
          14.1 There are a couple of TODOs. Will they be addressed in this jira or separate jiras?

          15. ISRExpirationThread:
          15.1 It seems that when the time expires, we always update the ISR. We should only update ISR if it actually shrinks.
          15.2 Currently, we take a replica out of ISR if its LEO is less than leaderHW after keepInSync time. We probably should use the following condition:
          leaderHW - r.leo > keepInSyncBytes || currentTime - r.highWatermarkUpdateTime > keepInSyncTime
          The first condition handles a slow follower and the second condition handles a stuck follower.
          15.3 I think we can potentially get rid of the inner while loop by putting all the logic when time expires in an if statement and the awaitUtil part in the else clause of the if statement.
          15.4 Also, instead of using a priority queue and repeatedly adding partitions to and deleting them from the queue, would it be simpler to have the thread just check the isInsyncCondition for each partition every keepInSyncTime?
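
          The condition in 15.2, combined with the "only update if it actually shrinks" rule from 15.1, can be sketched as below. FollowerState and IsrCheckSketch are hypothetical names for illustration; only leo, highWatermarkUpdateTime, keepInSyncBytes and keepInSyncTime come from the comment above.

```scala
// Hypothetical view of a follower as seen by the leader.
final case class FollowerState(brokerId: Int, leo: Long, highWatermarkUpdateTime: Long)

object IsrCheckSketch {
  /** True when the follower is too far behind (slow) or has not progressed recently (stuck). */
  def isOutOfSync(leaderHW: Long, r: FollowerState, currentTime: Long,
                  keepInSyncBytes: Long, keepInSyncTime: Long): Boolean =
    leaderHW - r.leo > keepInSyncBytes ||
      currentTime - r.highWatermarkUpdateTime > keepInSyncTime

  /** Returns Some(newIsr) only when at least one replica was dropped, otherwise None. */
  def maybeShrinkIsr(isr: Set[FollowerState], leaderHW: Long, currentTime: Long,
                     keepInSyncBytes: Long, keepInSyncTime: Long): Option[Set[FollowerState]] = {
    val kept = isr.filterNot(r => isOutOfSync(leaderHW, r, currentTime, keepInSyncBytes, keepInSyncTime))
    if (kept.size < isr.size) Some(kept) else None
  }
}
```

          A thread following 15.4 would call maybeShrinkIsr for every partition once per keepInSyncTime and write to ZK only when the result is defined.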

          16. LogDisk: recoverUptoLastCheckpointedHW():
          16.1 The second condition in
          segments.view.find(segment => lastKnownHW >= segment.start && lastKnownHW < segment.size)
          seems incorrect. It seems that you want to use "lastKnownHW < segment.messageSet.getEndOffset"
          16.2 The files of all deleted segments should be deleted like that in LogManager.cleanupExpiredSegments().
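
          To illustrate 16.1: a segment's size is a length, not an absolute position, so comparing lastKnownHW against it only works for the first segment. The sketch below assumes the end offset of a segment is start + size; SegmentInfo and SegmentLookupSketch are hypothetical stand-ins, not classes from the patch.

```scala
// Hypothetical stand-in for a log segment: absolute start offset plus its length.
final case class SegmentInfo(start: Long, size: Long) {
  def endOffset: Long = start + size // assumption: the end offset is start + size
}

object SegmentLookupSketch {
  /** Finds the segment whose range [start, endOffset) contains lastKnownHW. */
  def segmentContaining(segments: Seq[SegmentInfo], lastKnownHW: Long): Option[SegmentInfo] =
    segments.find(s => lastKnownHW >= s.start && lastKnownHW < s.endOffset)
}
```

          With segments (0, 100) and (100, 50), an HW of 120 falls in the second segment; the original "lastKnownHW < segment.size" test would reject it because 120 is not less than 50.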

          17. LogOffsetTest:
          17.1 There is no need to keep testEmptyLogs(), since we have a test that covers fetching from a non-existing topic using SimpleConsumer.

          18. PrimitiveApiTest:
          18.1 testConsumerNotExistTopic(): we probably shouldn't create the topic in this case.

          19. ProducerTest:
          19.1 testZKSendToNewTopic(): We should fix the comment that says "Available partition ids should be 0, 1, 2 and 3" since there is only 1 partition created.

          20. ReplicaFetchTest:
          20.1 Since the test is already using in-memory log, we can remove TODO in testReplicaFetcherThread().
          20.2 testReplicaFetcherThreadI(): Instead of sleeping and then checking log.get.getLogEndOffset, could we create a utility method that keeps checking until the LEO reaches a certain value, up to a certain max wait time? Maybe we should make a more general util that waits up to a certain amount of time until a condition is satisfied.
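
          The general utility suggested in 20.2 might look like the following sketch. The name waitUntilTrue and the pollMs parameter are assumptions made for illustration, not something the patch defines.

```scala
object WaitSketch {
  /** Polls `condition` until it holds or maxWaitMs elapses; returns whether it ever held. */
  def waitUntilTrue(condition: () => Boolean, maxWaitMs: Long, pollMs: Long = 10L): Boolean = {
    val deadline = System.nanoTime() + maxWaitMs * 1000000L
    var satisfied = condition()
    // Keep polling while the condition is false and the deadline has not passed.
    while (!satisfied && System.nanoTime() - deadline < 0) {
      Thread.sleep(pollMs)
      satisfied = condition()
    }
    satisfied
  }
}
```

          A test could then assert something like waitUntilTrue(() => log.get.getLogEndOffset == expected, 1000L) instead of sleeping for a fixed time.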

          Neha Narkhede added a comment -

          Prashanth,

          Regarding your high level comment -

          If you use a scheduled thread pool executor to schedule every partition greedily, you would be spinning up a bunch of threads that might not do any work, since the priority of the items in the queue keeps changing. Also, you will need to change the priority of an already scheduled task, which might not be possible to do. What's more, if the thread pool maxes out, a partition that requires immediate ISR expiration might not get scheduled on time.

          I'm guessing it is unlikely that all the partitions on a node expire at the same time. Even if they do, it might take maybe a few seconds for the last partition to shrink its ISR, which is not a big deal. In reality, there would be very few partitions, maybe 1 or 2, that need to shrink their ISR due to slower followers. That's why a single thread seems to suffice for handling the ISR expiration for all partitions.

          Regarding your detailed review comments -

          1. Fixed that
          2. That is a good suggestion. I've attempted a refactoring, let me know if you have more feedback on it. The ISR thread updates the ISR in ZK and in the in-memory cache, whereas the become leader API is passed the latest ISR read from ZK and just has to update its cache with that state on becoming a leader. So, I wrapped up the cache + ZK update in an updateLeaderAndISR API in KafkaServer. This will be used by the ISR expiration thread. The become leader API should not be updating anything in ZK; it should just be reading from ZK and updating its cache.
          3. The patch already does it at the end of the while loop. Or do you mean something else?
          5. Yes
          6. The replica should be added to the ISR as soon as the fetch request is received by the server. I intended to add the replica to the ISR even if the request might end up in the purgatory waiting for additional data. Ideally, I would like to get rid of it from the readMessageSets() API.
          7. Good point. Added that.

          Jun

          Regarding your high level comments -

          1. Yes, I kept it simple in this patch, since the main goal is to get the message replication to work. Persisting all the HWs in one file would definitely be a better approach and can be another JIRA.
          2. Idle partitions will cause one delete and one insert into the priority queue. It doesn't look like an issue, but could be resolved by adding a TTL to items in the queue. Idle partitions will expire from the queue and will be added back to the queue when the leader receives a produce request for that partition. However, I'd like to push that to later, since it does not improve correctness and does not look like a performance issue.

          Regarding your detailed review comments -

          3.2 Good point. I think that is better too.
          3.3 Yes, this is one of those “details” that I didn't include in the draft patch.
          4.1 Removed it
          4.2 Removed CUR
          4.3 Kept it simple since replication factors that make sense in production are typically 3-5.
          4.4 Yes, but when will it be used?

          5.1 Good catch!

          6. That exists in the patch. Did I miss any?
          7. Good point. Fixed it

          Jun Rao added a comment -

          Some comments on the draft.

          High level:
          1. We should consider whether to have 1 HW checkpoint file per partition vs 1 HW checkpoint file for all partitions. The benefit of the latter is fewer file writes during checkpoint and fewer file reads during broker startup. Also, to avoid corrupting the checkpointed file, we should probably first write the file to a tmp file and rename to the actual checkpointed file. This probably can be done in a separate jira.

          2. The benefit of using an ISRExpirationThread is that it's relatively simple since there is 1 thread doing all the ISR expiration. One drawback I can see is that idle partitions are still constantly checked by the thread. This may or may not be a big concern.
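
          The tmp-file-and-rename idea from point 1 can be sketched as below, using a single checkpoint file for all partitions. The one-line-per-partition "name hw" format and the HwCheckpointSketch object are assumptions for illustration. ATOMIC_MOVE replaces an existing target on POSIX file systems, but the Java documentation leaves that behaviour implementation specific.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, StandardCopyOption}

object HwCheckpointSketch {
  /** Writes every partition's HW to "<file>.tmp", then renames it over the real file. */
  def write(file: Path, highWatermarks: Map[String, Long]): Unit = {
    val tmp = file.resolveSibling(file.getFileName.toString + ".tmp")
    val body = highWatermarks.toSeq.sortBy(_._1)
      .map { case (partition, hw) => s"$partition $hw" } // assumed line format
      .mkString("\n")
    Files.write(tmp, body.getBytes(UTF_8))
    // Readers see either the old file or the new one, never a half-written file.
    Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE)
    ()
  }

  /** Reads the checkpoint back into a partition -> HW map. */
  def read(file: Path): Map[String, Long] =
    new String(Files.readAllBytes(file), UTF_8)
      .split("\n").filter(_.nonEmpty)
      .map { line =>
        val parts = line.split(" ")
        parts(0) -> parts(1).toLong
      }.toMap
}
```

          The sketch does not force the data to disk before the rename; a real checkpoint would likely need that as well.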

          Low level:
          3. KafkaApis:
          3.1 Agreed with #6 in Prashanth's comment. Probably don't need to call maybeAddReplicaToISR directly from handleFetchRequest.
          3.2 A subtle issue is that we should probably wait until a (replica) fetch request is successful before updating the follower replica's LEO. This is because during an unclean failover (no live brokers in ISR), the offset of the first fetch request from a follower may not be valid.
          3.3 We need to update ISR in ZK and in memory atomically since the ISR can be expanded and shrunk from different threads.
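
          One way to read 3.3 is to funnel every ISR change through a single lock that covers both the ZK write and the in-memory update. The sketch below assumes that design; IsrState is a hypothetical class and writeToZk stands in for whatever persists the ISR to ZooKeeper.

```scala
// Hypothetical wrapper around one partition's ISR.
final class IsrState(initial: Set[Int], writeToZk: Set[Int] => Unit) {
  private var isr: Set[Int] = initial

  def current: Set[Int] = synchronized { isr }

  /** Applies `change` and persists the result under one lock, so expand and shrink cannot interleave. */
  def update(change: Set[Int] => Set[Int]): Set[Int] = synchronized {
    val next = change(isr)
    if (next != isr) {
      writeToZk(next) // if this throws, the in-memory copy is left untouched
      isr = next
    }
    isr
  }
}
```

          Holding the lock across the ZK write keeps the two copies consistent at the cost of blocking other updaters for the duration of the write.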

          4. Partition:
          4.1 We probably don't need to add reassignedReplicas in the patch and can add it later when we get to kafka-42, if necessary.
          4.2 We probably don't need both catchUpReplicas and assignedReplicas since we can always derive one from another together with ISR.
          4.3 Do we need to maintain a HashMap of <replica_id, Replica>, instead of a set of replicas for faster lookup? This may not be a big deal since the replica set is small.
          4.4 Should we keep highWatermarkUpdateTime in Log where the HW is stored?

          5. Replica:
          5.1 leo(), if log is present, we should return l.leo not l.getHighwaterMark.

          6. KafkaConfig: All follower related properties should probably be prefixed with "follower".

          7. Log:
          7.1 recoverUptoLastCheckpointedHW(): if there are k+1 log segment files that need to be truncated, we should delete the last k and truncate the first one.
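
          The rule in 7.1 can be expressed as a small planning function that decides which segment to truncate and which to delete. This is only a sketch: LogSegmentRange, TruncationPlan and RecoverySketch are hypothetical names, and it assumes a segment covers the range [start, start + size).

```scala
// Hypothetical description of a segment: absolute start offset plus its length.
final case class LogSegmentRange(start: Long, size: Long) {
  def end: Long = start + size
}

// truncate: the segment to cut and its new size; delete: segments to remove entirely.
final case class TruncationPlan(truncate: Option[(LogSegmentRange, Long)], delete: Seq[LogSegmentRange])

object RecoverySketch {
  def plan(segments: Seq[LogSegmentRange], lastKnownHW: Long): TruncationPlan = {
    // Segments that end at or before the HW hold only checkpointed data and are kept as-is.
    val affected = segments.sortBy(_.start).dropWhile(_.end <= lastKnownHW)
    affected match {
      case first +: rest if first.start <= lastKnownHW =>
        // The first affected segment straddles the HW: truncate it, delete the k after it.
        TruncationPlan(Some(first -> (lastKnownHW - first.start)), rest)
      case other =>
        TruncationPlan(None, other)
    }
  }
}
```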

          Neha Narkhede added a comment -

          Prashanth,
          Thanks for reviewing the patch in detail. As I mentioned earlier, this is just a draft patch and will clearly miss some details like the ones you've pointed out. I'm looking more for high-level feedback on the data structures used, any ideas for refactoring, etc.

          Prashanth Menon added a comment -

          I can comment on point 5, related to KAFKA-302. After a first pass this is what I've got, let me know what you think:

          1. In KafkaZooKeeper.leaderElection, if the replica isn't elected as the leader, it should issue a becomeFollower after reading who became the leader from ZK.
          2. In KafkaServer.becomeLeader, the leader should probably update the ISR and CUR list by performing the same type of logic as the ISRExpirationThread does. If the intention was to rely on the ISRET to perform this asynchronously, I think it'll need to be modified to update the partition's CUR list along with the ISR.
          3. In ISRExpirationThread, you'll need to add the slowest partition back into the queue in every case.
          4. It seems like there are two ways to update a leader replica's HW, either through Replica.hw itself or Partition.leaderHW. To avoid confusion, can we simplify and provide only one API through which all clients perform this? The latter seems to do the same thing but just updates the hw update time.
          5. Can the leo and hw methods in Replica make use of the isLocal method? The logic is a little more clear this way, IMO.
          6. In KafkaApis.handleFetchRequest, it looks like the maybeAddReplicaToISR call is unnecessary and is rolled into readMessageSets? Any reason we need it there?
          7. KafkaApis.readMessageSets should probably verify that the leader for the partition exists on the broker.

          In general, I'm not entirely sold on the ISRExpirationThread. From my point of view, there is a function that, given a partition, determines whether its ISR/CUR list needs to be updated in-memory and in ZK. Right now, there is a single thread that uses a heap to pick off the partitions with the oldest update times, waits for expiry if necessary, then updates accordingly. I'm wondering if it's possible instead to leverage a scheduled executor that appropriately schedules the execution of the above function on a given partition based on the same criteria (the partition's HW update time); when the task actually executes, it's possible the hw would have moved, making the task a no-op. The benefit there is simplicity, added concurrency and a slightly more accurate/real-time reflection of the ISR list in ZK, meaning a possible reduction in message loss during leadership changes?
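
          A rough sketch of the scheduled-executor alternative described above, assuming one task per partition that becomes a no-op if the HW update time has changed by the time it runs. IsrSchedulingSketch, scheduleCheck and the shrinkIsr callback are hypothetical names; rescheduling after a no-op is left out.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

object IsrSchedulingSketch {
  /** Schedules one expiry check for a partition, due keepInSyncTimeMs after its last HW update. */
  def scheduleCheck(executor: ScheduledExecutorService, hwUpdateTime: AtomicLong,
                    keepInSyncTimeMs: Long, shrinkIsr: () => Unit): Unit = {
    val observed = hwUpdateTime.get()
    val delayMs = math.max(0L, observed + keepInSyncTimeMs - System.currentTimeMillis())
    val task = new Runnable {
      // If the HW moved since scheduling, the update time differs and the task does nothing.
      def run(): Unit = if (hwUpdateTime.get() == observed) shrinkIsr()
    }
    executor.schedule(task, delayMs, TimeUnit.MILLISECONDS)
    ()
  }
}
```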

          Neha Narkhede added a comment -

          Attaching a draft patch for message replication. This is just to give a high level overview of the changes involved and is by no means ready to be committed. So no need for a detailed review. There are probably a few bugs lurking around. Since the changes are pretty significant, I was hoping to get some early feedback.

          1. Added ReplicaFetcherThread that reads data from the leader and appends to local replica log

          2. Added highwatermark maintenance at the leader and the follower. The highwatermark is checkpointed in the partition directory in a file named highwatermark.

          3. Added ISR maintenance logic on the leader. This involves possibly expanding the ISR while handling a fetch request from a follower.

          4. Also added ISRExpirationThread that tracks the highwatermark update time for all partitions that the leader owns and shrinks the ISR if (hw update time + keep.in.sync.time.ms) < current time.

          5. Note that to get this patch going, I had to put in code to cover KAFKA-302. I would encourage a more detailed review for the becomeLeader() and becomeFollower() APIs. We would either like to check it in from this patch, or if Prashanth has some patch, review that one too.

          I will probably add v1 patch with unit tests early next week.

          Also, I would like to check this in parts, if possible. Starting with probably KAFKA-302, then the actual message replication logic. But that is open for discussion as well.

          Neha Narkhede added a comment -

          Started work on this, will upload a patch, once KAFKA-45 is resolved

          Neha Narkhede added a comment -

          >> Should this be broken down into two sub tasks: One for the committing thread on the leader and one for the fetcher thread on the follower?

          Let's not worry about that for now. I'll take care of this JIRA once we resolve the other JIRAs. This one has many other dependencies.

          Jun Rao added a comment -

          Some notes on the implementation:

          1. If required_acks for a produce request is not 0 or 1, the commit thread will put the request in a DelayedQueue and add it to a watch list for that topic/partition (similar implementation as long poll in kafka-48).
          2. There will be an "offset watcher" per topic/partition. The watcher fires every time a follower in the in-sync set advances the offset. Every time the watcher fires, it checks from the head of the watch list to see if any produce request can be satisfied on this partition. If so, it marks this partition as satisfied; if all partitions of the produce request are satisfied, it dequeues the request and sends back the ack.
          3. How to add/delete a follower from the in-sync set? We can run a background insync-check thread that does the following:
            for each topic/partition
              get and save the offset of each partition of the leader replica
              wait for KeepInSyncTime
              check if any insync replica hasn't caught up to the saved offset; if so, drop it out of insync set (need to fire the corresponding offset watcher)
              check if any replica (not in insync set) has caught up to the saved offset; if so, add it to insync set.

          If we follow this approach, we can have 2 subtasks that implement "offset watcher" and insync-check thread. We can also have 1 separate subtask for the FetcherThread and another for the commit thread.
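
          One pass of the insync-check loop in step 3 can be sketched as a pure function over the saved leader offset and the followers' current offsets. IsrChange and InSyncCheckSketch are hypothetical names; the sketch assumes "caught up" means a follower offset at or beyond the saved offset, and it treats an in-sync replica with no reported offset as not caught up.

```scala
// Result of one check: the new in-sync set plus what changed.
final case class IsrChange(isr: Set[Int], dropped: Set[Int], added: Set[Int])

object InSyncCheckSketch {
  /**
   * savedLeaderOffset: the leader offset captured KeepInSyncTime ago.
   * followerOffsets:   the current offset of each follower, keyed by broker id.
   */
  def check(isr: Set[Int], savedLeaderOffset: Long, followerOffsets: Map[Int, Long]): IsrChange = {
    val caughtUp = followerOffsets.filter { case (_, offset) => offset >= savedLeaderOffset }.keySet
    val dropped = isr.diff(caughtUp) // in-sync replicas that fell behind
    val added = caughtUp.diff(isr)   // replicas outside the set that caught up
    IsrChange(isr.diff(dropped).union(added), dropped, added)
  }
}
```

          A non-empty dropped set is the case in which the corresponding offset watcher would need to fire.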

          Prashanth Menon added a comment -

          Should this be broken down into two sub tasks: One for the committing thread on the leader and one for the fetcher thread on the follower?


            People

            • Assignee:
              Neha Narkhede
              Reporter:
              Jun Rao
            • Votes:
              0
              Watchers:
              3
