Description
While running SAMZA-394, I discovered a bug in KafkaSystemConsumer that causes it to stop consuming in certain failure scenarios. This does not cause data loss, but it can wedge a container until the container is restarted.
The trigger appears to be a BrokerProxy fetching from a broker that is still coming up and has not yet claimed ownership of a TopicAndPartition. When the fetch fails, the BrokerProxy calls abdicate() on the TopicAndPartition, and KafkaSystemConsumer tries to refresh to find the current leader. If there is no leader, KafkaSystemConsumer drops the SSP (SystemStreamPartition) and stops consuming it. This happens in KafkaSystemConsumer.refreshBrokers.
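
The sequence above can be illustrated with a small toy model. This is a sketch only, not Samza code: ToyConsumer, its fields, and the "topic-0" / "broker-1" names are all hypothetical, and the real refreshBrokers logic may differ in detail. It assumes only what is described here: an abdicated partition is looked up once, and if no leader exists at that moment it is dropped rather than retried.

```python
# Toy model of the failure mode; all names are hypothetical, not Samza APIs.
class ToyConsumer:
    def __init__(self, leaders):
        self.leaders = leaders       # partition -> leader broker, or None if no leader yet
        self.fetching = {}           # partition -> broker we are currently fetching from
        self.needs_refresh = set()   # partitions handed back by a broker proxy

    def abdicate(self, tp):
        # A failed fetch hands the partition back to the consumer.
        self.fetching.pop(tp, None)
        self.needs_refresh.add(tp)

    def refresh_brokers(self):
        # Look up a leader once for each abdicated partition.
        for tp in list(self.needs_refresh):
            self.needs_refresh.discard(tp)
            leader = self.leaders.get(tp)
            if leader is not None:
                self.fetching[tp] = leader
            # No leader: the partition is silently dropped and never retried.


leaders = {"topic-0": None}            # broker is still coming up, no leader yet
consumer = ToyConsumer(leaders)
consumer.fetching["topic-0"] = "broker-1"

consumer.abdicate("topic-0")           # fetch failed
consumer.refresh_brokers()             # no leader, so the partition is dropped

leaders["topic-0"] = "broker-1"        # leader is elected a moment later
consumer.refresh_brokers()             # nothing left to refresh

print(consumer.fetching)               # the partition is not being fetched
print(consumer.needs_refresh)          # and it is not queued for a retry either
```

In this model the partition ends up neither assigned to a broker nor queued for another lookup, which matches the wedged state described above: no data is lost, but nothing resumes consumption until the container restarts.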