From e49c786176ae233f803bd5332a89af5bddc48f2b Mon Sep 17 00:00:00 2001 From: jqin Date: Thu, 15 Jan 2015 05:54:55 -0800 Subject: [PATCH 1/3] Added callback of beforeStartingFetchers --- .../scala/kafka/consumer/PartitionAssignor.scala | 24 +++++++++------ .../consumer/ZookeeperConsumerConnector.scala | 18 ++++++++++- .../consumer/ConsumerRebalanceListener.java | 6 ++++ core/src/main/scala/kafka/tools/MirrorMaker.scala | 5 +++ .../kafka/consumer/PartitionAssignorTest.scala | 2 +- .../consumer/ZookeeperConsumerConnectorTest.scala | 36 ++++++++++++++++++---- 6 files changed, 73 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index e6ff768..2ab1fd3 100644 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -19,7 +19,7 @@ package kafka.consumer import org.I0Itec.zkclient.ZkClient import kafka.common.TopicAndPartition -import kafka.utils.{Utils, ZkUtils, Logging} +import kafka.utils.{Pool, Utils, ZkUtils, Logging} trait PartitionAssignor { @@ -28,7 +28,7 @@ trait PartitionAssignor { * @return An assignment map of partition to consumer thread. This only includes assignments for threads that belong * to the given assignment-context's consumer. */ - def assign(ctx: AssignmentContext): scala.collection.Map[TopicAndPartition, ConsumerThreadId] + def assign(ctx: AssignmentContext): Pool[String, collection.mutable.Map[TopicAndPartition, ConsumerThreadId]] } @@ -69,7 +69,10 @@ class AssignmentContext(group: String, val consumerId: String, excludeInternalTo class RoundRobinAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { - val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() + + val partitionOwnershipDecision = new Pool[String, collection.mutable.Map[TopicAndPartition, ConsumerThreadId]]( + Some( (topic: String) => new collection.mutable.HashMap[TopicAndPartition, ConsumerThreadId]) + ) if (ctx.consumersForTopic.size > 0) { // check conditions (a) and (b) @@ -102,8 +105,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { allTopicPartitions.foreach(topicPartition => { val threadId = threadAssignor.next() - if (threadId.consumer == ctx.consumerId) - partitionOwnershipDecision += (topicPartition -> threadId) + partitionOwnershipDecision.getAndMaybePut(threadId.consumer) += (topicPartition -> threadId) }) } @@ -123,9 +125,10 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { class RangeAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { - val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() - - for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) { + val partitionOwnershipDecision = new Pool[String, collection.mutable.Map[TopicAndPartition, ConsumerThreadId]]( + Some( (topic: String) => new collection.mutable.HashMap[TopicAndPartition, ConsumerThreadId]) + ) + for (topic <- ctx.myTopicThreadIds.keySet) { val curConsumers = ctx.consumersForTopic(topic) val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic) @@ -135,7 +138,7 @@ class RangeAssignor() extends PartitionAssignor with Logging { info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions + " for topic " + topic + " with consumers: " + curConsumers) - for (consumerThreadId <- consumerThreadIdSet) { + for (consumerThreadId <- curConsumers) { val myConsumerPosition = curConsumers.indexOf(consumerThreadId) assert(myConsumerPosition >= 0) val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) @@ -152,7 +155,8 @@ class RangeAssignor() extends PartitionAssignor with Logging { val partition = curPartitions(i) info(consumerThreadId + " attempting to claim partition " + partition) // record the partition ownership decision - partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId) + partitionOwnershipDecision.getAndMaybePut(consumerThreadId.consumer) += + (TopicAndPartition(topic, partition) -> consumerThreadId) } } } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 191a867..fde48bb 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -683,7 +683,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } releasePartitionOwnership(topicRegistry) val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) - val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) + val globalPartitionOwnershipDecision = partitionAssignor.assign(assignmentContext) + val partitionOwnershipDecision = globalPartitionOwnershipDecision.get(assignmentContext.consumerId) val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) @@ -720,6 +721,21 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } topicRegistry = currentTopicRegistry + // Invoke beforeStartingFetchers callback if the consumerRebalanceListener is set. + if (consumerRebalanceListener != null) { + info("Calling beforeStartingFetchers() from rebalance listener.") + consumerRebalanceListener.beforeStartingFetchers( + globalPartitionOwnershipDecision.values.flatten.groupBy[String]( + partitionOwnership => partitionOwnership._1.topic + ).map({ + case (topic, partitionOwnerShips) => + topic -> partitionOwnerShips.map({ + case(topicAndPartition, consumerThreadId) => + topicAndPartition.partition -> consumerThreadId.toString + }).toMap + }).toMap.asInstanceOf[java.util.Map[String, java.util.Map[java.lang.Integer, String]]] + ) + } updateFetcher(cluster) true } else { diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java index facf509..a0502a6 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java +++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java @@ -39,4 +39,10 @@ public interface ConsumerRebalanceListener { */ public void beforeReleasingPartitions(Map> partitionOwnership); + /** + * This method is called after the new partition assignment is finished but before fetcher + * threads start. A map of new global partition assignment is passed in as parameter. + */ + public void beforeStartingFetchers(Map> partitionAssignment); + } diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 5cbc810..5ededa3 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -630,6 +630,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { if (customRebalanceListener.isDefined) customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership) } + + override def beforeStartingFetchers(partitionAssignment: java.util.Map[String, java.util.Map[java.lang.Integer, String]]) { + if (customRebalanceListener.isDefined) + customRebalanceListener.get.beforeStartingFetchers(partitionAssignment) + } } private[kafka] class MirrorMakerRecord (val sourceTopic: String, diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala index 24954de..8056349 100644 --- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala @@ -164,7 +164,7 @@ private object PartitionAssignorTest extends Logging { verifyAssignmentIsUniform: Boolean = false) { val assignments = scenario.subscriptions.map{ case(consumer, subscription) => val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkClient) - assignor.assign(ctx) + assignor.assign(ctx).get(consumer) } // check for uniqueness (i.e., any partition should be assigned to exactly one consumer stream) diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index a17e853..d5d5029 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -17,6 +17,8 @@ package kafka.consumer +import java.util + import junit.framework.Assert._ import kafka.integration.KafkaServerTestHarness import kafka.javaapi.consumer.ConsumerRebalanceListener @@ -360,10 +362,17 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // Check if rebalance listener is fired - assertEquals(true, rebalanceListener1.listenerCalled) + assertEquals(true, rebalanceListener1.beforeReleasingPartitionsCalled) + assertEquals(true, rebalanceListener1.beforeStartingFetchersCalled) assertEquals(null, rebalanceListener1.partitionOwnership.get(topic)) + val expectedGlobalPartitionOwnership1 = new java.util.HashMap[String, java.util.Map[java.lang.Integer, String]] + expectedGlobalPartitionOwnership1.put("topic1", new java.util.HashMap[java.lang.Integer, String]) + expectedGlobalPartitionOwnership1.get("topic1").put(0, "group1_consumer1-0") + expectedGlobalPartitionOwnership1.get("topic1").put(1, "group1_consumer1-0") + assertEquals(expectedGlobalPartitionOwnership1, rebalanceListener1.globalPartitionOwnership) // reset the flag - rebalanceListener1.listenerCalled = false + rebalanceListener1.beforeReleasingPartitionsCalled = false + rebalanceListener1.beforeStartingFetchersCalled = false val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir) val expected_1 = List(("0", "group1_consumer1-0"), @@ -383,10 +392,18 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar assertEquals(expected_2, actual_2) // Check if rebalance listener is fired - assertEquals(true, rebalanceListener1.listenerCalled) + assertEquals(true, rebalanceListener1.beforeReleasingPartitionsCalled) + assertEquals(true, rebalanceListener1.beforeStartingFetchersCalled) assertEquals(Set[Int](0, 1), rebalanceListener1.partitionOwnership.get(topic)) - assertEquals(true, rebalanceListener2.listenerCalled) + val expectedGlobalPartitionOwnership2 = new java.util.HashMap[String, java.util.Map[java.lang.Integer, String]] + expectedGlobalPartitionOwnership1.put("topic1", new java.util.HashMap[java.lang.Integer, String]) + expectedGlobalPartitionOwnership1.get("topic1").put(0, "group1_consumer1-0") + expectedGlobalPartitionOwnership1.get("topic1").put(1, "group1_consumer2-0") + assertEquals(expectedGlobalPartitionOwnership2, rebalanceListener1.globalPartitionOwnership) + assertEquals(true, rebalanceListener2.beforeReleasingPartitionsCalled) + assertEquals(true, rebalanceListener2.beforeStartingFetchersCalled) assertEquals(null, rebalanceListener2.partitionOwnership.get(topic)) + assertEquals(expectedGlobalPartitionOwnership2, rebalanceListener2.globalPartitionOwnership) zkConsumerConnector1.shutdown() zkConsumerConnector2.shutdown() } @@ -403,13 +420,20 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar } private class TestConsumerRebalanceListener extends ConsumerRebalanceListener { - var listenerCalled: Boolean = false + var beforeReleasingPartitionsCalled: Boolean = false + var beforeStartingFetchersCalled: Boolean = false var partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]] = null + var globalPartitionOwnership: java.util.Map[String, java.util.Map[java.lang.Integer, String]] = null override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) { - listenerCalled = true + beforeReleasingPartitionsCalled = true this.partitionOwnership = partitionOwnership } + + override def beforeStartingFetchers(globalPartitionOwnership: java.util.Map[String, java.util.Map[java.lang.Integer, String]]) { + beforeStartingFetchersCalled = true + this.globalPartitionOwnership = globalPartitionOwnership + } } } -- 1.8.3.4 (Apple Git-47) From 883399dbfc4222493331495b1f752611e592da2f Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 20 Jan 2015 10:37:51 -0800 Subject: [PATCH 2/3] Modified unit test. --- .../scala/kafka/consumer/PartitionAssignor.scala | 6 +++-- .../consumer/ZookeeperConsumerConnector.scala | 17 +++++++------ .../consumer/ConsumerRebalanceListener.java | 3 +-- core/src/main/scala/kafka/tools/MirrorMaker.scala | 6 ++--- .../consumer/ZookeeperConsumerConnectorTest.scala | 29 ++++++++++------------ 5 files changed, 30 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 2ab1fd3..ee2d74c 100644 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -108,7 +108,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { partitionOwnershipDecision.getAndMaybePut(threadId.consumer) += (topicPartition -> threadId) }) } - + partitionOwnershipDecision.getAndMaybePut(ctx.consumerId) partitionOwnershipDecision } } @@ -161,7 +161,9 @@ class RangeAssignor() extends PartitionAssignor with Logging { } } } - + // If no this consumer owns no partitions, make sure an empty partition ownership assignment map is there + // for ZookeeperConsumerConnector to use. + partitionOwnershipDecision.getAndMaybePut(ctx.consumerId) partitionOwnershipDecision } } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index fde48bb..28f65a9 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -724,17 +724,18 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // Invoke beforeStartingFetchers callback if the consumerRebalanceListener is set. if (consumerRebalanceListener != null) { info("Calling beforeStartingFetchers() from rebalance listener.") - consumerRebalanceListener.beforeStartingFetchers( - globalPartitionOwnershipDecision.values.flatten.groupBy[String]( + consumerRebalanceListener.beforeStartingFetchers({ + val partitionAssigment = globalPartitionOwnershipDecision.values.flatten.groupBy[String]( partitionOwnership => partitionOwnership._1.topic ).map({ case (topic, partitionOwnerShips) => - topic -> partitionOwnerShips.map({ - case(topicAndPartition, consumerThreadId) => - topicAndPartition.partition -> consumerThreadId.toString - }).toMap - }).toMap.asInstanceOf[java.util.Map[String, java.util.Map[java.lang.Integer, String]]] - ) + topic -> mapAsJavaMap(collection.mutable.Map(partitionOwnerShips.map({ + case (topicAndPartition, consumerThreadId) => + topicAndPartition.partition -> consumerThreadId + }).toSeq:_*)).asInstanceOf[java.util.Map[java.lang.Integer, ConsumerThreadId]] + }) + mapAsJavaMap(collection.mutable.Map(partitionAssigment.toSeq:_*)) + }) } updateFetcher(cluster) true diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java index a0502a6..1786708 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java +++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java @@ -17,7 +17,6 @@ package kafka.javaapi.consumer; -import kafka.common.TopicAndPartition; import kafka.consumer.ConsumerThreadId; import java.util.Map; @@ -43,6 +42,6 @@ public interface ConsumerRebalanceListener { * This method is called after the new partition assignment is finished but before fetcher * threads start. A map of new global partition assignment is passed in as parameter. */ - public void beforeStartingFetchers(Map> partitionAssignment); + public void beforeStartingFetchers(Map> partitionAssignment); } diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 5ededa3..bce7cfa 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -627,12 +627,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { commitOffsets() // invoke custom consumer rebalance listener - if (customRebalanceListener.isDefined) + if (customRebalanceListener.get != null) customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership) } - override def beforeStartingFetchers(partitionAssignment: java.util.Map[String, java.util.Map[java.lang.Integer, String]]) { - if (customRebalanceListener.isDefined) + override def beforeStartingFetchers(partitionAssignment: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]]) { + if (customRebalanceListener.get != null) customRebalanceListener.get.beforeStartingFetchers(partitionAssignment) } } diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index d5d5029..354f838 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -365,11 +365,11 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar assertEquals(true, rebalanceListener1.beforeReleasingPartitionsCalled) assertEquals(true, rebalanceListener1.beforeStartingFetchersCalled) assertEquals(null, rebalanceListener1.partitionOwnership.get(topic)) - val expectedGlobalPartitionOwnership1 = new java.util.HashMap[String, java.util.Map[java.lang.Integer, String]] - expectedGlobalPartitionOwnership1.put("topic1", new java.util.HashMap[java.lang.Integer, String]) - expectedGlobalPartitionOwnership1.get("topic1").put(0, "group1_consumer1-0") - expectedGlobalPartitionOwnership1.get("topic1").put(1, "group1_consumer1-0") - assertEquals(expectedGlobalPartitionOwnership1, rebalanceListener1.globalPartitionOwnership) + // Check if partition assignment in rebalance listener is correct + assertEquals("group1_consumer1", rebalanceListener1.globalPartitionOwnership.get(topic).get(0).consumer) + assertEquals("group1_consumer1", rebalanceListener1.globalPartitionOwnership.get(topic).get(1).consumer) + assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(0).threadId) + assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(1).threadId) // reset the flag rebalanceListener1.beforeReleasingPartitionsCalled = false rebalanceListener1.beforeStartingFetchersCalled = false @@ -395,15 +395,12 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar assertEquals(true, rebalanceListener1.beforeReleasingPartitionsCalled) assertEquals(true, rebalanceListener1.beforeStartingFetchersCalled) assertEquals(Set[Int](0, 1), rebalanceListener1.partitionOwnership.get(topic)) - val expectedGlobalPartitionOwnership2 = new java.util.HashMap[String, java.util.Map[java.lang.Integer, String]] - expectedGlobalPartitionOwnership1.put("topic1", new java.util.HashMap[java.lang.Integer, String]) - expectedGlobalPartitionOwnership1.get("topic1").put(0, "group1_consumer1-0") - expectedGlobalPartitionOwnership1.get("topic1").put(1, "group1_consumer2-0") - assertEquals(expectedGlobalPartitionOwnership2, rebalanceListener1.globalPartitionOwnership) - assertEquals(true, rebalanceListener2.beforeReleasingPartitionsCalled) - assertEquals(true, rebalanceListener2.beforeStartingFetchersCalled) - assertEquals(null, rebalanceListener2.partitionOwnership.get(topic)) - assertEquals(expectedGlobalPartitionOwnership2, rebalanceListener2.globalPartitionOwnership) + // Check if global partition ownership in rebalance listener is correct + assertEquals("group1_consumer1", rebalanceListener1.globalPartitionOwnership.get(topic).get(0).consumer) + assertEquals("group1_consumer2", rebalanceListener1.globalPartitionOwnership.get(topic).get(1).consumer) + assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(0).threadId) + assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(1).threadId) + assertEquals(rebalanceListener1.globalPartitionOwnership, rebalanceListener2.globalPartitionOwnership) zkConsumerConnector1.shutdown() zkConsumerConnector2.shutdown() } @@ -423,14 +420,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar var beforeReleasingPartitionsCalled: Boolean = false var beforeStartingFetchersCalled: Boolean = false var partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]] = null - var globalPartitionOwnership: java.util.Map[String, java.util.Map[java.lang.Integer, String]] = null + var globalPartitionOwnership: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]] = null override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) { beforeReleasingPartitionsCalled = true this.partitionOwnership = partitionOwnership } - override def beforeStartingFetchers(globalPartitionOwnership: java.util.Map[String, java.util.Map[java.lang.Integer, String]]) { + override def beforeStartingFetchers(globalPartitionOwnership: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]]) { beforeStartingFetchersCalled = true this.globalPartitionOwnership = globalPartitionOwnership } -- 1.8.3.4 (Apple Git-47) From 7f58ee6e17bcbaf8359e919c4863cb6c0f9bcf45 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 20 Jan 2015 13:28:06 -0800 Subject: [PATCH 3/3] Minor code change --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index bce7cfa..b5444e7 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -213,11 +213,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt) val customRebalanceListener = { if (customRebalanceListenerClass != null) - Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass) + Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)) else - null + None } - consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, Some(customRebalanceListener)) + consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, customRebalanceListener) connector.setConsumerRebalanceListener(consumerRebalanceListener) // create producer threads @@ -627,12 +627,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { commitOffsets() // invoke custom consumer rebalance listener - if (customRebalanceListener.get != null) + if (customRebalanceListener.isDefined) customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership) } override def beforeStartingFetchers(partitionAssignment: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]]) { - if (customRebalanceListener.get != null) + if (customRebalanceListener.isDefined) customRebalanceListener.get.beforeStartingFetchers(partitionAssignment) } } -- 1.8.3.4 (Apple Git-47)