From 770773fb78431e32ee5c9ac74e1877b7f113ed71 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 6 Jul 2015 23:28:46 -0500 Subject: [PATCH] Make closed flag atomic on consumer --- .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 1f0e515..7a66573 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -412,7 +412,7 @@ public class KafkaConsumer implements Consumer { private final long autoCommitIntervalMs; private final ConsumerRebalanceCallback rebalanceCallback; private long lastCommitAttemptMs; - private boolean closed = false; + private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean wakeup = new AtomicBoolean(false); // currentThread holds the threadId of the current thread accessing KafkaConsumer @@ -984,7 +984,7 @@ public class KafkaConsumer implements Consumer { public void close() { acquire(); try { - if (closed) return; + if (closed.get()) return; close(false); } finally { release(); @@ -1004,7 +1004,7 @@ public class KafkaConsumer implements Consumer { private void close(boolean swallowException) { log.trace("Closing the Kafka consumer."); AtomicReference firstException = new AtomicReference(); - this.closed = true; + this.closed.set(true); ClientUtils.closeQuietly(metrics, "consumer metrics", firstException); ClientUtils.closeQuietly(client, "consumer network client", firstException); ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException); @@ -1342,7 +1342,7 @@ public class KafkaConsumer implements Consumer { * Check that the consumer hasn't been closed. */ private void ensureNotClosed() { - if (this.closed) + if (this.closed.get()) throw new IllegalStateException("This consumer has already been closed."); } -- 2.4.4