From 9cfee2a1cb70acb8529cb8aee58ea9b780d3e7cd Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 12 Nov 2014 20:43:37 -0800 Subject: [PATCH 1/2] rewrite the patch --- .../consumer/ZookeeperConsumerConnector.scala | 28 +++++++++- .../consumer/ConsumerRebalanceListener.java | 55 ++++++++++++++++++++ .../consumer/ZookeeperConsumerConnector.scala | 4 ++ .../consumer/ZookeeperConsumerConnectorTest.scala | 60 ++++++++++++++++++++++ 4 files changed, 145 insertions(+), 2 deletions(-) create mode 100644 core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index fbc680f..484f7c9 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -28,6 +28,7 @@ import kafka.api._ import kafka.client.ClientUtils import kafka.cluster._ import kafka.common._ +import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.metrics._ import kafka.network.BlockingChannel import kafka.serializer._ @@ -102,6 +103,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val offsetsChannelLock = new Object private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null + private var consumerRebalanceListener: ConsumerRebalanceListener = null // useful for tracking migration of consumers to store offsets in kafka private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS) @@ -160,6 +162,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, wildcardStreamsHandler.streams } + def setConsumerRebalanceListener(listener: ConsumerRebalanceListener) { + consumerRebalanceListener = listener + } + private def createFetcher() { if (enableFetcher) fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient)) @@ -598,7 +604,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, var cluster: Cluster = null try { cluster = getCluster(zkClient) + if (consumerRebalanceListener != null) { + info("Calling beforeConsumserRebalance() from rebalance listener.") + consumerRebalanceListener.beforeConsumerRebalance() + } done = rebalance(cluster) + if (consumerRebalanceListener != null) { + info("Calling afterConsumerRebalance() from rebalance listener.") + consumerRebalanceListener.afterConsumerRebalance(done) + } } catch { case e: Throwable => /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. @@ -646,9 +660,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, * partitions in parallel. So, not stopping the fetchers leads to duplicate data. */ closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap) - + if (consumerRebalanceListener != null) { + info("Calling afterStoppingFetcherThread() from rebalance listener.") + consumerRebalanceListener.afterStoppingFetcherThreads() + } releasePartitionOwnership(topicRegistry) - + if (consumerRebalanceListener != null) { + info("Calling afterReleasingPartitionOwnership() from rebalance listener.") + consumerRebalanceListener.afterReleasingPartitionOwnership() + } val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( @@ -685,6 +705,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } topicRegistry = currentTopicRegistry + if (consumerRebalanceListener != null) { + info("Calling afterPartitionReassignment() from rebalance listener.") + consumerRebalanceListener.afterPartitionReassignment() + } updateFetcher(cluster) true } else { diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java new file mode 100644 index 0000000..0614e72 --- /dev/null +++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.javaapi.consumer; + +/** + * This listener is used for execution of tasks defined by user when a consumer rebalance + * occurs in {@link kafka.consumer.ZookeeperConsumerConnector} + */ +public class ConsumerRebalanceListener { + + /** + * This method is called before stopping all the fetcher threads. + */ + public void beforeConsumerRebalance() {} + + /** + * This method is called after all the fetcher threads are stopped but before the + * ownership of partitions are released. Depending on whether auto offset commit is + * enabled or not, offsets may or may not have been committed. + */ + public void afterStoppingFetcherThreads() {} + + /** + * This method is called after all the partition owners has been release but before + * starting the partition reassignment. + */ + public void afterReleasingPartitionOwnership() {} + + /** + * This method is called after all the partition reassignment is done but before + * the fetcher threads start. + */ + public void afterPartitionReassignment() {} + + /** + * This method is called after the rebalance is finished. + */ + public void afterConsumerRebalance(boolean success) {} + +} diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala index 1f98db5..fd37c2a 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala @@ -115,6 +115,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, underlying.commitOffsets(retryOnFailure) } + def setConsumerRebalanceListener(consumerRebalanceListener: ConsumerRebalanceListener) { + underlying.setConsumerRebalanceListener(consumerRebalanceListener) + } + def shutdown() { underlying.shutdown } diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index e1d8711..cc25270 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -19,6 +19,7 @@ package kafka.consumer import junit.framework.Assert._ import kafka.integration.KafkaServerTestHarness +import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.server._ import scala.collection._ import org.scalatest.junit.JUnit3Suite @@ -345,6 +346,36 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar zkClient.close() } + def testConsumerRebalanceListener() { + val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) + val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) + // register consumer rebalance listener + val rebalanceListener1 = new TestConsumerRebalanceListener() + zkConsumerConnector1.setConsumerRebalanceListener(rebalanceListener1) + val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) + + // check if rebalance listener is fired + for (i <- 0 to 4) + assertEquals(1, rebalanceListener1.callingCounts(i)) + + val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) + val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) + // register consumer rebalance listener + val rebalanceListener2 = new TestConsumerRebalanceListener() + zkConsumerConnector1.setConsumerRebalanceListener(rebalanceListener2) + val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) + + waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) + + // check if rebalance listener is fired + for (i <- 0 to 4) { + assertEquals(2, rebalanceListener1.callingCounts(i)) + assertEquals(1, rebalanceListener2.callingCounts(i)) + } + + } + def sendMessagesToBrokerPartition(config: KafkaConfig, topic: String, partition: Int, @@ -420,4 +451,33 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String])) } + private class TestConsumerRebalanceListener extends ConsumerRebalanceListener { + val callingCounts: Array[Int] = new Array[Int](5) + var failureCount = 0 + + override def beforeConsumerRebalance() { + callingCounts(0) += 1 + } + + override def afterStoppingFetcherThreads() { + callingCounts(1) += 1 + } + + override def afterReleasingPartitionOwnership() { + callingCounts(2) += 1 + } + + override def afterPartitionReassignment() { + callingCounts(3) += 1 + } + + override def afterConsumerRebalance(success: Boolean) { + if (success) + callingCounts(4) += 1 + else + failureCount += 1 + } + + } + } -- 1.8.3.4 (Apple Git-47) From 9f5c183b1955fb3e26c2aef85f6c67d22afc49fd Mon Sep 17 00:00:00 2001 From: jqin Date: Thu, 13 Nov 2014 17:10:07 -0800 Subject: [PATCH 2/2] Added new unit test. --- .../consumer/ZookeeperConsumerConnectorTest.scala | 34 ++++++++++++++-------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index cc25270..b6aa2df 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -347,32 +347,42 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar } def testConsumerRebalanceListener() { + // Send messages to create topic + sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) + sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec) + val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) - // register consumer rebalance listener + // Register consumer rebalance listener val rebalanceListener1 = new TestConsumerRebalanceListener() zkConsumerConnector1.setConsumerRebalanceListener(rebalanceListener1) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - // check if rebalance listener is fired + // Check if rebalance listener is fired for (i <- 0 to 4) - assertEquals(1, rebalanceListener1.callingCounts(i)) + assertEquals(1, rebalanceListener1.callingCounts(1)) + + val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir) + val expected_1 = List(("0", "group1_consumer1-0"), + ("1", "group1_consumer1-0")) + assertEquals(expected_1, actual_1) val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) - // register consumer rebalance listener + // Register consumer rebalance listener val rebalanceListener2 = new TestConsumerRebalanceListener() - zkConsumerConnector1.setConsumerRebalanceListener(rebalanceListener2) + zkConsumerConnector2.setConsumerRebalanceListener(rebalanceListener2) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) + val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir) + val expected_2 = List(("0", "group1_consumer1-0"), + ("1", "group1_consumer2-0")) + assertEquals(expected_2, actual_2) - // check if rebalance listener is fired - for (i <- 0 to 4) { - assertEquals(2, rebalanceListener1.callingCounts(i)) - assertEquals(1, rebalanceListener2.callingCounts(i)) - } + // Check if rebalance listener is fired. Only check the successful rebalance because + // the failure time is nondeterministic. + assertEquals(1, rebalanceListener2.callingCounts(4)) + assertEquals(2, rebalanceListener1.callingCounts(4)) } -- 1.8.3.4 (Apple Git-47)