KafkaConsumer is abstracted by AtlasKafkaConsumer. It is run by HookConsumer, which is derived from kafka.utils.ShutdownableThread.
The ShutdownableThread manages the thread. It handles exceptions thrown in the doWork method and logs them.
The exception reported in the bug is thrown by KafkaConsumer and handled by HookConsumer. The exception is logged, but the thread keeps running. When Kafka is in an irrecoverable state, this behavior fills up the logs and provides no value in return.
Let ShutdownableThread handle exceptions thrown by KafkaConsumer. Stop the thread if KafkaConsumer suffers an irrecoverable error. This avoids the situation described in the problem section.
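
The approach can be illustrated with a minimal, self-contained Java sketch. It is not the Atlas or Kafka code: LoopingThread is a simplified stand-in for a shutdownable worker thread, and SketchHookConsumer, IrrecoverableConsumerException and the poller are hypothetical names introduced only for illustration. The sketch also assumes that errors can be split into recoverable ones (logged, then retried) and irrecoverable ones (propagated out of doWork so that the loop ends).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class HookConsumerSketch {

    /** Hypothetical marker for errors the consumer cannot recover from. */
    static class IrrecoverableConsumerException extends RuntimeException {
        IrrecoverableConsumerException(String message) { super(message); }
    }

    /** Simplified stand-in for a shutdownable worker thread (not the Kafka class). */
    static abstract class LoopingThread extends Thread {
        private volatile boolean running = true;
        final List<String> log = new ArrayList<>();

        abstract void doWork();

        void shutdown() { running = false; }

        boolean isRunning() { return running; }

        @Override
        public void run() {
            try {
                while (running) {
                    doWork();
                }
            } catch (RuntimeException e) {
                // An exception escaping doWork ends the loop; it is logged once.
                log.add("Stopping thread: " + e.getMessage());
            } finally {
                running = false;
            }
        }
    }

    /** Hypothetical consumer: retries recoverable errors, propagates irrecoverable ones. */
    static class SketchHookConsumer extends LoopingThread {
        private final Supplier<String> poller;
        int handled = 0;

        SketchHookConsumer(Supplier<String> poller) { this.poller = poller; }

        @Override
        void doWork() {
            try {
                poller.get();   // stands in for polling the Kafka consumer
                handled++;
            } catch (IrrecoverableConsumerException e) {
                throw e;        // let the base class stop the thread
            } catch (RuntimeException e) {
                log.add("Recoverable error, will retry: " + e.getMessage());
            }
        }
    }

    /** Builds a consumer whose 2nd poll fails transiently and whose 4th poll fails for good. */
    static SketchHookConsumer demo() {
        int[] calls = {0};
        return new SketchHookConsumer(() -> {
            int n = ++calls[0];
            if (n == 2) throw new IllegalStateException("transient failure");
            if (n == 4) throw new IrrecoverableConsumerException("broker unreachable");
            return "message-" + n;
        });
    }

    public static void main(String[] args) {
        SketchHookConsumer consumer = demo();
        consumer.run(); // run on the calling thread to keep the demo deterministic
        System.out.println("handled=" + consumer.handled + " log=" + consumer.log);
    }
}
```

In this sketch the irrecoverable error is logged a single time and the loop ends, instead of the same failure being logged on every iteration. How the real code decides which exceptions count as irrecoverable is not specified here and would depend on the exceptions KafkaConsumer actually throws.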