From 943b35f6f2d6fdbb1bb38c774cf8d8b0d61eb2ea Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 12 Feb 2015 11:35:22 -0800 Subject: [PATCH] KAFKA-1948.v1 --- core/src/main/scala/kafka/controller/KafkaController.scala | 2 ++ core/src/test/scala/integration/kafka/api/ConsumerTest.scala | 7 +++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 66df6d2..772ff2d 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -337,6 +337,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt * required to clean up internal controller data structures */ def onControllerResignation() { + info("Broker %d starting resign controller state transition".format(config.brokerId)) + // de-register listeners deregisterReassignedPartitionsListener() deregisterPreferredReplicaElectionListener() diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 798f035..a469a76 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -230,7 +230,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertEquals(1, parts.size) assertNotNull(parts(0).leader()) - // shutdown the co-ordinator + // shutdown the coordinator val coordinator = parts(0).leader().id() this.servers(coordinator).shutdown() @@ -239,6 +239,9 @@ class ConsumerTest extends IntegrationTestHarness with Logging { consumer0.poll(50) assertEquals(2, callback.callsToAssigned) assertEquals(2, callback.callsToRevoked) + + // restart the coordinator + this.servers(coordinator).startup() consumer0.close() } @@ -253,7 +256,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) { info("onPartitionsRevoked called.") callsToRevoked += 1 - } + } } private def sendRecords(numRecords: Int) { -- 1.7.10.2 (Apple Git-33)