From dd919345ffbb84e177ba54240cc4182996439c07 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 23 Jun 2015 09:35:13 -0700 Subject: [PATCH 1/2] KAFKA-2168; make refcount in KafkaConsumer an AtomicInteger --- .../main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 9be8fbc..99c7abe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -332,6 +332,7 @@ import static org.apache.kafka.common.utils.Utils.min; * } * } * + * // Shutdown hook which can be called from a separate thread * public void shutdown() { * closed.set(true); * consumer.wakeup(); @@ -417,7 +418,7 @@ public class KafkaConsumer implements Consumer { // and is used to prevent multi-threaded access private final AtomicReference currentThread = new AtomicReference(); // refcount is used to allow reentrant access by the thread who has acquired currentThread - private int refcount = 0; // reference count for reentrant access + private final AtomicInteger refcount = new AtomicInteger(0); // TODO: This timeout controls how long we should wait before retrying a request. We should be able // to leverage the work of KAFKA-2120 to get this value from configuration. @@ -1355,14 +1356,14 @@ public class KafkaConsumer implements Consumer { Long threadId = Thread.currentThread().getId(); if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId)) throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access"); - refcount++; + refcount.incrementAndGet(); } /** * Release the light lock protecting the consumer from multi-threaded access. */ private void release() { - if (--refcount == 0) + if (refcount.decrementAndGet() == 0) currentThread.set(null); } } -- 2.3.2 (Apple Git-55) From 84f38303d7527ffe60654aade995026234a1b055 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 30 Jun 2015 10:53:39 -0700 Subject: [PATCH 2/2] KAFKA-2168; minor fixes --- .../org/apache/kafka/clients/consumer/KafkaConsumer.java | 10 ++++++---- .../scala/integration/kafka/api/ConsumerBounceTest.scala | 14 ++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 99c7abe..1f0e515 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -42,6 +42,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.ConcurrentModificationException; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -796,7 +797,7 @@ public class KafkaConsumer implements Consumer { * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API * should not be used. *

- * A non-blocking commit will attempt to commit offsets asychronously. No error will be thrown if the commit fails. + * A non-blocking commit will attempt to commit offsets asynchronously. No error will be thrown if the commit fails. * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until * the commit succeeds. * @@ -833,7 +834,9 @@ public class KafkaConsumer implements Consumer { public void commit(CommitType commitType) { acquire(); try { - commit(this.subscriptions.allConsumed(), commitType); + // Need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance) + Map allConsumed = new HashMap(this.subscriptions.allConsumed()); + commit(allConsumed, commitType); } finally { release(); } @@ -979,10 +982,9 @@ public class KafkaConsumer implements Consumer { @Override public void close() { - if (closed) return; - acquire(); try { + if (closed) return; close(false); } finally { release(); diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index f56096b..b0750fa 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -14,14 +14,10 @@ package kafka.api import kafka.server.KafkaConfig -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.consumer.CommitType +import kafka.utils.{Logging, ShutdownableThread, TestUtils} +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} import org.apache.kafka.common.TopicPartition - -import kafka.utils.{ShutdownableThread, TestUtils, Logging} - import org.junit.Assert._ import scala.collection.JavaConversions._ @@ -85,9 +81,11 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { assertEquals(consumed.toLong, record.offset()) consumed += 1 } + consumer.commit(CommitType.SYNC) + assertEquals(consumer.position(tp), consumer.committed(tp)) - if (consumed == numRecords) { + if (consumer.position(tp) == numRecords) { consumer.seekToBeginning() consumed = 0 } -- 2.3.2 (Apple Git-55)