This patch changes how a Kafka server handles metadata requests. Metadata requests are currently a bottleneck because the server reads several paths from ZooKeeper to serve a single metadata request, and the latency is proportional to the number of topics in the request. A faster way to serve metadata requests is through the controller: since the controller makes all state change decisions for the cluster, it has a cache with the latest leadership information. The patch uses the following algorithm to serve metadata requests -
1. If the broker is the controller, read leadership information from the cache and send the response
2. If the broker is not the controller, instantiate a sync producer to forward the metadata request to the controller
3. If the broker is not the controller and no controller is available, send the ControllerNotAvailable error code back to the client
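The three cases above can be sketched roughly as follows. This is only an illustration, not the patch's code: the class name, the `Outcome` enum, the `ControllerClient` interface (standing in for the sync producer), and the topic-to-leader map are all hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of the three-way metadata dispatch; not Kafka's actual classes. */
final class MetadataDispatchSketch {
    enum Outcome { SERVED_FROM_CACHE, FORWARDED_TO_CONTROLLER, CONTROLLER_NOT_AVAILABLE }

    /** Assumed stand-in for the sync producer that forwards the request to the controller. */
    interface ControllerClient {
        Map<String, Integer> fetchLeaders(String controllerHostPort, List<String> topics);
    }

    static final class Response {
        final Outcome outcome;
        final Map<String, Integer> leaders; // topic -> leader broker id (simplified)

        Response(Outcome outcome, Map<String, Integer> leaders) {
            this.outcome = outcome;
            this.leaders = leaders;
        }
    }

    static Response handle(boolean isController,
                           Map<String, Integer> leaderCache,
                           Optional<String> controllerHostPort,
                           ControllerClient client,
                           List<String> topics) {
        if (isController) {
            // Case 1: answer directly from the controller's own cache.
            Map<String, Integer> leaders = new HashMap<>();
            for (String topic : topics) {
                Integer leader = leaderCache.get(topic);
                if (leader != null) {
                    leaders.put(topic, leader);
                }
            }
            return new Response(Outcome.SERVED_FROM_CACHE, leaders);
        }
        if (controllerHostPort.isPresent()) {
            // Case 2: forward the request to the known controller.
            return new Response(Outcome.FORWARDED_TO_CONTROLLER,
                    client.fetchLeaders(controllerHostPort.get(), topics));
        }
        // Case 3: no controller known, so report the error to the client.
        return new Response(Outcome.CONTROLLER_NOT_AVAILABLE, Collections.emptyMap());
    }
}
```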
There are some things to note here -
1. How does a broker know who the current controller is, and its host/port? If we read this from ZooKeeper on every request, metadata requests stay slow, although less slow than before. However, upon controller failover, the controller sends a LeaderAndIsr request to all* brokers. So when a broker receives any state change request from the controller, it reads the controller's host/port from ZooKeeper and stores it in a cache. Since state change requests are rare, this is ok.
Now there is a corner case we need to take care of. If a new broker is brought up and it doesn't have any partitions assigned to it, it won't receive any LeaderAndIsr request in the current code base. This patch takes care of this by changing the controller to always send a LeaderAndIsr request to newly restarted brokers, even if there are no partitions in the request.
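A minimal sketch of that caching behaviour, under stated assumptions: the class, its method names, and the `zkLookup` function (standing in for the ZooKeeper read of the controller's host/port) are hypothetical and do not come from the patch. The read counter exists only to show that ZooKeeper is touched on state change requests and never on the metadata path.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntFunction;

/** Hypothetical sketch: cache the controller's host/port when a state change request arrives. */
final class ControllerAddressCacheSketch {
    // Assumed stand-in for a ZooKeeper read: controller broker id -> "host:port".
    private final IntFunction<String> zkLookup;
    private final AtomicReference<String> controllerHostPort = new AtomicReference<>();
    private final AtomicInteger zkReads = new AtomicInteger();

    ControllerAddressCacheSketch(IntFunction<String> zkLookup) {
        this.zkLookup = zkLookup;
    }

    /** Called for every state change request, including a LeaderAndIsr request with no partitions. */
    void onStateChangeRequest(int controllerId) {
        zkReads.incrementAndGet();
        controllerHostPort.set(zkLookup.apply(controllerId));
    }

    /** Called on the metadata path; never reads ZooKeeper. Returns null until the first state change request. */
    String cachedController() {
        return controllerHostPort.get();
    }

    int zkReadCount() {
        return zkReads.get();
    }
}
```

In this sketch a broker that has not yet received any state change request sees `null`, which corresponds to the "controller is unavailable" case of the algorithm.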
2. What timeout should be used when a broker forwards a metadata request to the controller?
Since there isn't a way to know the timeout specified by the original metadata request, we can potentially set the socket timeout for the forwarded request to Integer.MAX_VALUE. This ensures that the forwarded request will not time out prematurely. However, we need to ensure that the controller always sends a response back to the broker OR closes the socket. This prevents the broker's I/O thread from waiting indefinitely for a response from the controller. It also requires the controller to not block when serving metadata from its cache, OR it will block some other broker's I/O thread, which is bad.
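The following self-contained sketch illustrates why "respond OR close the socket" is enough to unblock the forwarding broker even with an effectively unbounded read timeout. It uses plain `java.net` sockets on the loopback interface rather than Kafka's networking code, and the class and method names are hypothetical. The peer accepts the connection and closes it without sending anything, and the blocked read returns end-of-stream instead of waiting.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical sketch: a read with a huge timeout still returns once the peer closes the socket. */
final class ForwardTimeoutSketch {
    /** Returns the result of read() after the peer closes without responding. */
    static int readAfterPeerCloses() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            // Plays the role of the controller: accept, send nothing, close the socket.
            Thread peer = new Thread(() -> {
                try (Socket accepted = server.accept()) {
                    // Intentionally empty: the connection is closed without a response.
                } catch (IOException ignored) {
                    // Ignored in this sketch.
                }
            });
            peer.start();

            // Plays the role of the forwarding broker.
            try (Socket client = new Socket(loopback, server.getLocalPort())) {
                client.setSoTimeout(Integer.MAX_VALUE); // read timeout in milliseconds
                InputStream in = client.getInputStream();
                return in.read(); // blocks until data arrives or the peer closes
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the peer neither responded nor closed the connection, the same `read()` call would block for the full timeout, which is the situation the note above warns about.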
3. Should the controller acquire a lock when reading its cache to serve a metadata request?
This is not required and is potentially dangerous. It is not required since even if the controller's cache is undergoing change and we send stale information to the client, the client will get an error and retry. Eventually, the cache will be consistent and accurate metadata will be sent to the client. This is also ok since changes to the controller's cache are relatively rare, so the chances of stale metadata are low.
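One way to picture lock-free reads is a cache backed by `java.util.concurrent.ConcurrentHashMap`, whose retrieval operations do not block. This is an assumption made for illustration only; the description above does not say which data structure the controller's cache uses, and the class and method names here are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: readers never take a lock and may briefly observe a stale leader. */
final class LeaderCacheSketch {
    // Key is a "topic-partition" string, value is the leader broker id (simplified).
    private final ConcurrentHashMap<String, Integer> leaders = new ConcurrentHashMap<>();

    /** Called by the controller when it makes a state change decision. */
    void updateLeader(String topicPartition, int brokerId) {
        leaders.put(topicPartition, brokerId);
    }

    /** Called on the metadata path; returns null if no leader is known. Does not block on writers. */
    Integer leaderFor(String topicPartition) {
        return leaders.get(topicPartition);
    }
}
```

A reader that races with `updateLeader` may return the previous leader; as noted above, the client then gets an error, retries, and eventually sees the updated value.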
4. Do we need to make the timeout for the forwarded metadata request configurable?
Ideally no, except for unit tests. The reason is that unit tests involve shutting down Kafka brokers. However, when a broker is shut down in a unit test, it does not close the socket connections associated with that broker. The impact is that if a controller broker is shut down, it will not respond to a few forwarded metadata requests and it will not close the socket. That leaves some other broker waiting indefinitely for a response from the controller. This doesn't happen in non unit test environments, since if the broker is shut down or fails, the process releases all socket connections associated with it. So the other broker gets a broken pipe exception and doesn't end up waiting forever.