Thanks for the patch. Overall, it is well thought out. Some comments:
1.1 In handle(), I don't think we need to add the if test in the following statement. A message could fail to be sent because the leader changed immediately after the previous metadata refresh. Since leaders are normally elected very quickly, it makes sense to refresh the metadata again.
1.2 In handle(), it seems better to run the following code before calling dispatchSerializedData().
    if (topicMetadataRefreshInterval >= 0 &&
        SystemTime.milliseconds - lastTopicMetadataRefresh > topicMetadataRefreshInterval)
      lastTopicMetadataRefresh = SystemTime.milliseconds
1.3 getPartition(): If none of the partitions is available, we should throw LeaderNotAvailableException instead of UnknownTopicOrPartitionException.
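To illustrate 1.3, here is a minimal sketch of the suggested behavior, written in Java rather than the producer's Scala. Everything in it is hypothetical and only stands in for the real producer code: the PartitionSelectionSketch class, the PartitionInfo type, the locally defined exception, and the random choice among available partitions. The point is only that "the topic has partitions, but none has a leader right now" is reported as a leader-availability problem.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class PartitionSelectionSketch {
    /** Hypothetical stand-in for Kafka's LeaderNotAvailableException. */
    static class LeaderNotAvailableException extends RuntimeException {
        LeaderNotAvailableException(String message) {
            super(message);
        }
    }

    /** Hypothetical partition descriptor; leaderBrokerId is null while no leader is known. */
    static class PartitionInfo {
        final int partitionId;
        final Integer leaderBrokerId;

        PartitionInfo(int partitionId, Integer leaderBrokerId) {
            this.partitionId = partitionId;
            this.leaderBrokerId = leaderBrokerId;
        }
    }

    /**
     * Picks one partition among those that currently have a leader.
     * Choosing at random is an assumption made for this sketch.
     */
    static int getPartition(String topic, List<PartitionInfo> partitions, Random random) {
        List<PartitionInfo> available = new ArrayList<>();
        for (PartitionInfo p : partitions) {
            if (p.leaderBrokerId != null) {
                available.add(p);
            }
        }
        if (available.isEmpty()) {
            // The topic and its partitions are known; only the leaders are missing.
            throw new LeaderNotAvailableException(
                "No leader for any partition in topic " + topic);
        }
        return available.get(random.nextInt(available.size())).partitionId;
    }

    public static void main(String[] args) {
        List<PartitionInfo> partitions = new ArrayList<>();
        partitions.add(new PartitionInfo(0, null));
        partitions.add(new PartitionInfo(1, 3));
        System.out.println(getPartition("example-topic", partitions, new Random()));
    }
}
```

With this shape, the caller can treat the exception as retriable and refresh metadata, which matches the reasoning in 1.1.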
2. DefaultPartitioner: Since key is not expected to be null, we should remove the code that deals with null key.
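As a rough illustration of point 2, a partitioner that assumes a non-null key reduces to a single expression. This is a Java sketch, not the actual DefaultPartitioner; the class name NonNullKeyPartitionerSketch is hypothetical, and masking the sign bit to keep the hash non-negative is a choice made for this example.

```java
class NonNullKeyPartitionerSketch {
    /**
     * Maps a key to a partition in [0, numPartitions).
     * The key is assumed to be non-null; a null key fails with a NullPointerException.
     */
    static int partition(Object key, int numPartitions) {
        // Clear the sign bit so the result is non-negative even for Integer.MIN_VALUE hashes.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partition("some-key", 4));
    }
}
```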
3. The consumer-side logic is fine. A consumer rebalance is triggered only when the set of partitions changes, not when the availability of a partition changes. The rebalance logic doesn't depend on a partition being available. If a partition is not available, ConsumerFetcherManager will keep refreshing metadata. With a replication factor of 1, you will need to set a larger refresh.leader.backoff.ms if a broker is expected to be down for a long time.
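For the last point, a sketch of how the backoff could be raised in the consumer properties, in Java. The value 5000 ms, the ZooKeeper address, the group id, and the ConsumerBackoffConfigSketch class are placeholders chosen for illustration; the right backoff depends on how long a broker is expected to stay down.

```java
import java.util.Properties;

class ConsumerBackoffConfigSketch {
    /** Builds consumer properties with a caller-chosen leader refresh backoff. */
    static Properties consumerProps(long refreshLeaderBackoffMs) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder address
        props.put("group.id", "example-group");           // placeholder group id
        // Wait longer between metadata refreshes while a partition has no leader.
        props.put("refresh.leader.backoff.ms", Long.toString(refreshLeaderBackoffMs));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps(5000L));
    }
}
```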