1. Refactoring of current procedural controller code to functional style
1.1. Using switch case statements instead of if-else
1.2. Avoid explicit return statements from try-catch blocks
1.3. Use Option instead of null. This is very important, there were several places that handled only the non-null case, basically making way for NPE's in error cases.
1.4. Rename variables in all capital letters to camel case
1.5. Fixed logging since a lot of log statements were at info and were somewhat unclear
1.6. Fixed indentation, some places uses 4 spaces, others used 2 spaces.
1.7.1 Refactored readDataMaybeNull to return an Option instead of null. This allows all usages of that API that query an ephemeral path to handle the case when the value in zookeeper does not exist anymore
1.7.2. Refactored getBrokerInfoFromIds to handle only a single broker id. The reason is that most usages of that API used it to query for a single broker id.
1.8 Many places in the code did not wrap the lines correctly. Fixed this as much as I could.
2.1/ Renamed deliverLeaderAndISRFromZookeeper to readLeaderAndIsrFromZookeeper
2.2 There is a race condition in KafkaController where it doesn't synchronize access to the controller's data structures while creating a new session. Basically, controllerRegisterOrFailover is a private API that modifies almost all the internal controller data structures that require synchronization. Since this API is not synchronizing on the controller lock, all usages of this API need to do this correctly. Fixed handleNewSession to synchronize the controllerRegisterOrFailover API, since that can be concurrently executed with the startup procedure.
2.3 In onBrokerChange, defaulting to empty list instead of null. This lets us avoid null checks
2.4 Refactored onBrokerChange() API and moved the leader election logic to a separate API. After starting on this path, I figured that this code is going to need a major refactoring that I'd like to fix in a separate patch. Filed
KAFKA-499 to cover that. For now, we can keep this although it will look complete only after KAFKA-499 is in.
2.5 onBrokerChange() and initLeaders() APIs are very similar and duplicate quite a lot of code. I refactored onBrokerChange() but realized later that initLeaders would have to refactored too. To keep the changes in this patch small enough, I will fix this as part of
3. ControllerChannelManager & KafkaController
3.1 There are 3 types of information that the controller maintains per broker - request thread, message queue and socket channel. Currently, they are maintained in 3 separate variables and we need to ensure all 3 are synchronized correctly. To simplify this, I created a case class to wrap this state in one object.
3.2 There is a lock object whose purpose is to synchronize access to the broker cache. Currently, the lock doesn't seem to protect all access to these caches, which looks like a synchronization bug. Fixed this to have startup API synchronize access to the broker cache.
3.3. The addBroker logic was duplicated in 2 places. Refactored the constructors to add an auxilary constructor to call the addBroker API that handles creating controller state info for a new broker and the appropriate synchronization. Currently, the constructor duplicates code to add a new broker, probably since it is not right to invoke an API from inside the primary constructor
3.4 There are 2 ways to remove a broker from the controller's broker cache - during shutdown of the channel manager or when a broker change listener fires. Since removeBroker is a public API, it is synchronized using the brokerLock. The shutdown API calls removeBroker internally and ends up acquiring and releasing the lock multiple times. Ideally, it is sufficient to acquire the brokerLock just once for the entire shutdown API. Refactored to move the common removeBroker logic to a private API that doesn't synchronize on the brokerLock. Changed removeBroker to acquire lock and call removeExistingBroker, changed shutdown to acquire lock once and call removeExistingBroker.
4. Renamed LeaderAndISR to LeaderAndIsr
5. Renamed BrokerNotExistException to BrokerNotAvailableException to remain consistent with other exceptions of the same type (LeaderNotAvailableException, ReplicaNotAvailableException)
NOTE: Apache svn seems to hang at the time of uploading this patch. It will apply cleanly on revision 1380945