diff --git a/core/src/test/scala/unit/kafka/controller/BrokerChangeListenerTest.scala b/core/src/test/scala/unit/kafka/controller/BrokerChangeListenerTest.scala new file mode 100644 index 0000000..7513ce9 --- /dev/null +++ b/core/src/test/scala/unit/kafka/controller/BrokerChangeListenerTest.scala @@ -0,0 +1,100 @@ +package unit.kafka.controller + +import java.util.concurrent.locks.ReentrantLock + +import kafka.controller.{ControllerChannelManager, ControllerContext, KafkaController, ReplicaStateMachine} +import kafka.server.KafkaConfig +import org.I0Itec.zkclient.ZkClient +import org.apache.zookeeper.data.Stat +import org.easymock.EasyMock +import org.junit.Test +import org.scalatest.junit.JUnit3Suite + +import scala.collection.JavaConversions + +class BrokerChangeListenerTest extends JUnit3Suite { + + val controllerMock = EasyMock.createNiceMock(classOf[KafkaController]) + val controllerContextMock = EasyMock.createNiceMock(classOf[ControllerContext]) + val zookeeperClientMock = EasyMock.createNiceMock(classOf[ZkClient]) + var controllerChannelManagerMock = EasyMock.createNiceMock(classOf[ControllerChannelManager]) + val controllerLock = new ReentrantLock() + var kafkaConfigMock = EasyMock.createNiceMock(classOf[KafkaConfig]) + + override def setUp() { + EasyMock.expect(controllerMock.config).andReturn(kafkaConfigMock).anyTimes() + EasyMock.expect(kafkaConfigMock.brokerId).andReturn(25) + EasyMock.expect(controllerMock.controllerContext).andReturn(controllerContextMock).anyTimes() + EasyMock.expect(controllerContextMock.zkClient).andReturn(zookeeperClientMock).anyTimes() + EasyMock.expect(controllerContextMock.partitionReplicaAssignment).andReturn(scala.collection.mutable.Map.empty) + EasyMock.expect(controllerContextMock.allLiveReplicas()).andReturn(Set.empty) + EasyMock.expect(controllerContextMock.controllerChannelManager).andReturn(controllerChannelManagerMock).anyTimes() + EasyMock.expect(controllerContextMock.controllerLock).andReturn(controllerLock).anyTimes() + + val brokerInfos = Map("/brokers/ids/1" -> """{"port":9991, "host":"host1"}""" + , "/brokers/ids/2" -> """{"port":9992, "host":"host2"}""" + , "/brokers/ids/3" -> """{"port":9993, "host":"host3"}""" + , "/brokers/ids/4" -> """{"port":9994, "host":"host4"}""") + + for (key <- brokerInfos.keys) { + EasyMock.expect(zookeeperClientMock.readData(EasyMock.eq(key), EasyMock.anyObject[Stat]())) + .andReturn(brokerInfos(key)).anyTimes() + } + } + + @Test + def testNewBrokersAdded() { + // Arrange + val liveOrShuttingDownBrokerIds = Set(1, 3) + val currentBrokerList = JavaConversions.asList(List("1", "2", "3", "4")) + + EasyMock.expect(controllerContextMock.liveOrShuttingDownBrokerIds).andReturn(liveOrShuttingDownBrokerIds).anyTimes() + EasyMock.expect(controllerMock.onBrokerStartup(Seq(2, 4))) + + EasyMock.replay(controllerMock) + EasyMock.replay(controllerContextMock) + EasyMock.replay(zookeeperClientMock) + EasyMock.replay(controllerChannelManagerMock) + EasyMock.replay(kafkaConfigMock) + + val replicaStateMachine = new ReplicaStateMachine(controllerMock) + val subject = new replicaStateMachine.BrokerChangeListener() + + // Act + replicaStateMachine.startup() + subject.handleChildChange("brokerIds path", currentBrokerList) + + // Assert + EasyMock.verify(controllerMock) + EasyMock.verify(zookeeperClientMock) + EasyMock.verify(controllerContextMock) + } + + @Test + def testNewBrokersRemoved() { + // Arrange + val liveOrShuttingDownBrokerIds = Set(1, 2, 3, 4) + val currentBrokerList = JavaConversions.asList(List("2", "3")) + + EasyMock.expect(controllerContextMock.liveOrShuttingDownBrokerIds).andReturn(liveOrShuttingDownBrokerIds).anyTimes() + EasyMock.expect(controllerMock.onBrokerFailure(Seq(1, 4))) + + EasyMock.replay(controllerMock) + EasyMock.replay(controllerContextMock) + EasyMock.replay(zookeeperClientMock) + EasyMock.replay(controllerChannelManagerMock) + EasyMock.replay(kafkaConfigMock) + + val replicaStateMachine = new ReplicaStateMachine(controllerMock) + val subject = new replicaStateMachine.BrokerChangeListener() + + // Act + replicaStateMachine.startup() + subject.handleChildChange("brokerIds path", currentBrokerList) + + // Assert + EasyMock.verify(controllerMock) + EasyMock.verify(zookeeperClientMock) + EasyMock.verify(controllerContextMock) + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/controller/PreferredReplicaElectionListenerTest.scala b/core/src/test/scala/unit/kafka/controller/PreferredReplicaElectionListenerTest.scala new file mode 100644 index 0000000..74f88aa --- /dev/null +++ b/core/src/test/scala/unit/kafka/controller/PreferredReplicaElectionListenerTest.scala @@ -0,0 +1,99 @@ +package unit.kafka.controller + +import java.util.concurrent.locks.ReentrantLock + +import kafka.common.TopicAndPartition +import kafka.controller._ +import kafka.server.KafkaConfig +import org.I0Itec.zkclient.ZkClient +import org.easymock.{EasyMock, IArgumentMatcher} +import org.junit.Test +import org.scalatest.junit.JUnit3Suite + +import scala.collection.mutable + +class PreferredReplicaElectionListenerTest extends JUnit3Suite { + + val controllerMock = EasyMock.createNiceMock(classOf[KafkaController]) + val controllerContextMock = EasyMock.createNiceMock(classOf[ControllerContext]) + val zookeeperClientMock = EasyMock.createNiceMock(classOf[ZkClient]) + var topicDeletionManagerMock = EasyMock.createNiceMock(classOf[TopicDeletionManager]) + val controllerLock = new ReentrantLock() + var kafkaConfigMock = EasyMock.createNiceMock(classOf[KafkaConfig]) + var reassignmentData: String = _ + + override def setUp() { + EasyMock.expect(controllerMock.deleteTopicManager).andReturn(topicDeletionManagerMock).anyTimes() + EasyMock.expect(controllerMock.config).andReturn(kafkaConfigMock).anyTimes() + EasyMock.expect(controllerMock.controllerContext).andReturn(controllerContextMock).anyTimes() + EasyMock.expect(controllerContextMock.zkClient).andReturn(zookeeperClientMock).anyTimes() + EasyMock.expect(controllerContextMock.controllerLock).andReturn(controllerLock).anyTimes() + + reassignmentData = + """{"version":1, "partitions":[ + | {"topic": "topic1", "partition": 1, "replicas": [3, 4]}, + | {"topic": "topic2", "partition": 3, "replicas": [5]} + |]}""".stripMargin + } + + @Test + def testDataChangeThatIsNotQueuedUpForDeletion() { + // Arrange + val partionsBeingReassigned = mutable.HashMap(TopicAndPartition("topic2", 3) -> ReassignedPartitionsContext()) + + EasyMock.expect(controllerContextMock.partitionsBeingReassigned).andReturn(partionsBeingReassigned).anyTimes() + EasyMock.expect(controllerMock.initiateReassignReplicasForTopicPartition(EasyMock.eq(TopicAndPartition("topic1", 1)), matchReassignedPartitionsContext(Seq(3, 4)))).atLeastOnce() + EasyMock.expect(topicDeletionManagerMock.isTopicQueuedUpForDeletion(EasyMock.eq("topic1"))).andReturn(false).once() + + EasyMock.replay(controllerMock) + EasyMock.replay(controllerContextMock) + EasyMock.replay(topicDeletionManagerMock) + + val subject = new PartitionsReassignedListener(controllerMock) + + // Act + subject.handleDataChange("partition reassignment path", reassignmentData) + + // Assert + EasyMock.verify(controllerMock) + EasyMock.verify(topicDeletionManagerMock) + EasyMock.verify(controllerContextMock) + } + + @Test + def testDataChangeThatIsQueuedUpForDeletion() { + // Arrange + val partionsBeingReassigned = mutable.HashMap(TopicAndPartition("topic1", 3) -> ReassignedPartitionsContext()) + + EasyMock.expect(controllerContextMock.partitionsBeingReassigned).andReturn(partionsBeingReassigned).anyTimes() + EasyMock.expect(controllerMock.removePartitionFromReassignedPartitions(EasyMock.eq(TopicAndPartition("topic2", 3)))).once() + EasyMock.expect(topicDeletionManagerMock.isTopicQueuedUpForDeletion(EasyMock.eq("topic2"))).andReturn(true).once() + + EasyMock.replay(controllerMock) + EasyMock.replay(controllerContextMock) + EasyMock.replay(topicDeletionManagerMock) + + val subject = new PartitionsReassignedListener(controllerMock) + + // Act + subject.handleDataChange("partition reassignment path", reassignmentData) + + // Assert + EasyMock.verify(controllerMock) + EasyMock.verify(topicDeletionManagerMock) + EasyMock.verify(controllerContextMock) + } + + def matchReassignedPartitionsContext(newReplicas: Seq[Int]): ReassignedPartitionsContext = { + EasyMock.reportMatcher(new ReassignedPartitionsContextMatcher(newReplicas)) + null + } + + class ReassignedPartitionsContextMatcher(val newReplicas: Seq[Int]) extends IArgumentMatcher { + def matches(actual: AnyRef): Boolean = { + actual.asInstanceOf[ReassignedPartitionsContext].newReplicas == newReplicas + } + + def appendTo(buffer: StringBuffer): Unit = buffer append "reassign partition context matching" + } +}