Details
- Type: Bug
- Status: Open
- Priority: Minor
- Resolution: Unresolved
- Affects Version/s: 1.15.1
- Fix Version/s: None
AWS EMR
- Standard AWS EMR Cluster (1 master YARN node, 1 core node) - 2 vCore, 8 GB memory each
- JDK 11 Corretto
Kafka Consumer Config
acks = 1
batch.size = 16384
bootstrap.servers = [...]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = class software.amazon.msk.auth.iam.IAMClientCallbackHandler
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = AWS_MSK_IAM
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 3600000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
Flink Config
taskmanager.memory.process.size=3g
taskmanager.memory.jvm-metaspace.size=512m
taskmanager.numberOfTaskSlots=2
jobmanager.memory.process.size=3g
jobmanager.memory.jvm-metaspace.size=256m
jobmanager.web.address=0.0.0.0
env.java.opts.all=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${FLINK_LOG_PREFIX}.hprof
Description
Hello!
I'm running a Flink application on AWS EMR that consumes from a Kafka topic using the official Flink Kafka consumer. The application runs as a Flink batch job every 30 minutes. The JobManager's metaspace usage grows every time I submit a new job and does not shrink once the job has finished executing. Eventually the metaspace is exhausted and I get an OutOfMemoryError (Metaspace). I've tried increasing the metaspace to 512m, but this only delays the problem, which points to a classloader leak.
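To tie the growth to individual job submissions, the Metaspace pool and the class load/unload counters can be sampled before and after each run. The following is a minimal sketch that uses only the standard `java.lang.management` API. The class name `MetaspaceProbe` is hypothetical, and the pool name "Metaspace" is what HotSpot-based JVMs report. It inspects the JVM it runs in, so for the JobManager the same values would have to be read inside that process or over JMX.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

// Hypothetical helper, not part of Flink or Kafka.
final class MetaspaceProbe {

    /** Bytes used by the "Metaspace" memory pool, or -1 if this JVM exposes no such pool. */
    static long usedMetaspaceBytes() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if ("Metaspace".equals(pool.getName())) {
                MemoryUsage usage = pool.getUsage();
                return usage == null ? -1L : usage.getUsed();
            }
        }
        return -1L;
    }

    /** Number of classes currently loaded in this JVM. */
    static int loadedClassCount() {
        return ManagementFactory.getClassLoadingMXBean().getLoadedClassCount();
    }

    /** Total number of classes unloaded since this JVM started. */
    static long unloadedClassCount() {
        return ManagementFactory.getClassLoadingMXBean().getUnloadedClassCount();
    }

    public static void main(String[] args) {
        System.out.printf("metaspace used: %d bytes, loaded: %d, unloaded: %d%n",
                usedMetaspaceBytes(), loadedClassCount(), unloadedClassCount());
    }
}
```

If the loaded class count rises by a similar amount after every job while the unloaded count stays flat, that would be consistent with user-code classloaders not being released.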
I debugged the issue by creating a simple Flink application containing only a Kafka consumer, and the issue still occurred, so I suspect the problem is somewhere in the Kafka consumer. The only other third-party dependency I was using at the time was the AWS IAM Kafka authentication library (software.amazon.msk:aws-msk-iam-auth:1.1.5).
I also tried debugging the issue by generating a heap dump as described here (https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks), but I wasn't able to spot the origin of the memory leak. I can see references to org.apache.kafka.common.utils.AppInfoParser$AppInfo, though.
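One check that may help narrow this down: Kafka clients register an `app-info` MBean with the platform MBean server, and an MBean that is still registered after a job ends keeps its object, and therefore the classloader that loaded its class, reachable. The sketch below lists such registrations using only the standard JMX API. The class name `KafkaAppInfoMBeans` is hypothetical, the `kafka.*:type=app-info,*` pattern is an assumption about how the client names these MBeans, and whether leftover registrations are the cause of this leak is not established here.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper, not part of Flink or Kafka.
final class KafkaAppInfoMBeans {

    /**
     * Returns the canonical names of all MBeans in the platform MBean server
     * whose domain starts with "kafka." and whose "type" key is "app-info".
     * The naming pattern is an assumption about the Kafka client.
     */
    static Set<String> registeredAppInfoNames() throws MalformedObjectNameException {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName pattern = new ObjectName("kafka.*:type=app-info,*");
        Set<String> names = new TreeSet<>();
        for (ObjectName name : server.queryNames(pattern, null)) {
            names.add(name.getCanonicalName());
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        // Must run inside the JVM under inspection (here: the JobManager).
        for (String name : registeredAppInfoNames()) {
            System.out.println(name);
        }
    }
}
```

Entries that remain listed after all jobs have finished would be candidates for references that pin a user-code classloader.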
I have attached the heap dump. If you need any more information about my setup, feel free to ask and I can provide whatever you need.
Many thanks in advance!