Details
-
Bug
-
Status: Open
-
Critical
-
Resolution: Unresolved
-
3.6.1
-
None
-
None
Description
Based on custom predicate, our application is filtering messages during mirroring.
When the HasHeader:test method of the predicate returns true (when it has to drop messages from mirroring), it encounters below exceptions.
However when it returns false (the messages are forwarded for mirroring), it works fine without OOM.
Note: This issue doesn't occur with the same load in version 2.8.0.
JVM heap size increased till 15G, but still OOM hits.
Exception stacktraces:
line java.lang.OutOfMemoryError: Java heap space line at org.apache.kafka.common.utils.Utils.toArray(Utils.java:289) line at org.apache.kafka.common.utils.Utils.toArray(Utils.java:252) line at org.apache.kafka.common.utils.Utils.toNullableArray(Utils.java:270) line at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) line at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300) line at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263) line at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340) line at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306) line at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262) line at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186) line at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) line at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:153) line at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469) line at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357) line at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204) line at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259) line at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77) line at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236) line at org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$841/0x00007f55cc4c3d78.run(Unknown Source) line at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) line at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) line at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) line at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) line at java.base/java.lang.Thread.run(Thread.java:840)
line java.lang.OutOfMemoryError: Java heap space line at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)