Thanks for the patch. Overall, a very encouraging patch given the complexity of this jira. Some comments:
From previous reviews:
Were 4.1 and 4.2 addressed in the patch? I still see CUR and reassignedReplicas.
For 4.4, I think highWatermarkUpdateTime can be used as described in 15.2 below.
For 6, I meant that all local variable names should also be prefixed with follower.
New review comments:
11.1 handleFetchRequest(): if the leader of one partition is not on this broker, we reject the whole request. Ideally, we should just send the error code for that partition in the response and fulfill the rest of the request.
11.2 handleFetchRequest() and readMessageSets(): If the leader is not on this broker, we should probably return a new error type such as NotLeaderException, instead of using InvalidPartitionException or throwing IllegalStateException.
11.3 readMessageSets(): add a comment of what -1 means for replicaId
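To illustrate the point in 11.1/11.2, here is a minimal Java sketch of per-partition error handling: failing only the offending partition instead of rejecting the whole fetch request. The class name, error-code values, and the isLeader() lookup are all hypothetical stand-ins, not the actual broker code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FetchHandlerSketch {
    // Hypothetical error codes, for illustration only
    static final short NO_ERROR = 0;
    static final short NOT_LEADER_FOR_PARTITION = 6;

    // Stand-in for the real leadership lookup on this broker
    static boolean isLeader(String topic, int partition) {
        return partition % 2 == 0; // placeholder logic
    }

    // Record an error per partition rather than rejecting the whole request
    static Map<Integer, Short> handleFetch(String topic, int[] partitions) {
        Map<Integer, Short> errors = new LinkedHashMap<>();
        for (int p : partitions) {
            if (isLeader(topic, p)) {
                errors.put(p, NO_ERROR); // serve the data for this partition
            } else {
                errors.put(p, NOT_LEADER_FOR_PARTITION); // fail only this partition
            }
        }
        return errors;
    }
}
```

The caller can then fulfill the in-sync partitions normally and surface the per-partition error codes in the fetch response.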
12.1 remove unused imports
12.2 maybeIncrementLeaderHW(): if(newHw < oldHw) should be if(newHw > oldHw)
12.3 We will need to either synchronize the methods in this class or use ConcurrentHashMap for allReplicas since allReplicas can be read and updated concurrently.
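A rough Java sketch of 12.2 and 12.3 together, assuming allReplicas maps replica id to LEO and the leader HW is the minimum LEO (the class and method shapes here are illustrative, not the patch's actual code):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class PartitionSketch {
    // 12.3: ConcurrentHashMap allows allReplicas to be read and
    // updated concurrently without synchronizing every method
    private final ConcurrentHashMap<Integer, Long> allReplicas = new ConcurrentHashMap<>();
    private final AtomicLong highWatermark = new AtomicLong(0);

    void updateReplicaLeo(int replicaId, long leo) {
        allReplicas.put(replicaId, leo);
    }

    // 12.2: only advance the HW when the new value is larger
    void maybeIncrementLeaderHW() {
        long newHw = allReplicas.values().stream()
                .mapToLong(Long::longValue).min().orElse(0);
        long oldHw = highWatermark.get();
        if (newHw > oldHw) {               // was: if (newHw < oldHw)
            highWatermark.compareAndSet(oldHw, newHw);
        }
    }

    long hw() { return highWatermark.get(); }
}
```

With the original `<` comparison the HW could move backwards; the corrected check makes it monotonically non-decreasing.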
13.1 hw(): to be consistent, we should probably throw IllegalStateException, instead of InvalidPartitionException.
14.1 There are a couple of TODOs. Will they be addressed in this jira or separate jiras?
15.1 It seems that when the time expires, we always update the ISR. We should only update ISR if it actually shrinks.
15.2 Currently, we take a replica out of ISR if its LEO is less than leaderHW after keepInSync time. We probably should use the following condition:
leaderHW - r.leo > keepInSyncBytes || currentTime - r.highWatermarkUpdateTime > keepInSyncTime
The first condition handles a slow follower and the second condition handles a stuck follower.
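The proposed condition in 15.2 could be sketched in Java as follows (the Replica fields and parameter names mirror the condition above but are otherwise hypothetical):

```java
public class IsrCheckSketch {
    // Hypothetical per-replica state for illustration
    static class Replica {
        long leo;                      // log end offset
        long highWatermarkUpdateTime;  // last time this replica caught up to the leader HW
        Replica(long leo, long hwTime) { this.leo = leo; this.highWatermarkUpdateTime = hwTime; }
    }

    // A replica leaves the ISR if it is slow (too many bytes behind the HW)
    // or stuck (has not caught up to the HW for too long)
    static boolean outOfSync(Replica r, long leaderHw, long nowMs,
                             long keepInSyncBytes, long keepInSyncTimeMs) {
        boolean slow = leaderHw - r.leo > keepInSyncBytes;
        boolean stuck = nowMs - r.highWatermarkUpdateTime > keepInSyncTimeMs;
        return slow || stuck;
    }
}
```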
15.3 I think we can potentially get rid of the inner while loop by putting all the logic for when the time expires in an if statement and the awaitUtil part in the else clause.
15.4 Also, instead of using a priority queue and repeatedly adding and deleting partitions from the queue, would it be simpler to have the thread just check the isInsyncCondition for each partition every keepInSyncTime?
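The simpler scheme in 15.4 (combined with 15.1) could look roughly like this in Java: a single scheduled thread sweeps all partitions every keepInSyncTime and reports only the ones whose condition failed, so the ISR is updated only when it actually shrinks. All names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

public class IsrExpirationSketch {
    // One pass over all partitions; returns only those whose in-sync
    // condition failed, so the caller updates the ISR only on a real shrink
    static <P> List<P> sweep(List<P> partitions, Predicate<P> isInSyncCondition) {
        List<P> shrunk = new ArrayList<>();
        for (P p : partitions) {
            if (!isInSyncCondition.test(p)) {
                shrunk.add(p);
            }
        }
        return shrunk;
    }

    // A single scheduled thread replaces the priority queue: no per-partition
    // enqueue/dequeue bookkeeping, just one sweep every keepInSyncTime
    static ScheduledExecutorService start(Runnable sweepAll, long keepInSyncTimeMs) {
        ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
        s.scheduleAtFixedRate(sweepAll, keepInSyncTimeMs, keepInSyncTimeMs,
                              TimeUnit.MILLISECONDS);
        return s;
    }
}
```

The trade-off is one full scan per interval instead of fine-grained expirations, which seems acceptable given the check is cheap.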
16. LogDisk: recoverUptoLastCheckpointedHW():
16.1 The second condition in
segments.view.find(segment => lastKnownHW >= segment.start && lastKnownHW < segment.size)
seems incorrect. It seems that you want to use "lastKnownHW < segment.messageSet.getEndOffset"
16.2 The files of all deleted segments should be deleted, as is done in LogManager.cleanupExpiredSegments().
17.1 There is no need to keep testEmptyLogs(), since we have a test that covers fetching from a non-existing topic using SimpleConsumer.
18.1 testConsumerNotExistTopic(): we probably shouldn't create the topic in this case.
19.1 testZKSendToNewTopic(): We should fix the comment that says "Available partition ids should be 0, 1, 2 and 3" since there is only 1 partition created.
20.1 Since the test is already using an in-memory log, we can remove the TODO in testReplicaFetcherThread().
20.2 testReplicaFetcherThread(): Instead of sleeping and then checking log.get.getLogEndOffset, could we create a utility method that keeps checking until the LEO reaches a certain value, up to a certain max wait time? Maybe we should make a more general util that waits up to a certain amount of time until a condition is satisfied.
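Something like the following Java sketch of a general wait util (the name waitUntilTrue and the poll interval are just suggestions):

```java
import java.util.function.BooleanSupplier;

public class TestUtilsSketch {
    // Poll until the condition holds or maxWaitMs elapses; replaces a
    // fixed sleep before checking something like log.get.getLogEndOffset
    static boolean waitUntilTrue(BooleanSupplier condition, long maxWaitMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) return true;
            Thread.sleep(10); // small poll interval
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }
}
```

A test would then call waitUntilTrue(() -> leo() >= expected, maxWaitMs) and fail fast with a clear assertion instead of relying on a tuned sleep.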