Currently, when brokers join or leave the cluster without any partition state changes, the controller sends UpdateMetadataRequests containing the states of all partitions to all brokers. But for brokers already in the cluster, the only metadata difference between the controller and the broker should be the "live_brokers" info. Only brokers with an empty metadata cache need the full UpdateMetadataRequest. Sending the full UpdateMetadataRequest to all brokers can place non-negligible memory pressure on the controller side.
Let's say we have N brokers and M partitions in the cluster in total, and we want to add 1 brand new broker to the cluster. With RF=2, the memory footprint per partition in the UpdateMetadataRequest is ~200 bytes. In the current controller implementation, if each of the N RequestSendThreads serializes and sends out the UpdateMetadataRequest at roughly the same time (which is very likely the case), we will end up using (N+1)*M*200B. In a large Kafka cluster, we can have:
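To get a feel for how the (N+1)*M*200B formula scales, here is a minimal Java sketch that evaluates it. The class name, method name, and the values of N and M are hypothetical and chosen only for illustration; they are not measurements from a real cluster. The ~200 bytes per partition is the RF=2 figure quoted above.

```java
// Illustrative estimate only; not part of the Kafka code base.
final class UpdateMetadataFootprint {
    // ~200 bytes per partition at RF=2, the figure quoted in the description.
    static final long BYTES_PER_PARTITION = 200L;

    // Peak bytes held if all (existingBrokers + 1) requests are serialized
    // at roughly the same time, i.e. (N+1) * M * 200B.
    static long peakBytes(long existingBrokers, long partitions) {
        return (existingBrokers + 1) * partitions * BYTES_PER_PARTITION;
    }

    public static void main(String[] args) {
        long n = 100;      // hypothetical number of existing brokers
        long m = 100_000;  // hypothetical number of partitions
        System.out.println(peakBytes(n, m) + " bytes");
    }
}
```

Because the footprint is a product of broker count and partition count, doubling both quadruples the estimated peak usage.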
This issue hurts the scalability of a Kafka cluster. KIP-380 and KAFKA-7186 also help to further reduce the controller memory footprint.
In terms of implementation, we can keep some in-memory state on the controller side to differentiate existing brokers from uninitialized brokers (e.g. brand new brokers), so that if there is no change in partition states, we only send the live brokers info to existing brokers.
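A rough sketch of what that in-memory state could look like, written as standalone Java rather than against the actual controller code. All names here (MetadataUpdatePlanner, UpdateKind, planFor, onFullUpdateSent, onBrokerGone) are hypothetical, and the sketch assumes that a broker which leaves and later rejoins comes back with an empty metadata cache.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper; not an existing Kafka class.
final class MetadataUpdatePlanner {
    enum UpdateKind { FULL, LIVE_BROKERS_ONLY }

    // Brokers that have already been sent a full UpdateMetadataRequest.
    private final Set<Integer> initializedBrokers = new HashSet<>();

    // Decide what to send to a broker when cluster membership changes.
    UpdateKind planFor(int brokerId, boolean partitionStatesChanged) {
        // If partition states changed, fall back to the current behavior.
        if (partitionStatesChanged || !initializedBrokers.contains(brokerId)) {
            return UpdateKind.FULL;
        }
        return UpdateKind.LIVE_BROKERS_ONLY;
    }

    // Record that the broker has been sent the full partition states.
    void onFullUpdateSent(int brokerId) {
        initializedBrokers.add(brokerId);
    }

    // Assumption: a departed broker restarts with an empty metadata cache,
    // so it must be treated as uninitialized when it rejoins.
    void onBrokerGone(int brokerId) {
        initializedBrokers.remove(brokerId);
    }
}
```

With this bookkeeping, adding one brand new broker would trigger a single full request (to the new broker) and N small live-brokers-only requests, instead of N+1 full ones.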