From 2e2562d8518e163d6f199c9b90cbafdda2e7fb97 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Thu, 8 Jan 2015 14:44:59 -0800 Subject: [PATCH 1/4] Locally commit minor fix --- .../consumer/ZookeeperConsumerConnector.scala | 57 +++++++++++----------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 191a867..5487259 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -613,36 +613,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, def syncedRebalance() { rebalanceLock synchronized { rebalanceTimer.time { - if(isShuttingDown.get()) { - return - } else { - for (i <- 0 until config.rebalanceMaxRetries) { - info("begin rebalancing consumer " + consumerIdString + " try #" + i) - var done = false - var cluster: Cluster = null - try { - cluster = getCluster(zkClient) - done = rebalance(cluster) - } catch { - case e: Throwable => - /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. - * For example, a ZK node can disappear between the time we get all children and the time we try to get - * the value of a child. Just let this go since another rebalance will be triggered. - **/ - info("exception during rebalance ", e) - } - info("end rebalancing consumer " + consumerIdString + " try #" + i) - if (done) { - return - } else { - /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should - * clear the cache */ - info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered") - } - // stop all fetchers and clear all the queues to avoid data duplication - closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2)) - Thread.sleep(config.rebalanceBackoffMs) + for (i <- 0 until config.rebalanceMaxRetries) { + if(isShuttingDown.get()) { + return + } + info("begin rebalancing consumer " + consumerIdString + " try #" + i) + var done = false + var cluster: Cluster = null + try { + cluster = getCluster(zkClient) + done = rebalance(cluster) + } catch { + case e: Throwable => + /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. + * For example, a ZK node can disappear between the time we get all children and the time we try to get + * the value of a child. Just let this go since another rebalance will be triggered. + **/ + info("exception during rebalance ", e) + } + info("end rebalancing consumer " + consumerIdString + " try #" + i) + if (done) { + return + } else { + /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should + * clear the cache */ + info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered") } + // stop all fetchers and clear all the queues to avoid data duplication + closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2)) + Thread.sleep(config.rebalanceBackoffMs) } } } -- 1.7.12.4 From be2d81d8ef391d78e97782a80f1c50f4a905a30f Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Thu, 22 Jan 2015 14:30:42 -0800 Subject: [PATCH 2/4] Fixing KAFKA-1886. Forcing the SimpleConsumer to throw a ClosedByInterruptException if thrown and not retry --- .../main/scala/kafka/consumer/SimpleConsumer.scala | 3 +++ .../unit/kafka/integration/PrimitiveApiTest.scala | 25 ++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index e53ee51..c06f090 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -22,6 +22,7 @@ import kafka.network._ import kafka.utils._ import kafka.common.{ErrorMapping, TopicAndPartition} import org.apache.kafka.common.utils.Utils._ +import java.nio.channels.ClosedByInterruptException /** * A consumer of kafka messages @@ -70,6 +71,8 @@ class SimpleConsumer(val host: String, blockingChannel.send(request) response = blockingChannel.receive() } catch { + case e : ClosedByInterruptException => + throw e case e : Throwable => info("Reconnect due to socket error: %s".format(e.toString)) // retry once diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index a5386a0..1246023 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -32,6 +32,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionExc import kafka.utils.{StaticPartitioner, TestUtils, Utils} import kafka.serializer.StringEncoder import java.util.Properties +import java.nio.channels.ClosedByInterruptException /** * End to end tests of the primitive apis against a local server @@ -213,6 +214,30 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext) } + def testInterruptSimpleConsumer() { + val newTopic = "new-topic" + TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers) + var exception : Throwable = null + val thread = new Thread { + override def run { + try + { + val fetchResponse = consumer.fetch(new FetchRequestBuilder().minBytes(100000).maxWait(3000).addFetch(newTopic, 0, 0, 10000).build()) + } + catch { + case e: Throwable =>{ + exception = e + } + } + } + } + thread.start() + Thread.sleep(1000) + thread.interrupt() + thread.join() + assertTrue("Exception thrown from SimpleConsumer should be an instance of ClosedByInterruptException", exception.isInstanceOf[ClosedByInterruptException]) + } + def testPipelinedProduceRequests() { val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0) createSimpleTopicsAndAwaitLeader(zkClient, topics.keys) -- 1.7.12.4 From 6bc2af21b61dcc681e877704cca466e54d2a94e9 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 2 Feb 2015 13:36:54 -0800 Subject: [PATCH 3/4] Reducing sleep in PrimitiveApiTest --- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 1246023..93cc958 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -232,7 +232,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with } } thread.start() - Thread.sleep(1000) + Thread.sleep(100) thread.interrupt() thread.join() assertTrue("Exception thrown from SimpleConsumer should be an instance of ClosedByInterruptException", exception.isInstanceOf[ClosedByInterruptException]) -- 1.7.12.4 From eb1df7b3e2443cf3a8a00cd55a8722f743202b48 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 2 Feb 2015 13:42:10 -0800 Subject: [PATCH 4/4] Reducing sleep in PrimitiveApiTest --- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 93cc958..08528db 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -214,6 +214,10 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext) } + /** + * This test creates a blocking request using a SimpleConsumer to a topic that does not contain any data. + * This checks that the thread performing the fetch throws a ClosedByInterruptException if interrupted. + */ def testInterruptSimpleConsumer() { val newTopic = "new-topic" TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers) -- 1.7.12.4