From 0f2b2c1491fb723314ec7f68b13270b7a652ab3e Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 23 Jun 2015 09:35:13 -0700 Subject: [PATCH] KAFKA-2168; make refcount in KafkaConsumer an AtomicInteger --- .../main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 9be8fbc..99c7abe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -332,6 +332,7 @@ import static org.apache.kafka.common.utils.Utils.min; * } * } * + * // Shutdown hook which can be called from a separate thread * public void shutdown() { * closed.set(true); * consumer.wakeup(); @@ -417,7 +418,7 @@ public class KafkaConsumer implements Consumer { // and is used to prevent multi-threaded access private final AtomicReference currentThread = new AtomicReference(); // refcount is used to allow reentrant access by the thread who has acquired currentThread - private int refcount = 0; // reference count for reentrant access + private final AtomicInteger refcount = new AtomicInteger(0); // TODO: This timeout controls how long we should wait before retrying a request. We should be able // to leverage the work of KAFKA-2120 to get this value from configuration. @@ -1355,14 +1356,14 @@ public class KafkaConsumer implements Consumer { Long threadId = Thread.currentThread().getId(); if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId)) throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access"); - refcount++; + refcount.incrementAndGet(); } /** * Release the light lock protecting the consumer from multi-threaded access. */ private void release() { - if (--refcount == 0) + if (refcount.decrementAndGet() == 0) currentThread.set(null); } } -- 2.3.2 (Apple Git-55)