Details
-
Improvement
-
Status: Resolved
-
Major
-
Resolution: Duplicate
-
2.8.0
-
None
-
None
Description
TL;DR, A Kafka client produce latency issue is identified caused by synchronized lock contention of metadata cache read/write in the native kafka producer.
Trigger Condition: A producer need to produce to large number of topics. such as in kafka rest-proxy
What is producer metadata cache
Kafka producer maintains a in-memory copy of cluster metadata, and it avoided fetch metadata every time when produce message to reduce latency
What’s the synchronized lock contention problem
Kafka producer metadata cache is a mutable object, read/write are isolated by a synchronized lock. Which means when the metadata cache is being updated, all read requests are blocked.
Topic metadata expiration frequency increase liner with number of topics. In a kafka cluster with large number of topic partitions, topic metadata expiration and refresh triggers high frequent metadata update. When read operation blocked by update, producer threads are blocked and caused high produce latency issue.
Proposed solution
TL;DR Optimize performance of metadata cache read operation of native kafka producer with copy-on-write strategy
What is copy-on-write strategy
It’s a solution to reduce synchronized lock contention by making the object immutable, and always create a new instance when updating, but since the object is immutable, read operation will be free from waiting, thus produce latency reduced significantly
Besides performance, it can also make the metadata cache immutable from unexpected modification, reduce occurrence of code bugs due to incorrect synchronization
Test result:
Environment: Kafka-rest-proxy
Client version: 2.8.0
Number of topic partitions: 250k
test result show 90%+ latency reduction on test cluster
P99 produce latency on deployed instances reduced from 200ms -> 5ms (upper part show latency after the improvement, lower part show before improvement)
Dump show details of the problem
Threads acquiring lock
Kafka-rest-proxy-jetty-thread-pool-199waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-200waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-202waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-203waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-204waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-205waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-207waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-212waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-214waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-215waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-217waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-218waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-219waiting to acquire [ 0x00007f77d70121a0 ]
Kafka-rest-proxy-jetty-thread-pool-222waiting to acquire [ 0x00007f77d70121a0 ]
...
at org.apache.kafka.clients.Metadata.fetch(Metadata.java:111)
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1019)
at org.apache.kafka.clients.producer.KafkaProducer.partitionsFor(KafkaProducer.java:1144)
at io.confluent.kafkarest.producer.internal.MetadataImpl.maybeUpdate(MetadataImpl.java:39)
at io.confluent.kafkarest.producer.ResilientProducer.send(ResilientProducer.java:117)
Threads hold the lock
kafka-producer-network-thread | kafka-rest-proxyrunning , holding [ 0x00007f77d70121a0 ]
at java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(java.base@11.0.18/ArrayList.java:1655)
at java.util.stream.AbstractPipeline.copyInto(java.base@11.0.18/AbstractPipeline.java:484)
at org.apache.kafka.common.requests.MetadataResponse.convertToNodeArray(MetadataResponse.java:162)
at org.apache.kafka.common.requests.MetadataResponse.toPartitionInfo(MetadataResponse.java:152)
at org.apache.kafka.clients.MetadataCache.lambda$computeClusterView$1(MetadataCache.java:177)
at org.apache.kafka.clients.MetadataCache$$Lambda$695/0x00007f75da3ddcb0.apply(Unknown Source)
at java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195)
at org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:178)
at java.lang.Thread.run(java.base@11.0.18/Thread.java:829)
Attachments
Attachments
Issue Links
- duplicates
-
KAFKA-16226 Java client: Performance regression in Trogdor benchmark with high partition counts
- Resolved
- links to