From ebe89966370a3f7af9e418649f0868acf24a1f88 Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 12 Nov 2014 20:43:37 -0800 Subject: [PATCH 1/3] rewrite the patch --- .../consumer/ZookeeperConsumerConnector.scala | 28 +++++++++- .../consumer/ConsumerRebalanceListener.java | 55 ++++++++++++++++++++ .../consumer/ZookeeperConsumerConnector.scala | 4 ++ .../consumer/ZookeeperConsumerConnectorTest.scala | 60 ++++++++++++++++++++++ 4 files changed, 145 insertions(+), 2 deletions(-) create mode 100644 core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index f476973..9600c49 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -28,6 +28,7 @@ import kafka.api._ import kafka.client.ClientUtils import kafka.cluster._ import kafka.common._ +import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.metrics._ import kafka.network.BlockingChannel import kafka.serializer._ @@ -102,6 +103,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val offsetsChannelLock = new Object private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null + private var consumerRebalanceListener: ConsumerRebalanceListener = null // useful for tracking migration of consumers to store offsets in kafka private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS) @@ -160,6 +162,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, wildcardStreamsHandler.streams } + def setConsumerRebalanceListener(listener: ConsumerRebalanceListener) { + consumerRebalanceListener = listener + } + private def createFetcher() { if (enableFetcher) fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient)) @@ 
-598,7 +604,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, var cluster: Cluster = null try { cluster = getCluster(zkClient) + if (consumerRebalanceListener != null) { + info("Calling beforeConsumserRebalance() from rebalance listener.") + consumerRebalanceListener.beforeConsumerRebalance() + } done = rebalance(cluster) + if (consumerRebalanceListener != null) { + info("Calling afterConsumerRebalance() from rebalance listener.") + consumerRebalanceListener.afterConsumerRebalance(done) + } } catch { case e: Throwable => /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. @@ -646,9 +660,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, * partitions in parallel. So, not stopping the fetchers leads to duplicate data. */ closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap) - + if (consumerRebalanceListener != null) { + info("Calling afterStoppingFetcherThread() from rebalance listener.") + consumerRebalanceListener.afterStoppingFetcherThreads() + } releasePartitionOwnership(topicRegistry) - + if (consumerRebalanceListener != null) { + info("Calling afterReleasingPartitionOwnership() from rebalance listener.") + consumerRebalanceListener.afterReleasingPartitionOwnership() + } val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( @@ -685,6 +705,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } topicRegistry = currentTopicRegistry + if (consumerRebalanceListener != null) { + info("Calling afterPartitionReassignment() from rebalance listener.") + consumerRebalanceListener.afterPartitionReassignment() + } updateFetcher(cluster) true } else { diff --git 
a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java new file mode 100644 index 0000000..0614e72 --- /dev/null +++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.javaapi.consumer; + +/** + * This listener is used for execution of tasks defined by user when a consumer rebalance + * occurs in {@link kafka.consumer.ZookeeperConsumerConnector} + */ +public class ConsumerRebalanceListener { + + /** + * This method is called before stopping all the fetcher threads. + */ + public void beforeConsumerRebalance() {} + + /** + * This method is called after all the fetcher threads are stopped but before the + * ownership of partitions are released. Depending on whether auto offset commit is + * enabled or not, offsets may or may not have been committed. + */ + public void afterStoppingFetcherThreads() {} + + /** + * This method is called after all the partition owners has been release but before + * starting the partition reassignment. 
+ */ + public void afterReleasingPartitionOwnership() {} + + /** + * This method is called after all the partition reassignment is done but before + * the fetcher threads start. + */ + public void afterPartitionReassignment() {} + + /** + * This method is called after the rebalance is finished. + */ + public void afterConsumerRebalance(boolean success) {} + +} diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala index 1f98db5..fd37c2a 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala @@ -115,6 +115,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, underlying.commitOffsets(retryOnFailure) } + def setConsumerRebalanceListener(consumerRebalanceListener: ConsumerRebalanceListener) { + underlying.setConsumerRebalanceListener(consumerRebalanceListener) + } + def shutdown() { underlying.shutdown } diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index e1d8711..cc25270 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -19,6 +19,7 @@ package kafka.consumer import junit.framework.Assert._ import kafka.integration.KafkaServerTestHarness +import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.server._ import scala.collection._ import org.scalatest.junit.JUnit3Suite @@ -345,6 +346,36 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar zkClient.close() } + def testConsumerRebalanceListener() { + val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) + val zkConsumerConnector1 = 
new ZookeeperConsumerConnector(consumerConfig1, true) + // register consumer rebalance listener + val rebalanceListener1 = new TestConsumerRebalanceListener() + zkConsumerConnector1.setConsumerRebalanceListener(rebalanceListener1) + val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) + + // check if rebalance listener is fired + for (i <- 0 to 4) + assertEquals(1, rebalanceListener1.callingCounts(i)) + + val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) + val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) + // register consumer rebalance listener + val rebalanceListener2 = new TestConsumerRebalanceListener() + zkConsumerConnector1.setConsumerRebalanceListener(rebalanceListener2) + val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) + + waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) + + // check if rebalance listener is fired + for (i <- 0 to 4) { + assertEquals(2, rebalanceListener1.callingCounts(i)) + assertEquals(1, rebalanceListener2.callingCounts(i)) + } + + } + def sendMessagesToBrokerPartition(config: KafkaConfig, topic: String, partition: Int, @@ -420,4 +451,33 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String])) } + private class TestConsumerRebalanceListener extends ConsumerRebalanceListener { + val callingCounts: Array[Int] = new Array[Int](5) + var failureCount = 0 + + override def beforeConsumerRebalance() { + callingCounts(0) += 1 + } + + override def afterStoppingFetcherThreads() { + callingCounts(1) += 1 + } + + override def afterReleasingPartitionOwnership() { + callingCounts(2) += 1 + } + + override def afterPartitionReassignment() { + 
callingCounts(3) += 1 + } + + override def afterConsumerRebalance(success: Boolean) { + if (success) + callingCounts(4) += 1 + else + failureCount += 1 + } + + } + } -- 1.8.3.4 (Apple Git-47) From 611a0e7f0e33daaf7cf416999dcac451f27e6443 Mon Sep 17 00:00:00 2001 From: jqin Date: Thu, 13 Nov 2014 17:10:07 -0800 Subject: [PATCH 2/3] Added new unit test. --- .../consumer/ZookeeperConsumerConnectorTest.scala | 34 ++++++++++++++-------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index cc25270..b6aa2df 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -347,32 +347,42 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar } def testConsumerRebalanceListener() { + // Send messages to create topic + sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) + sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec) + val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) - // register consumer rebalance listener + // Register consumer rebalance listener val rebalanceListener1 = new TestConsumerRebalanceListener() zkConsumerConnector1.setConsumerRebalanceListener(rebalanceListener1) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - // check if rebalance listener is fired + // Check if rebalance listener is fired for (i <- 0 to 4) - assertEquals(1, rebalanceListener1.callingCounts(i)) + assertEquals(1, rebalanceListener1.callingCounts(1)) + + val actual_1 = 
getZKChildrenValues(dirs.consumerOwnerDir) + val expected_1 = List(("0", "group1_consumer1-0"), + ("1", "group1_consumer1-0")) + assertEquals(expected_1, actual_1) val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) - // register consumer rebalance listener + // Register consumer rebalance listener val rebalanceListener2 = new TestConsumerRebalanceListener() - zkConsumerConnector1.setConsumerRebalanceListener(rebalanceListener2) + zkConsumerConnector2.setConsumerRebalanceListener(rebalanceListener2) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) + val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir) + val expected_2 = List(("0", "group1_consumer1-0"), + ("1", "group1_consumer2-0")) + assertEquals(expected_2, actual_2) - // check if rebalance listener is fired - for (i <- 0 to 4) { - assertEquals(2, rebalanceListener1.callingCounts(i)) - assertEquals(1, rebalanceListener2.callingCounts(i)) - } + // Check if rebalance listener is fired. Only check the successful rebalance because + // the failure time is nondeterministic. 
+ assertEquals(1, rebalanceListener2.callingCounts(4)) + assertEquals(2, rebalanceListener1.callingCounts(4)) } -- 1.8.3.4 (Apple Git-47) From 0cd4d8d9f3d4a2cd8558a249b9591c384ffe0241 Mon Sep 17 00:00:00 2001 From: jqin Date: Sat, 15 Nov 2014 00:59:54 -0800 Subject: [PATCH 3/3] Incorporated Joel's comments --- .../consumer/ZookeeperConsumerConnector.scala | 28 ++++++++++------------ .../consumer/ConsumerRebalanceListener.java | 25 +++++++++++++------ .../consumer/ZookeeperConsumerConnectorTest.scala | 26 +++++++++++++++----- 3 files changed, 51 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 9600c49..9effa5f 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -40,7 +40,7 @@ import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, import org.apache.zookeeper.Watcher.Event.KeeperState import scala.collection._ - +import scala.collection.JavaConversions._ /** * This class handles the consumers interaction with zookeeper @@ -604,15 +604,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, var cluster: Cluster = null try { cluster = getCluster(zkClient) - if (consumerRebalanceListener != null) { - info("Calling beforeConsumserRebalance() from rebalance listener.") - consumerRebalanceListener.beforeConsumerRebalance() - } + if (consumerRebalanceListener != null) + consumerRebalanceListener.onConsumerRebalanceStart() done = rebalance(cluster) - if (consumerRebalanceListener != null) { - info("Calling afterConsumerRebalance() from rebalance listener.") - consumerRebalanceListener.afterConsumerRebalance(done) - } + if (consumerRebalanceListener != null) + consumerRebalanceListener.onConsumerRebalanceCompletion(done) } catch { case e: Throwable => /** occasionally, we may hit a ZK 
exception because the ZK state is changing while we are iterating. @@ -661,13 +657,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, */ closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap) if (consumerRebalanceListener != null) { - info("Calling afterStoppingFetcherThread() from rebalance listener.") - consumerRebalanceListener.afterStoppingFetcherThreads() + info("Calling onFetcherThreadsStopped() from rebalance listener.") + consumerRebalanceListener.onFetcherThreadsStopped() } releasePartitionOwnership(topicRegistry) if (consumerRebalanceListener != null) { - info("Calling afterReleasingPartitionOwnership() from rebalance listener.") - consumerRebalanceListener.afterReleasingPartitionOwnership() + info("Calling onPartitionsRevoked() from rebalance listener.") + consumerRebalanceListener.onPartitionsRevoked() } val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) @@ -706,8 +702,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, topicRegistry = currentTopicRegistry if (consumerRebalanceListener != null) { - info("Calling afterPartitionReassignment() from rebalance listener.") - consumerRebalanceListener.afterPartitionReassignment() + info("Calling onPartitionsAssigned() from rebalance listener.") + consumerRebalanceListener.onPartitionsAssigned(partitionOwnershipDecision.map(decision => { + decision._1 -> decision._2.toString + })) } updateFetcher(cluster) true diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java index 0614e72..ee01715 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java +++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java @@ -17,39 +17,50 @@ package 
kafka.javaapi.consumer; +import kafka.common.TopicAndPartition; + +import java.util.Map; + /** * This listener is used for execution of tasks defined by user when a consumer rebalance * occurs in {@link kafka.consumer.ZookeeperConsumerConnector} */ -public class ConsumerRebalanceListener { +public interface ConsumerRebalanceListener { /** - * This method is called before stopping all the fetcher threads. + * This method is called when a consumer rebalance is about to start, i.e. before stopping the fetcher + * threads. It can be used to save any state before the rebalance starts. */ - public void beforeConsumerRebalance() {} + public void onConsumerRebalanceStart(); /** * This method is called after all the fetcher threads are stopped but before the * ownership of partitions are released. Depending on whether auto offset commit is * enabled or not, offsets may or may not have been committed. + * One use case of this method is in mirror maker. To avoid data loss, we disable + * offset auto commit, so in this callback method we can wait until the messages + * in the data channel are drained and delivered before committing offsets, so we + * avoid getting duplicates on consumer rebalance. */ - public void afterStoppingFetcherThreads() {} + public void onFetcherThreadsStopped(); /** * This method is called after all the partition owners has been release but before * starting the partition reassignment. */ - public void afterReleasingPartitionOwnership() {} + public void onPartitionsRevoked(); /** * This method is called after all the partition reassignment is done but before * the fetcher threads start. + * This method can be used to update partition-assignment-related state in the user application. */ - public void afterPartitionReassignment() {} + public void onPartitionsAssigned(Map<TopicAndPartition, String> partitionAssignment); /** * This method is called after the rebalance is finished. + * This method can be used to refresh state after the consumer rebalance if needed. 
*/ - public void afterConsumerRebalance(boolean success) {} + public void onConsumerRebalanceCompletion(boolean success); } diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index b6aa2df..e0dd2e4 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -22,6 +22,7 @@ import kafka.integration.KafkaServerTestHarness import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.server._ import scala.collection._ +import scala.collection.JavaConversions._ import org.scalatest.junit.JUnit3Suite import kafka.message._ import kafka.serializer._ @@ -31,7 +32,7 @@ import kafka.producer.{ProducerConfig, KeyedMessage, Producer} import java.util.{Collections, Properties} import org.apache.log4j.{Logger, Level} import kafka.utils.TestUtils._ -import kafka.common.MessageStreamsExistException +import kafka.common.{TopicAndPartition, MessageStreamsExistException} class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging { @@ -366,6 +367,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val expected_1 = List(("0", "group1_consumer1-0"), ("1", "group1_consumer1-0")) assertEquals(expected_1, actual_1) + val expected_parition_assignment1: java.util.Map[TopicAndPartition, String] = + Map[TopicAndPartition, String](TopicAndPartition(topic, 0) -> "group1_consumer1-0", + TopicAndPartition(topic, 1) -> "group1_consumer1-0") + assertEquals(expected_parition_assignment1, rebalanceListener1.partitionAssignment) val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) @@ -379,6 +384,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with 
KafkaServerTestHar ("1", "group1_consumer2-0")) assertEquals(expected_2, actual_2) + val expected_parition_assignment2_1: java.util.Map[TopicAndPartition, String] = + Map[TopicAndPartition, String](TopicAndPartition(topic, 0) -> "group1_consumer1-0") + val expected_parition_assignment2_2: java.util.Map[TopicAndPartition, String] = + Map[TopicAndPartition, String](TopicAndPartition(topic, 1) -> "group1_consumer2-0") + assertEquals(expected_parition_assignment2_1, rebalanceListener1.partitionAssignment) + assertEquals(expected_parition_assignment2_2, rebalanceListener2.partitionAssignment) + // Check if rebalance listener is fired. Only check the successful rebalance because // the failure time is nondeterministic. assertEquals(1, rebalanceListener2.callingCounts(4)) @@ -464,24 +476,26 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar private class TestConsumerRebalanceListener extends ConsumerRebalanceListener { val callingCounts: Array[Int] = new Array[Int](5) var failureCount = 0 + var partitionAssignment: java.util.Map[TopicAndPartition, String] = null - override def beforeConsumerRebalance() { + override def onConsumerRebalanceStart() { callingCounts(0) += 1 } - override def afterStoppingFetcherThreads() { + override def onFetcherThreadsStopped() { callingCounts(1) += 1 } - override def afterReleasingPartitionOwnership() { + override def onPartitionsRevoked() { callingCounts(2) += 1 } - override def afterPartitionReassignment() { + override def onPartitionsAssigned(partitionAssignment: java.util.Map[TopicAndPartition, String]) { + this.partitionAssignment = partitionAssignment callingCounts(3) += 1 } - override def afterConsumerRebalance(success: Boolean) { + override def onConsumerRebalanceCompletion(success: Boolean) { if (success) callingCounts(4) += 1 else -- 1.8.3.4 (Apple Git-47)