Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Duplicate
-
2.4.0
-
None
-
None
Description
After close() is called on a KafkaConsumer, some of the MBeans it registered are not unregistered, causing a leak.
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;

import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import javax.management.MBeanServer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

/**
 * Reproducer that measures how many MBeans remain registered on the platform
 * MBean server after repeatedly creating and closing a {@link KafkaConsumer}.
 *
 * <p>The program records the MBean count, runs a fixed number of
 * create/close cycles, records the count again and prints the difference.
 */
public class Leaker {

    /** Value stored under {@code BOOTSTRAP_SERVERS_CONFIG} for every consumer. */
    private static final String BOOTSTRAP_SERVERS = "hostname:9092";

    /** Number of consumers that are created and immediately closed. */
    private static final int CONSUMER_CYCLES = 100;

    /**
     * Runs the create/close cycles and prints the change in the platform
     * MBean count to standard output.
     *
     * @param args command-line arguments; not used
     * @throws InterruptedException declared by the signature; nothing in the
     *         visible body throws it
     */
    public static void main(String[] args) throws InterruptedException {
        MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();

        Map<String, Object> config = new HashMap<>();
        config.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

        // Baseline is taken before the first consumer is constructed.
        int countBefore = platformServer.getMBeanCount();

        int remaining = CONSUMER_CYCLES;
        while (remaining > 0) {
            createAndCloseConsumer(config);
            remaining--;
        }

        int countAfter = platformServer.getMBeanCount();
        int delta = countAfter - countBefore;
        System.out.println("\nbeans delta: " + delta);
    }

    /**
     * Constructs one byte-array consumer from the given configuration and
     * closes it straight away, without subscribing or polling.
     *
     * @param config consumer configuration passed to the constructor
     */
    private static void createAndCloseConsumer(Map<String, Object> config) {
        KafkaConsumer<byte[], byte[]> consumer =
                new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        // close() is called explicitly: the count after close() is what this program measures.
        consumer.close();
    }
}