Here is the scenario to reproduce the issue:
- partition discovery is disabled
- the open method throws an exception (e.g. when broker SSL authorization denies the request)
In this scenario, the run method is never executed, so partitionDiscoverer.close() is never called. This leaks connections, because the KafkaConsumer is initialized but never closed. The leak caused an outage that brought down our Kafka cluster when a high-parallelism job got into a restart/failure loop.
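
The failure mode can be illustrated with a minimal, self-contained sketch. It does not use the real connector classes: `FakeDiscoverer`, `leakyOpen`, `safeOpen`, and `connectionLeaked` are hypothetical names made up for this example, and the cleanup shown in `safeOpen` is only one possible way to close the resource when open fails, not necessarily the exact change in this PR.

```java
/** Illustrative sketch only; all names here are hypothetical stand-ins. */
final class OpenCleanupSketch {

    /** Stand-in for a partition discoverer that owns a KafkaConsumer connection. */
    static final class FakeDiscoverer implements AutoCloseable {
        boolean connectionOpen = false;

        void open() {
            // Simulates creating the underlying consumer (connection is now held).
            connectionOpen = true;
        }

        void discoverPartitions() {
            // Simulates the broker denying the request, e.g. SSL authorization failure.
            throw new IllegalStateException("authorization denied");
        }

        @Override
        public void close() {
            connectionOpen = false;
        }
    }

    /** Leaky variant: assumes a later run() will close the discoverer. */
    static void leakyOpen(FakeDiscoverer discoverer) {
        discoverer.open();
        discoverer.discoverPartitions(); // throws; nothing closes the discoverer
    }

    /** Safe variant: closes the discoverer before propagating the failure. */
    static void safeOpen(FakeDiscoverer discoverer) {
        discoverer.open();
        try {
            discoverer.discoverPartitions();
        } catch (RuntimeException e) {
            discoverer.close();
            throw e;
        }
    }

    /** Returns true if the connection is still open after open() has failed. */
    static boolean connectionLeaked(boolean useSafeOpen) {
        FakeDiscoverer discoverer = new FakeDiscoverer();
        try {
            if (useSafeOpen) {
                safeOpen(discoverer);
            } else {
                leakyOpen(discoverer);
            }
        } catch (RuntimeException expected) {
            // open failed, so run() would never be invoked by the framework
        }
        return discoverer.connectionOpen;
    }

    public static void main(String[] args) {
        System.out.println("leaky open leaks connection: " + connectionLeaked(false));
        System.out.println("safe open leaks connection: " + connectionLeaked(true));
    }
}
```

With a high-parallelism job in a restart loop, every failed attempt of every subtask would repeat the leaky path, which is consistent with the connection buildup described above.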