From 1631016201759f1d17b13044dbf3b150668c0ec0 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 6 Jul 2015 23:53:49 -0500 Subject: [PATCH] Use an atomic long for the 'light lock' opposed to an atomic reference. --- .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 1f0e515..b213fcf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -51,6 +51,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.apache.kafka.common.utils.Utils.min; @@ -395,6 +396,7 @@ public class KafkaConsumer implements Consumer { private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); private static final long EARLIEST_OFFSET_TIMESTAMP = -2L; private static final long LATEST_OFFSET_TIMESTAMP = -1L; + private static final long NO_CURRENT_THREAD = -1L; private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); private final Coordinator coordinator; @@ -417,7 +419,7 @@ public class KafkaConsumer implements Consumer { // currentThread holds the threadId of the current thread accessing KafkaConsumer // and is used to prevent multi-threaded access - private final AtomicReference currentThread = new AtomicReference(); + private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); // refcount is used to allow reentrant access by the thread who has acquired currentThread private final AtomicInteger refcount = new AtomicInteger(0); @@ -1355,8 +1357,8 @@ public class KafkaConsumer implements Consumer { */ private void acquire() { ensureNotClosed(); - Long threadId = Thread.currentThread().getId(); - if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId)) + long threadId = Thread.currentThread().getId(); + if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access"); refcount.incrementAndGet(); } @@ -1366,6 +1368,6 @@ public class KafkaConsumer implements Consumer { */ private void release() { if (refcount.decrementAndGet() == 0) - currentThread.set(null); + currentThread.set(NO_CURRENT_THREAD); } } -- 2.4.4