Index: system_test/single_host_multi_brokers/bin/run-test.sh =================================================================== --- system_test/single_host_multi_brokers/bin/run-test.sh (revision 1374135) +++ system_test/single_host_multi_brokers/bin/run-test.sh (working copy) @@ -150,7 +150,7 @@ get_leader_brokerid() { log_line=`grep -i -h 'completed the leader state transition' ${base_dir}/kafka_server_*.log | sort | tail -1` info "found the log line: $log_line" - broker_id=`echo $log_line | sed s'/^.*INFO Replica Manager on Broker //g' | awk -F ',' '{print $1}'` + broker_id=`echo $log_line | sed s'/^.*INFO Replica Manager on Broker //g' | awk -F ':' '{print $1}'` return $broker_id } Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala =================================================================== --- core/src/test/scala/unit/kafka/utils/TestUtils.scala (revision 1374135) +++ core/src/test/scala/unit/kafka/utils/TestUtils.scala (working copy) @@ -456,6 +456,19 @@ // should never hit here throw new RuntimeException("unexpected error") } + + def leaderLocalOnBroker(topic: String, partitionId: Int, server: KafkaServer): Boolean = { + val partitionOpt = server.replicaManager.getPartition(topic, partitionId) + partitionOpt match { + case None => false + case Some(partition) => + val replicaOpt = partition.leaderReplicaIfLocal + replicaOpt match { + case None => false + case Some(_) => true + } + } + } } object ControllerTestUtils{ Index: core/src/test/scala/unit/kafka/log/LogOffsetTest.scala =================================================================== --- core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (revision 1374135) +++ core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (working copy) @@ -30,7 +30,8 @@ import org.scalatest.junit.JUnit3Suite import kafka.admin.CreateTopicCommand import kafka.api.{FetchRequestBuilder, OffsetRequest} -import kafka.common.UnknownTopicException +import kafka.utils.TestUtils._ +import kafka.common.InvalidPartitionException object LogOffsetTest { val random = new Random() @@ -70,7 +71,7 @@ simpleConsumer.getOffsetsBefore("foo", 0, OffsetRequest.LatestTime, 10) fail("Should fail with UnknownTopicException since topic foo was never created") }catch { - case e: UnknownTopicException => // this is ok + case e: InvalidPartitionException => // this is ok } } @@ -97,6 +98,7 @@ val offsets = log.getOffsetsBefore(offsetRequest) assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long])) + waitUntilTrue(() => leaderLocalOnBroker(topic, part, server), 1000) val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, OffsetRequest.LatestTime, 10) assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long])) @@ -156,6 +158,7 @@ println("Offsets = " + offsets.mkString(",")) assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long])) + waitUntilTrue(() => leaderLocalOnBroker(topic, part, server), 1000) val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, now, 10) assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long])) } @@ -182,6 +185,7 @@ assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) ) + waitUntilTrue(() => leaderLocalOnBroker(topic, part, server), 1000) val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, OffsetRequest.EarliestTime, 10) assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) ) Index: 
core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (revision 1374135) +++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (working copy) @@ -21,14 +21,13 @@ import kafka.zk.ZooKeeperTestHarness import kafka.admin.CreateTopicCommand import java.nio.ByteBuffer -import kafka.log.LogManager import junit.framework.Assert._ import org.easymock.EasyMock import kafka.network._ import kafka.cluster.Broker import kafka.utils.TestUtils import kafka.utils.TestUtils._ -import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig} +import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig} import kafka.common.ErrorMapping import kafka.api.{TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest} @@ -99,16 +98,10 @@ } private def mockLogManagerAndTestTopic(topic: String): Seq[TopicMetadata] = { - // topic metadata request only requires 2 APIs from the log manager - val logManager = EasyMock.createMock(classOf[LogManager]) - val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper]) + // topic metadata request only requires 1 call from the replica manager val replicaManager = EasyMock.createMock(classOf[ReplicaManager]) EasyMock.expect(replicaManager.config).andReturn(configs.head) - EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient) - EasyMock.expect(logManager.config).andReturn(configs.head) EasyMock.replay(replicaManager) - EasyMock.replay(logManager) - EasyMock.replay(kafkaZookeeper) // create a topic metadata request val topicMetadataRequest = new TopicMetadataRequest(List(topic)) @@ -119,8 +112,7 @@ // create the kafka request handler val requestChannel = new RequestChannel(2, 5) - val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper, null, - null, null, null, 1) + val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1) // mock the receive API to return the request buffer as created above val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive]) @@ -135,7 +127,6 @@ val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata // verify the expected calls to log manager occurred in the right order - EasyMock.verify(kafkaZookeeper) EasyMock.verify(receivedRequest) topicMetadata Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (revision 1374135) +++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (working copy) @@ -154,7 +154,7 @@ Assert.assertEquals(request.correlationId, response.correlationId) Assert.assertEquals(response.errors.length, response.offsets.length) Assert.assertEquals(3, response.errors.length) - response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, _)) + response.errors.foreach(Assert.assertEquals(ErrorMapping.InvalidPartitionCode.toShort, _)) response.offsets.foreach(Assert.assertEquals(-1L, _)) // #2 - test that we get correct offsets when partition is owned by broker @@ -176,7 +176,7 @@ Assert.assertEquals(messages.sizeInBytes, response2.offsets(2)) // the middle message should have been rejected because broker doesn't lead partition - Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, response2.errors(1)) + Assert.assertEquals(ErrorMapping.InvalidPartitionCode.toShort, 
response2.errors(1)) Assert.assertEquals(-1, response2.offsets(1)) } @@ -212,28 +212,4 @@ // make sure we don't wait fewer than timeoutMs for a response Assert.assertTrue((t2-t1) >= timeoutMs) } - - @Test - def testProduceRequestForUnknownTopic() { - val server = servers.head - val props = new Properties() - props.put("host", "localhost") - props.put("port", server.socketServer.port.toString) - props.put("buffer.size", "102400") - props.put("connect.timeout.ms", "300") - props.put("reconnect.interval", "500") - props.put("max.message.size", "100") - - val producer = new SyncProducer(new SyncProducerConfig(props)) - val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) - - val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1) - val response = producer.send(request) - - Assert.assertNotNull(response) - Assert.assertEquals(request.correlationId, response.correlationId) - Assert.assertEquals(response.errors.length, response.offsets.length) - Assert.assertEquals(3, response.errors.length) - response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, _)) - } } Index: core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala =================================================================== --- core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (revision 1374135) +++ core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (working copy) @@ -23,8 +23,7 @@ import org.easymock.EasyMock import kafka.log.Log import org.junit.Assert._ -import org.I0Itec.zkclient.ZkClient -import kafka.utils.{KafkaScheduler, Time, MockTime, TestUtils} +import kafka.utils._ class ISRExpirationTest extends JUnit3Suite { @@ -37,35 +36,24 @@ def testISRExpirationForStuckFollowers() { val time = new MockTime - // create leader replica - val log = EasyMock.createMock(classOf[kafka.log.Log]) - EasyMock.expect(log.logEndOffset).andReturn(5L).times(12) - EasyMock.replay(log) + val log = getLogWithLogEndOffset(15L) // set logEndOffset for leader to 15L - // add one partition - val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, 5L) + // create one partition and all replicas + val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head, log) assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId)) val leaderReplica = partition0.getReplica(configs.head.brokerId).get - // set remote replicas leo to something low, like 2 - (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 2)) - time.sleep(150) - leaderReplica.logEndOffset(Some(5L)) + // let the follower catch up to 10 + (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 10) + var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes) + assertEquals("no replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) - var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes) - assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId)) - - // add all replicas back to the ISR - partition0.inSyncReplicas ++= partition0.assignedReplicas() - assertEquals("Replica 1 should be in sync", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId)) - - 
leaderReplica.logEndOffset(Some(5L)) - // let the follower catch up only upto 3 - (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 3)) + // let some time pass time.sleep(150) - // now follower broker id 1 has caught upto only 3, while the leader is at 5 AND follower broker id 1 hasn't + + // now follower (broker id 1) has caught up to only 10, while the leader is at 15 AND the follower hasn't // pulled any data for > replicaMaxLagTimeMs ms. So it is stuck - partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes) + partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes) assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId)) EasyMock.verify(log) } @@ -73,104 +61,41 @@ def testISRExpirationForSlowFollowers() { val time = new MockTime // create leader replica - val log = getLogWithHW(15L) + val log = getLogWithLogEndOffset(15L) // add one partition - val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, 15L) + val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head, log) assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId)) val leaderReplica = partition0.getReplica(configs.head.brokerId).get // set remote replicas leo to something low, like 4 - (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 4)) + (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 4L) - time.sleep(150) - leaderReplica.logEndOffset(Some(15L)) - time.sleep(10) - (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffset(Some(4))) - - val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes) + // now follower (broker id 1) has caught up to only 4, while the leader is at 15. Since the gap is larger than + // replicaMaxLagBytes, the follower is out of sync.
+ val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes) assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId)) EasyMock.verify(log) } - def testISRExpirationForMultiplePartitions() { - val time = new MockTime - // mock zkclient - val zkClient = EasyMock.createMock(classOf[ZkClient]) - EasyMock.replay(zkClient) - // create kafka scheduler - val scheduler = new KafkaScheduler(2) - // create replica manager - val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler, null) - try { - val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet) - // create leader log - val log0 = getLogWithHW(5L) + private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, config: KafkaConfig, + localLog: Log): Partition = { + val leaderId=config.brokerId + val replicaManager = new ReplicaManager(config, time, null, null, null) + val partition = replicaManager.getOrCreatePartition(topic, partitionId) + val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog)) - // create leader and follower replicas - val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet) - val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0) - - partition0.inSyncReplicas = Set(followerReplicaPartition0, leaderReplicaPartition0) - // set the leader and its hw and the hw update time - partition0.leaderId(Some(configs.head.brokerId)) - partition0.leaderHW(Some(5L)) - - // set the leo for non-leader replicas to something low - (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLeo(r, 2)) - - val log1 = getLogWithHW(15L) - // create leader and follower replicas for partition 1 - val partition1 = replicaManager.getOrCreatePartition(topic, 1, configs.map(_.brokerId).toSet) - val leaderReplicaPartition1 = replicaManager.addLocalReplica(topic, 1, log1, configs.map(_.brokerId).toSet) - val followerReplicaPartition1 = replicaManager.addRemoteReplica(topic, 1, configs.last.brokerId, partition0) - - partition1.inSyncReplicas = Set(followerReplicaPartition1, leaderReplicaPartition1) - // set the leader and its hw and the hw update time - partition1.leaderId(Some(configs.head.brokerId)) - partition1.leaderHW(Some(15L)) - - // set the leo for non-leader replicas to something low - (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLeo(r, 4)) - - time.sleep(150) - leaderReplicaPartition0.logEndOffset(Some(4L)) - leaderReplicaPartition1.logEndOffset(Some(4L)) - time.sleep(10) - (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffset(Some(4L))) - - val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes) - assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId)) - - val partition1OSR = partition1.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes) - assertEquals("Replica 0 should be out of sync", Set(configs.last.brokerId), partition1OSR.map(_.brokerId)) - - EasyMock.verify(log0) - EasyMock.verify(log1) - }catch { - case e => e.printStackTrace() - }finally { - replicaManager.shutdown() - } - } - - private def getPartitionWithAllReplicasInISR(topic: String, 
partitionId: Int, time: Time, leaderId: Int, - localLog: Log, leaderHW: Long): Partition = { - val partition = new Partition(topic, partitionId, time) - val leaderReplica = new Replica(leaderId, partition, topic, time, Some(leaderHW), Some(localLog)) - val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica - partition.assignedReplicas(Some(allReplicas.toSet)) + allReplicas.foreach(r => partition.addReplicaIfNotExist(r)) // set in sync replicas for this partition to all the assigned replicas partition.inSyncReplicas = allReplicas.toSet // set the leader and its hw and the hw update time - partition.leaderId(Some(leaderId)) - partition.leaderHW(Some(leaderHW)) + partition.leaderReplicaId = Some(leaderId) partition } - private def getLogWithHW(hw: Long): Log = { + private def getLogWithLogEndOffset(logEndOffset: Long): Log = { val log1 = EasyMock.createMock(classOf[kafka.log.Log]) - EasyMock.expect(log1.logEndOffset).andReturn(hw).times(6) + EasyMock.expect(log1.logEndOffset).andReturn(logEndOffset).anyTimes() EasyMock.replay(log1) log1 @@ -178,7 +103,7 @@ private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = { configs.filter(_.brokerId != leaderId).map { config => - new Replica(config.brokerId, partition, topic, time) + new Replica(config.brokerId, partition, time) } } } \ No newline at end of file Index: core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala =================================================================== --- core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (revision 1374135) +++ core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (working copy) @@ -8,7 +8,6 @@ import kafka.zk.ZooKeeperTestHarness import kafka.message.Message import kafka.producer.{ProducerConfig, ProducerData, Producer} -import org.junit.Test class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -41,7 +40,6 @@ var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDir) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] - @Test def testHWCheckpointNoFailuresSingleLogSegment { // start both servers server1 = TestUtils.createServer(configProps1) @@ -63,15 +61,18 @@ assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1)) sendMessages(2) - // don't wait for follower to read the leader's hw - // shutdown the servers to allow the hw to be checkpointed - servers.map(server => server.shutdown()) + + // give some time for the follower 1 to record leader HW of 60 + assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => + server2.replicaManager.getReplica(topic, 0).get.highWatermark == 60L, 1000)) + + servers.foreach(server => server.replicaManager.checkpointHighWatermarks()) producer.close() val leaderHW = hwFile1.read(topic, 0) assertEquals(60L, leaderHW) val followerHW = hwFile2.read(topic, 0) - assertEquals(30L, followerHW) - servers.map(server => Utils.rm(server.config.logDir)) + assertEquals(60L, followerHW) + servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDir)}) } def testHWCheckpointWithFailuresSingleLogSegment { @@ -124,13 +125,13 @@ sendMessages() // give some time for follower 1 to record leader HW of 60 assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => - server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000)) + server2.replicaManager.getReplica(topic, 0).get.highWatermark == 
60L, 1000)) // shutdown the servers to allow the hw to be checkpointed - servers.map(server => server.shutdown()) + servers.foreach(server => server.shutdown()) producer.close() assertEquals(60L, hwFile1.read(topic, 0)) assertEquals(60L, hwFile2.read(topic, 0)) - servers.map(server => Utils.rm(server.config.logDir)) + servers.foreach(server => Utils.rm(server.config.logDir)) } def testHWCheckpointNoFailuresMultipleLogSegments { @@ -166,15 +167,15 @@ sendMessages(20) // give some time for follower 1 to record leader HW of 600 assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => - server2.getReplica(topic, 0).get.highWatermark() == 600L, 1000)) + server2.replicaManager.getReplica(topic, 0).get.highWatermark == 600L, 1000)) // shutdown the servers to allow the hw to be checkpointed - servers.map(server => server.shutdown()) + servers.foreach(server => server.shutdown()) producer.close() val leaderHW = hwFile1.read(topic, 0) assertEquals(600L, leaderHW) val followerHW = hwFile2.read(topic, 0) assertEquals(600L, followerHW) - servers.map(server => Utils.rm(server.config.logDir)) + servers.foreach(server => Utils.rm(server.config.logDir)) } def testHWCheckpointWithFailuresMultipleLogSegments { @@ -211,7 +212,7 @@ sendMessages(2) // allow some time for the follower to get the leader HW assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => - server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000)) + server2.replicaManager.getReplica(topic, 0).get.highWatermark == 60L, 1000)) // kill the server hosting the preferred replica server1.shutdown() server2.shutdown() @@ -234,13 +235,13 @@ sendMessages(2) // allow some time for the follower to get the leader HW assertTrue("Failed to update highwatermark for follower after 1000 ms", TestUtils.waitUntilTrue(() => - server1.getReplica(topic, 0).get.highWatermark() == 120L, 1000)) + server1.replicaManager.getReplica(topic, 0).get.highWatermark == 120L, 1000)) // shutdown the servers to allow the hw to be checkpointed - servers.map(server => server.shutdown()) + servers.foreach(server => server.shutdown()) producer.close() assertEquals(120L, hwFile1.read(topic, 0)) assertEquals(120L, hwFile2.read(topic, 0)) - servers.map(server => Utils.rm(server.config.logDir)) + servers.foreach(server => Utils.rm(server.config.logDir)) } private def sendMessages(numMessages: Int = 1) { Index: core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala =================================================================== --- core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (revision 1374135) +++ core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (working copy) @@ -21,8 +21,9 @@ import org.scalatest.junit.JUnit3Suite import org.easymock.EasyMock import org.junit.Assert._ -import kafka.utils.{KafkaScheduler, TestUtils, MockTime} import kafka.common.KafkaException +import kafka.cluster.Replica +import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime} class HighwatermarkPersistenceTest extends JUnit3Suite { @@ -41,33 +42,31 @@ // create replica manager val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null) replicaManager.startup() - replicaManager.startHighWaterMarksCheckPointThread() - // sleep until flush ms - Thread.sleep(configs.head.defaultFlushIntervalMs) - var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0) + 
replicaManager.checkpointHighWatermarks() + var fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0) assertEquals(0L, fooPartition0Hw) - val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet) + val partition0 = replicaManager.getOrCreatePartition(topic, 0) // create leader log val log0 = getMockLog // create leader and follower replicas - val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet) - val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0) - replicaManager.checkpointHighwaterMarks() - fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0) - assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw) + val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0)) + partition0.addReplicaIfNotExist(leaderReplicaPartition0) + val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime) + partition0.addReplicaIfNotExist(followerReplicaPartition0) + replicaManager.checkpointHighWatermarks() + fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0) + assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw) try { - followerReplicaPartition0.highWatermark() + followerReplicaPartition0.highWatermark fail("Should fail with IllegalStateException") }catch { case e: KafkaException => // this is ok } - // set the leader - partition0.leaderId(Some(leaderReplicaPartition0.brokerId)) // set the highwatermark for local replica - partition0.leaderHW(Some(5L)) - replicaManager.checkpointHighwaterMarks() - fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0) - assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw) + partition0.getReplica().get.highWatermark = 5L + replicaManager.checkpointHighWatermarks() + fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0) + assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw) EasyMock.verify(zkClient) EasyMock.verify(log0) } @@ -84,48 +83,46 @@ // create replica manager val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null) replicaManager.startup() - replicaManager.checkpointHighwaterMarks() - var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0) + replicaManager.checkpointHighWatermarks() + var topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0) assertEquals(0L, topic1Partition0Hw) - val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, configs.map(_.brokerId).toSet) + val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0) // create leader log val topic1Log0 = getMockLog - // create leader and follower replicas - val leaderReplicaTopic1Partition0 = replicaManager.addLocalReplica(topic1, 0, topic1Log0, configs.map(_.brokerId).toSet) - replicaManager.checkpointHighwaterMarks() - topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0) - assertEquals(leaderReplicaTopic1Partition0.highWatermark(), topic1Partition0Hw) - // set the leader - topic1Partition0.leaderId(Some(leaderReplicaTopic1Partition0.brokerId)) + // create a local replica for topic1 + val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0)) + topic1Partition0.addReplicaIfNotExist(leaderReplicaTopic1Partition0) + 
replicaManager.checkpointHighWatermarks() + topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0) + assertEquals(leaderReplicaTopic1Partition0.highWatermark, topic1Partition0Hw) // set the highwatermark for local replica - topic1Partition0.leaderHW(Some(5L)) - replicaManager.checkpointHighwaterMarks() - topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0) - assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark()) + topic1Partition0.getReplica().get.highWatermark = 5L + replicaManager.checkpointHighWatermarks() + topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0) + assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark) assertEquals(5L, topic1Partition0Hw) // add another partition and set highwatermark - val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, configs.map(_.brokerId).toSet) + val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0) // create leader log val topic2Log0 = getMockLog - // create leader and follower replicas - val leaderReplicaTopic2Partition0 = replicaManager.addLocalReplica(topic2, 0, topic2Log0, configs.map(_.brokerId).toSet) - replicaManager.checkpointHighwaterMarks() - var topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0) - assertEquals(leaderReplicaTopic2Partition0.highWatermark(), topic2Partition0Hw) - // set the leader - topic2Partition0.leaderId(Some(leaderReplicaTopic2Partition0.brokerId)) + // create a local replica for topic2 + val leaderReplicaTopic2Partition0 = new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0)) + topic2Partition0.addReplicaIfNotExist(leaderReplicaTopic2Partition0) + replicaManager.checkpointHighWatermarks() + var topic2Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic2, 0) + assertEquals(leaderReplicaTopic2Partition0.highWatermark, topic2Partition0Hw) // set the highwatermark for local replica - topic2Partition0.leaderHW(Some(15L)) - assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark()) + topic2Partition0.getReplica().get.highWatermark = 15L + assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark) // change the highwatermark for topic1 - topic1Partition0.leaderHW(Some(10L)) - assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark()) - replicaManager.checkpointHighwaterMarks() + topic1Partition0.getReplica().get.highWatermark = 10L + assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark) + replicaManager.checkpointHighWatermarks() // verify checkpointed hw for topic 2 - topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0) + topic2Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic2, 0) assertEquals(15L, topic2Partition0Hw) // verify checkpointed hw for topic 1 - topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0) + topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0) assertEquals(10L, topic1Partition0Hw) EasyMock.verify(zkClient) EasyMock.verify(topic1Log0) Index: core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala =================================================================== --- core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala (revision 1374135) +++ core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala (working copy) @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.consumer - -import junit.framework.Assert._ -import org.junit.Test -import org.scalatest.junit.JUnitSuite -import kafka.cluster.Partition - - -class TopicCountTest extends JUnitSuite { -/* - @Test - def testBasic() { - val consumer = "conusmer1" - val json = """{ "topic1" : 2, "topic2" : 3 }""" - val topicCount = TopicCount.constructTopicCount(consumer, json) - val topicCountMap = Map( - "topic1" -> 2, - "topic2" -> 3 - ) - val expectedTopicCount = new TopicCount(consumer, topicCountMap) - assertTrue(expectedTopicCount == topicCount) - - val topicCount2 = TopicCount.constructTopicCount(consumer, expectedTopicCount.toJsonString) - assertTrue(expectedTopicCount == topicCount2) - } -*/ - @Test - def testPartition() { - assertTrue(new Partition("foo", 10).equals(new Partition("foo", 10))) - assertTrue(!new Partition("foo", 1).equals(new Partition("foo", 0))) - } -} Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala =================================================================== --- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (revision 1374135) +++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (working copy) @@ -258,33 +258,13 @@ } def testCompressionSetConsumption() { - val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler]) - requestHandlerLogger.setLevel(Level.FATAL) - // send some messages to each broker val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec) val sentMessages = (sentMessages1 ++ sentMessages2).sortWith((s,t) => s.checksum < t.checksum) - // test consumer timeout logic - val consumerConfig0 = new ConsumerConfig( - TestUtils.createConsumerProperties(zkConnect, group, consumer0)) { - override val consumerTimeoutMs = 5000 - } - val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true) - val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> 1)) - getMessages(100, topicMessageStreams0) - - // also check partition ownership - val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir) - val expected_1 = List( ("0", "group1_consumer0-0"), - ("1", "group1_consumer0-0")) - assertEquals(expected_1, actual_1) - - zkConsumerConnector0.shutdown - // at this point, only some part of the message set was consumed. 
So consumed offset should still be 0 - // also fetched offset should be 0 - val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig0, true) + val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0)) + val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1)) val receivedMessages = getMessages(400, topicMessageStreams1) val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum) @@ -298,8 +278,6 @@ assertEquals(expected_2, actual_2) zkConsumerConnector1.shutdown - - requestHandlerLogger.setLevel(Level.ERROR) } def testConsumerDecoder() { Index: core/src/main/scala/kafka/cluster/Replica.scala =================================================================== --- core/src/main/scala/kafka/cluster/Replica.scala (revision 1374135) +++ core/src/main/scala/kafka/cluster/Replica.scala (working copy) @@ -20,36 +20,41 @@ import kafka.log.Log import kafka.utils.{SystemTime, Time, Logging} import kafka.common.KafkaException +import kafka.server.ReplicaManager +import java.util.concurrent.atomic.AtomicLong class Replica(val brokerId: Int, val partition: Partition, - val topic: String, time: Time = SystemTime, - var hw: Option[Long] = None, - var log: Option[Log] = None) extends Logging { - private var logEndOffset: Long = -1L - private var logEndOffsetUpdateTimeMs: Long = -1L + initialHighWatermarkValue: Long = 0L, + val log: Option[Log] = None) extends Logging { + // only defined in local replica + private[this] var highWatermarkValue: AtomicLong = new AtomicLong(initialHighWatermarkValue) + // only used for remote replica; logEndOffsetValue for local replica is kept in log + private[this] var logEndOffsetValue = new AtomicLong(ReplicaManager.UnknownLogEndOffset) + private var logEndOffsetUpdateTimeMs: Long = time.milliseconds + val topic = partition.topic + val partitionId = partition.partitionId - def logEndOffset(newLeo: Option[Long] = None): Long = { - isLocal match { - case true => - newLeo match { - case Some(newOffset) => logEndOffsetUpdateTimeMs = time.milliseconds; newOffset - case None => log.get.logEndOffset - } - case false => - newLeo match { - case Some(newOffset) => - logEndOffset = newOffset - logEndOffsetUpdateTimeMs = time.milliseconds - trace("Setting log end offset for replica %d for topic %s partition %d to %d" - .format(brokerId, topic, partition.partitionId, logEndOffset)) - logEndOffset - case None => logEndOffset - } - } + def logEndOffset_=(newLogEndOffset: Long) = { + if (!isLocal) { + logEndOffsetValue.set(newLogEndOffset) + logEndOffsetUpdateTimeMs = time.milliseconds + trace("Setting log end offset for replica %d for topic %s partition %d to %d" + .format(brokerId, topic, partitionId, logEndOffsetValue.get())) + } else + throw new KafkaException("shouldn't set logEndOffset for replica %d topic %s partition %d since it's local" + .format(brokerId, topic, partitionId)) + } + def logEndOffset = { + if (isLocal) + log.get.logEndOffset + else + logEndOffsetValue.get() + } + def isLocal: Boolean = { log match { case Some(l) => true @@ -59,34 +64,24 @@ def logEndOffsetUpdateTime = logEndOffsetUpdateTimeMs - def highWatermark(highwaterMarkOpt: Option[Long] = None): Long = { - highwaterMarkOpt match { - case Some(highwaterMark) => - isLocal match { - case true => - trace("Setting hw for topic %s partition %d on broker %d to %d".format(topic, partition.partitionId, - brokerId,
highwaterMark)) - hw = Some(highwaterMark) - highwaterMark - case false => throw new KafkaException("Unable to set highwatermark for topic %s ".format(topic) + - "partition %d on broker %d, since there is no local log for this partition" - .format(partition.partitionId, brokerId)) - } - case None => - isLocal match { - case true => - hw match { - case Some(highWatermarkValue) => highWatermarkValue - case None => throw new KafkaException("HighWatermark does not exist for topic %s ".format(topic) + - " partition %d on broker %d but local log exists".format(partition.partitionId, brokerId)) - } - case false => throw new KafkaException("Unable to get highwatermark for topic %s ".format(topic) + - "partition %d on broker %d, since there is no local log for this partition" - .format(partition.partitionId, brokerId)) - } - } + def highWatermark_=(newHighWatermark: Long) { + if (isLocal) { + trace("Setting hw for replica %d topic %s partition %d to %d" + .format(brokerId, topic, partitionId, newHighWatermark)) + highWatermarkValue.set(newHighWatermark) + } else + throw new KafkaException("Unable to set highwatermark for replica %d topic %s partition %d since it's not local" + .format(brokerId, topic, partitionId)) } + def highWatermark = { + if (isLocal) + highWatermarkValue.get() + else + throw new KafkaException("Unable to get highwatermark for replica %d topic %s partition %d since it's not local" + .format(brokerId, topic, partitionId)) + } + override def equals(that: Any): Boolean = { if(!(that.isInstanceOf[Replica])) return false @@ -107,7 +102,7 @@ replicaString.append("; Topic: " + topic) replicaString.append("; Partition: " + partition.toString) replicaString.append("; isLocal: " + isLocal) - if(isLocal) replicaString.append("; Highwatermark: " + highWatermark()) + if(isLocal) replicaString.append("; Highwatermark: " + highWatermark) replicaString.toString() } } Index: core/src/main/scala/kafka/cluster/Partition.scala =================================================================== --- core/src/main/scala/kafka/cluster/Partition.scala (revision 1374135) +++ core/src/main/scala/kafka/cluster/Partition.scala (working copy) @@ -16,89 +16,210 @@ */ package kafka.cluster -import java.util.concurrent.locks.ReentrantLock import scala.collection._ -import kafka.utils.{ZkUtils, SystemTime, Time} -import kafka.common.{KafkaException, LeaderNotAvailableException} -import org.I0Itec.zkclient.ZkClient -import kafka.utils.Logging +import kafka.utils._ +import java.lang.Object +import kafka.api.LeaderAndISR +import kafka.server.ReplicaManager +import kafka.common.ErrorMapping /** * Data structure that represents a topic partition.
The leader maintains the AR, ISR, CUR, RAR */ class Partition(val topic: String, val partitionId: Int, - time: Time = SystemTime, - var inSyncReplicas: Set[Replica] = Set.empty[Replica]) extends Logging { - private var leaderReplicaId: Option[Int] = None - private var assignedReplicas: Set[Replica] = Set.empty[Replica] - private var highWatermarkUpdateTime: Long = -1L - private val leaderISRUpdateLock = new ReentrantLock() + time: Time, + val replicaManager: ReplicaManager) extends Logging { + private val localBrokerId = replicaManager.config.brokerId + private val logManager = replicaManager.logManager + private val replicaFetcherManager = replicaManager.replicaFetcherManager + private val highwaterMarkCheckpoint = replicaManager.highWatermarkCheckpoint + private val zkClient = replicaManager.zkClient + var leaderReplicaId: Option[Int] = None + var inSyncReplicas: Set[Replica] = Set.empty[Replica] + private val assignedReplicaMap = new Pool[Int,Replica] + private val leaderISRUpdateLock = new Object - def leaderId(newLeader: Option[Int] = None): Option[Int] = { - try { - leaderISRUpdateLock.lock() - if(newLeader.isDefined) { - info("Updating leader for topic %s partition %d to replica %d".format(topic, partitionId, newLeader.get)) - leaderReplicaId = newLeader + private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId) + + def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = { + var retReplica : Replica = null + val replicaOpt = getReplica(replicaId) + if (!replicaOpt.isDefined) { + if (isReplicaLocal(replicaId)) { + val log = logManager.getOrCreateLog(topic, partitionId) + val localReplica = new Replica(replicaId, this, time, + highwaterMarkCheckpoint.read(topic, partitionId), Some(log)) + addReplicaIfNotExist(localReplica) } - leaderReplicaId - }finally { - leaderISRUpdateLock.unlock() - } + else { + val remoteReplica = new Replica(replicaId, this, time) + addReplicaIfNotExist(remoteReplica) + } + retReplica = getReplica(replicaId).get + } else + retReplica = replicaOpt.get + + retReplica } - def assignedReplicas(replicas: Option[Set[Replica]] = None): Set[Replica] = { - replicas match { - case Some(ar) => - assignedReplicas = ar - case None => + def getReplica(replicaId: Int = localBrokerId): Option[Replica] = { + val replica = assignedReplicaMap.get(replicaId) + if (replica == null) + None + else + Some(replica) + } + + def leaderReplicaIfLocal(): Option[Replica] = { + leaderISRUpdateLock synchronized { + if (leaderReplicaId.isDefined && leaderReplicaId.get == localBrokerId) + getReplica(localBrokerId) + else + None } - assignedReplicas } - def getReplica(replicaId: Int): Option[Replica] = { - assignedReplicas().find(_.brokerId == replicaId) + def addReplicaIfNotExist(replica: Replica) = { + assignedReplicaMap.putIfNotExists(replica.brokerId, replica) } - def addReplica(replica: Replica): Boolean = { - if(!assignedReplicas.contains(replica)) { - assignedReplicas += replica - true - }else false + def assignedReplicas(): Set[Replica] = { + assignedReplicaMap.values.toSet } - def updateReplicaLeo(replica: Replica, leo: Long) { - replica.logEndOffset(Some(leo)) - debug("Updating the leo to %d for replica %d".format(leo, replica.brokerId)) + /** + * If the local replica is not the leader, make the local replica the leader in the following steps. + * 1. stop the existing replica fetcher + * 2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available) + * 3. 
reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time) + * 4. set the new leader and ISR + */ + def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR): Boolean = { + leaderISRUpdateLock synchronized { + if (!leaderReplicaId.isDefined || leaderReplicaId.get != replicaManager.config.brokerId) { + info("becoming Leader for topic [%s] partition [%d]".format(topic, partitionId)) + // stop replica fetcher thread, if any + replicaFetcherManager.removeFetcher(topic, partitionId) + + val newInSyncReplicas = leaderAndISR.ISR.map(r => getOrCreateReplica(r)).toSet + // reset LogEndOffset for remote replicas + assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset) + inSyncReplicas = newInSyncReplicas + leaderReplicaId = Some(localBrokerId) + true + } else + false + } } - def leaderReplica(): Replica = { - val leaderReplicaId = leaderId() - if(leaderReplicaId.isDefined) { - val leaderReplica = assignedReplicas().find(_.brokerId == leaderReplicaId.get) - if(leaderReplica.isDefined) leaderReplica.get - else throw new KafkaException("No replica for leader %d in the replica manager" - .format(leaderReplicaId.get)) - }else - throw new LeaderNotAvailableException("Leader for topic %s partition %d does not exist" - .format(topic, partitionId)) + /** + * If the local replica is not already following the new leader, make it follow the new leader. + * 1. stop any existing fetcher on this partition from the local replica + * 2. make sure local replica exists and truncate the log to high watermark + * 3. set the leader and set ISR to empty + * 4. start a fetcher to the new leader + */ + def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR): Boolean = { + leaderISRUpdateLock synchronized { + val newLeaderBrokerId: Int = leaderAndISR.leader + info("starting the follower state transition to follow leader %d for topic %s partition %d" + .format(newLeaderBrokerId, topic, partitionId)) + val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(newLeaderBrokerId)).head + val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(topic, partitionId) + // become follower only if it is not already following the same leader + if( currentLeaderBroker == None || currentLeaderBroker.get != newLeaderBrokerId) { + info("becoming follower to leader %d for topic %s partition %d".format(newLeaderBrokerId, topic, partitionId)) + // stop fetcher thread to previous leader + replicaFetcherManager.removeFetcher(topic, partitionId) + + // make sure local replica exists + val localReplica = getOrCreateReplica() + localReplica.log.get.truncateTo(localReplica.highWatermark) + inSyncReplicas = Set.empty[Replica] + leaderReplicaId = Some(newLeaderBrokerId) + + // start fetcher thread to current leader + replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker) + true + } else + false + } } - def leaderHW(newHw: Option[Long] = None): Long = { - newHw match { - case Some(highWatermark) => - leaderReplica().highWatermark(newHw) - highWatermarkUpdateTime = time.milliseconds - highWatermark - case None => - leaderReplica().highWatermark() + def updateLeaderHWAndMaybeExpandISR(replicaId: Int) { + leaderISRUpdateLock synchronized { + val leaderReplicaOpt = leaderReplicaIfLocal() + if (leaderReplicaOpt.isDefined) { + val replica = getReplica(replicaId).get + val leaderReplica = leaderReplicaOpt.get + val leaderHW = 
leaderReplica.highWatermark + if (replica.logEndOffset >= leaderHW) { + // expand ISR + val newInSyncReplicas = inSyncReplicas + replica + info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(","))) + // update ISR in ZK and cache + updateISR(newInSyncReplicas) + } + maybeIncrementLeaderHW(leaderReplica) + } } } - def hwUpdateTime: Long = highWatermarkUpdateTime + def checkEnoughReplicasReachAnOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = { + leaderISRUpdateLock synchronized { + val leaderReplicaOpt = leaderReplicaIfLocal() + if (leaderReplicaOpt.isDefined) { + val numAcks = inSyncReplicas.count(r => { + if (!r.isLocal) + r.logEndOffset >= requiredOffset + else + true /* also count the local (leader) replica */ + }) + trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId)) + if ((requiredAcks < 0 && numAcks >= inSyncReplicas.size) || + (requiredAcks > 0 && numAcks >= requiredAcks)) { + /* + * requiredAcks < 0 means acknowledge after all replicas in ISR + * are fully caught up to the (local) leader's offset + * corresponding to this produce request. + */ + (true, ErrorMapping.NoError) + } else + (false, ErrorMapping.NoError) + } else + (false, ErrorMapping.NotLeaderForPartitionCode) + } + } + + private def maybeIncrementLeaderHW(leaderReplica: Replica) { + val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset) + val newHighWatermark = allLogEndOffsets.min + val oldHighWatermark = leaderReplica.highWatermark + if(newHighWatermark > oldHighWatermark) + leaderReplica.highWatermark = newHighWatermark + else + debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s" + .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(","))) + } - def getOutOfSyncReplicas(keepInSyncTimeMs: Long, keepInSyncBytes: Long): Set[Replica] = { + def maybeShrinkISR(replicaMaxLagTimeMs: Long, replicaMaxLagBytes: Long) { + leaderISRUpdateLock synchronized { + val leaderReplicaOpt = leaderReplicaIfLocal() + if (leaderReplicaOpt.isDefined) { + val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplicaOpt.get, replicaMaxLagTimeMs, replicaMaxLagBytes) + if(outOfSyncReplicas.size > 0) { + val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas + assert(newInSyncReplicas.size > 0) + info("Shrinking ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(","))) + // update ISR in zk and in cache + updateISR(newInSyncReplicas) + } + } + } + } + + def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncBytes: Long): Set[Replica] = { /** * there are two cases that need to be handled here - * 1. 
Stuck followers: If the leo of the replica is less than the leo of leader and the leo hasn't been updated @@ -107,42 +228,27 @@ * follower is not catching up and should be removed from the ISR **/ // Case 1 above - val possiblyStuckReplicas = inSyncReplicas.filter(r => r.logEndOffset() < leaderReplica().logEndOffset()) - info("Possibly stuck replicas for topic %s partition %d are %s".format(topic, partitionId, + val possiblyStuckReplicas = inSyncReplicas.filter(r => r.logEndOffset < leaderReplica.logEndOffset) + debug("Possibly stuck replicas for topic %s partition %d are %s".format(topic, partitionId, possiblyStuckReplicas.map(_.brokerId).mkString(","))) val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTime < (time.milliseconds - keepInSyncTimeMs)) - info("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(","))) - val leader = leaderReplica() + debug("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(","))) // Case 2 above - val slowReplicas = inSyncReplicas.filter(r => (leader.logEndOffset() - r.logEndOffset()) > keepInSyncBytes) - info("Slow replicas for topic %s partition %d are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(","))) + val slowReplicas = inSyncReplicas.filter(r => r.logEndOffset >= 0 && (leaderReplica.logEndOffset - r.logEndOffset) > keepInSyncBytes) + debug("Slow replicas for topic %s partition %d are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(","))) stuckReplicas ++ slowReplicas } - def updateISR(newISR: Set[Int], zkClientOpt: Option[ZkClient] = None) { - try{ - leaderISRUpdateLock.lock() - // update partition's ISR in cache - inSyncReplicas = newISR.map {r => - getReplica(r) match { - case Some(replica) => replica - case None => throw new KafkaException("ISR update failed. 
No replica for id %d".format(r)) - } - } - info("Updated ISR for topic %s partition %d to %s in cache".format(topic, partitionId, newISR.mkString(","))) - if(zkClientOpt.isDefined){ - val zkClient = zkClientOpt.get - val curLeaderAndISR = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partitionId) - curLeaderAndISR match { - case None => - throw new IllegalStateException("The leaderAndISR info for partition [%s, %s] is not in Zookeeper".format(topic, partitionId)) - case Some(m) => - m.ISR = newISR.toList - ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), m.toString) - } - } - } finally { - leaderISRUpdateLock.unlock() + private def updateISR(newISR: Set[Replica]) { + info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(","))) + inSyncReplicas = newISR + val curLeaderAndISR = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partitionId) + curLeaderAndISR match { + case None => + throw new IllegalStateException("The leaderAndISR info for partition [%s, %s] is not in Zookeeper".format(topic, partitionId)) + case Some(m) => + m.ISR = newISR.map(r => r.brokerId).toList + ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), m.toString) } } @@ -163,8 +269,8 @@ val partitionString = new StringBuilder partitionString.append("Topic: " + topic) partitionString.append("; Partition: " + partitionId) - partitionString.append("; Leader: " + leaderId()) - partitionString.append("; Assigned replicas: " + assignedReplicas().map(_.brokerId).mkString(",")) + partitionString.append("; Leader: " + leaderReplicaId) + partitionString.append("; Assigned replicas: " + assignedReplicaMap.keys.mkString(",")) partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(",")) partitionString.toString() } Index: core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala =================================================================== --- core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (revision 1374135) +++ core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (working copy) @@ -21,7 +21,6 @@ import kafka.common.KafkaException import kafka.utils.{Logging, Utils} import kafka.common.ErrorMapping -import kafka.cluster.{Replica, Partition} class BrokerPartitionInfo(producerConfig: ProducerConfig, @@ -37,7 +36,7 @@ * @return a sequence of (brokerId, numPartitions). Returns a zero-length * sequence if no brokers are available. 
*/ - def getBrokerPartitionInfo(topic: String): Seq[Partition] = { + def getBrokerPartitionInfo(topic: String): Seq[PartitionAndLeader] = { debug("Getting broker partition info for topic %s".format(topic)) // check if the cache has metadata for this topic val topicMetadata = topicPartitionInfo.get(topic) @@ -55,16 +54,13 @@ } val partitionMetadata = metadata.partitionsMetadata partitionMetadata.map { m => - val partition = new Partition(topic, m.partitionId) m.leader match { case Some(leader) => - val leaderReplica = new Replica(leader.id, partition, topic) - partition.leaderId(Some(leaderReplica.brokerId)) debug("Topic %s partition %d has leader %d".format(topic, m.partitionId, leader.id)) - partition + new PartitionAndLeader(topic, m.partitionId, Some(leader.id)) case None => debug("Topic %s partition %d does not have a leader yet".format(topic, m.partitionId)) - partition + new PartitionAndLeader(topic, m.partitionId, None) } }.sortWith((s, t) => s.partitionId < t.partitionId) } @@ -113,3 +109,5 @@ } } } + +case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerId: Option[Int]) Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala =================================================================== --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1374135) +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy) @@ -17,7 +17,6 @@ package kafka.producer.async -import kafka.cluster.Partition import kafka.common._ import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet} import kafka.producer._ @@ -105,7 +104,7 @@ val brokerPartition = topicPartitionsList(partitionIndex) // postpone the failure until the send operation, so that requests for other brokers are handled correctly - val leaderBrokerId = brokerPartition.leaderId().getOrElse(-1) + val leaderBrokerId = brokerPartition.leaderBrokerId.getOrElse(-1) var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null ret.get(leaderBrokerId) match { @@ -135,7 +134,7 @@ } } - private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[Partition] = { + private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[PartitionAndLeader] = { debug("Getting the number of broker partitions registered for topic: " + pd.getTopic) val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic) debug("Broker partitions registered for topic: %s are %s" Index: core/src/main/scala/kafka/server/KafkaZooKeeper.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaZooKeeper.scala (revision 1374135) +++ core/src/main/scala/kafka/server/KafkaZooKeeper.scala (working copy) @@ -17,7 +17,6 @@ package kafka.server -import java.net.InetAddress import kafka.utils._ import org.apache.zookeeper.Watcher.Event.KeeperState import org.I0Itec.zkclient.{IZkStateListener, ZkClient} @@ -25,8 +24,7 @@ /** - * Handles the server's interaction with zookeeper. 
The server needs to register the following paths: - * /topics/[topic]/[node_id-partition_num] + * Handles registering broker with zookeeper in the following path: * /brokers/[0...N] --> host:port */ class KafkaZooKeeper(config: KafkaConfig) extends Logging { @@ -82,28 +80,6 @@ } } - private def doesTopicExistInCluster(topic: String) : Boolean = { - val allTopics = ZkUtils.getAllTopics(zkClient) - trace("all topics, %s, topic %s".format(allTopics, topic)) - allTopics.contains(topic) - } - - def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) { - if(!doesTopicExistInCluster(topic)) - throw new UnknownTopicException("Topic %s doesn't exist in the cluster".format(topic)) - // check if partition id is invalid - if(partition < 0) - throw new InvalidPartitionException("Partition %d is invalid".format(partition)) - ZkUtils.getLeaderForPartition(zkClient, topic, partition) match { - case Some(leader) => - if(leader != config.brokerId) - throw new LeaderNotAvailableException("Broker %d is not leader for partition %d for topic %s" - .format(config.brokerId, partition, topic)) - case None => - throw new LeaderNotAvailableException("There is no leader for topic %s partition %d".format(topic, partition)) - } - } - def getZookeeperClient = { zkClient } Index: core/src/main/scala/kafka/server/KafkaServer.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaServer.scala (revision 1374135) +++ core/src/main/scala/kafka/server/KafkaServer.scala (working copy) @@ -23,12 +23,8 @@ import kafka.utils._ import java.util.concurrent._ import atomic.AtomicBoolean -import kafka.cluster.Replica -import kafka.api.LeaderAndISR -import scala.collection._ import org.I0Itec.zkclient.ZkClient - /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required * to start up and shutdown a single Kafka node. 
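[Editor's note, not part of the patch: the ensurePartitionLeaderOnThisBroker check removed above verified leadership with per-request ZooKeeper reads; after this patch the same check is served from the broker's in-memory state via ReplicaManager.leaderReplicaIfLocalOrException (added later in this patch), which throws InvalidPartitionException for an unknown partition and LeaderNotAvailableException when this broker is not the leader. A minimal sketch of the new call shape, mirroring the produce path in KafkaApis; the helper name appendToLocalLog is invented purely for illustration:

    import kafka.message.ByteBufferMessageSet
    import kafka.server.ReplicaManager

    // Sketch only: append to the local log iff this broker currently leads (topic, partition).
    // Exceptions propagate to the caller, which maps them to error codes via ErrorMapping.
    def appendToLocalLog(replicaManager: ReplicaManager, topic: String, partition: Int,
                         messages: ByteBufferMessageSet): Long = {
      val localReplica = replicaManager.leaderReplicaIfLocalOrException(topic, partition)
      val log = localReplica.log.get   // a leader replica hosted locally always carries a Log
      log.append(messages)
      log.logEndOffset                 // next offset, returned to the producer
    }
]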
@@ -94,11 +90,10 @@ info("Connecting to ZK: " + config.zkConnect) - replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, deleteLog) + replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager) kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient) - apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper, - addReplica, stopReplica, makeLeader, makeFollower, config.brokerId) + apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) Mx4jLoader.maybeLoad @@ -147,31 +142,6 @@ */ def awaitShutdown(): Unit = shutdownLatch.await() - def addReplica(topic: String, partition: Int, assignedReplicas: Set[Int]): Replica = { - val log = logManager.getOrCreateLog(topic, partition) - replicaManager.addLocalReplica(topic, partition, log, assignedReplicas) - } - - def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = { - replicaManager.makeLeader(replica, leaderAndISR) - } - - def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = { - replicaManager.makeFollower(replica, leaderAndISR) - } - - def getReplica(topic: String, partition: Int): Option[Replica] = - replicaManager.getReplica(topic, partition) - - def stopReplica(topic: String, partition: Int): Short = { - replicaManager.stopReplica(topic, partition) - } - - def deleteLog(topic: String, partition: Int): Unit = { - /* TODO: handle deleteLog in a better way */ - //logManager.deleteLog(topic, partition) - } - def getLogManager(): LogManager = logManager def getStats(): SocketServerStats = socketServer.stats Index: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala =================================================================== --- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (revision 1374135) +++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (working copy) @@ -33,15 +33,15 @@ val replica = replicaMgr.getReplica(topic, partitionId).get val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet] - if (fetchOffset != replica.logEndOffset()) - throw new RuntimeException("offset mismatch: fetchOffset=%d, logEndOffset=%d".format(fetchOffset, replica.logEndOffset())) + if (fetchOffset != replica.logEndOffset) + throw new RuntimeException("offset mismatch: fetchOffset=%d, logEndOffset=%d".format(fetchOffset, replica.logEndOffset)) trace("Follower %d has replica log end offset %d. 
Received %d messages and leader hw %d".format(replica.brokerId, - replica.logEndOffset(), messageSet.sizeInBytes, partitionData.hw)) + replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw)) replica.log.get.append(messageSet) trace("Follower %d has replica log end offset %d after appending %d messages" - .format(replica.brokerId, replica.logEndOffset(), messageSet.sizeInBytes)) - val followerHighWatermark = replica.logEndOffset().min(partitionData.hw) - replica.highWatermark(Some(followerHighWatermark)) + .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes)) + val followerHighWatermark = replica.logEndOffset.min(partitionData.hw) + replica.highWatermark = followerHighWatermark trace("Follower %d set replica highwatermark for topic %s partition %d to %d" .format(replica.brokerId, topic, partitionId, followerHighWatermark)) } Index: core/src/main/scala/kafka/server/ReplicaManager.scala =================================================================== --- core/src/main/scala/kafka/server/ReplicaManager.scala (revision 1374135) +++ core/src/main/scala/kafka/server/ReplicaManager.scala (working copy) @@ -16,74 +16,49 @@ */ package kafka.server -import kafka.log.Log import kafka.cluster.{Partition, Replica} import collection._ -import mutable.ListBuffer import org.I0Itec.zkclient.ZkClient -import java.util.concurrent.locks.ReentrantLock -import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging} -import kafka.api.LeaderAndISR import java.util.concurrent.atomic.AtomicBoolean -import kafka.common.{BrokerNotExistException, KafkaException, ErrorMapping, InvalidPartitionException} +import kafka.utils._ +import kafka.log.LogManager +import kafka.api.{LeaderAndISRRequest, LeaderAndISR} +import kafka.common.{InvalidPartitionException, LeaderNotAvailableException, ErrorMapping} +object ReplicaManager { + val UnknownLogEndOffset = -1L +} -class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler, deleteLocalLog: (String, Int) => Unit) extends Logging { +class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler, + val logManager: LogManager) extends Logging { + private val allPartitions = new Pool[(String, Int), Partition] + private var leaderReplicas = new mutable.HashSet[Partition]() + private val leaderReplicasLock = new Object + val replicaFetcherManager = new ReplicaFetcherManager(config, this) + this.logIdent = "Replica Manager on Broker " + config.brokerId + ": " - var allPartitions = new mutable.HashMap[(String, Int), Partition]() - private var leaderReplicas = new ListBuffer[Partition]() - private val leaderReplicaLock = new ReentrantLock() - private val replicaFetcherManager = new ReplicaFetcherManager(config, this) - this.logIdent = "Replica Manager on Broker " + config.brokerId + ", " + private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) + val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir) + info("Created highwatermark file %s".format(highWatermarkCheckpoint.name)) - val hwCheckPointThreadStarted = new AtomicBoolean(false) - private val highwaterMarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir) - info("Created highwatermark file %s".format(highwaterMarkCheckpoint.name)) - def startHighWaterMarksCheckPointThread() = { - if(hwCheckPointThreadStarted.compareAndSet(false, true)) - kafkaScheduler.scheduleWithRate(checkpointHighwaterMarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs) 
+ if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) + kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs) } def startup() { - // start the highwatermark checkpoint thread // start ISR expiration thread kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs) } - def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = { - val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds) - var retReplica : Replica = null - val replicaOpt = partition.getReplica(config.brokerId) - replicaOpt match { - case Some(replica) => - info("changing remote replica %s into a local replica".format(replica.toString)) - replica.log match { - case None => - replica.log = Some(log) - case Some(log) => // nothing to do since log already exists - } - retReplica = replica - case None => - val localReplica = new Replica(config.brokerId, partition, topic, time, - Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log)) - partition.addReplica(localReplica) - info("adding local replica %d for topic %s partition %s on broker %d".format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId)) - retReplica = localReplica - } - val assignedReplicas = assignedReplicaIds.map(partition.getReplica(_).get) - partition.assignedReplicas(Some(assignedReplicas)) - // get the replica objects for the assigned replicas for this partition - retReplica - } - def stopReplica(topic: String, partition: Int): Short = { trace("handling stop replica for partition [%s, %d]".format(topic, partition)) val errorCode = ErrorMapping.NoError val replica = getReplica(topic, partition) if(replica.isDefined){ replicaFetcherManager.removeFetcher(topic, partition) - deleteLocalLog(topic, partition) + /* TODO: handle deleteLog in a better way */ + //logManager.deleteLog(topic, partition) allPartitions.remove((topic, partition)) info("after removing partition (%s, %d), the rest of allReplicas is: [%s]".format(topic, partition, allPartitions)) } @@ -91,238 +66,156 @@ errorCode } - - def getOrCreatePartition(topic: String, partitionId: Int, assignedReplicaIds: Set[Int]): Partition = { - val newPartition = allPartitions.contains((topic, partitionId)) - newPartition match { - case true => // partition exists, do nothing - allPartitions.get((topic, partitionId)).get - case false => // create remote replicas for each replica id in assignedReplicas - val partition = new Partition(topic, partitionId, time) - allPartitions += (topic, partitionId) -> partition - (assignedReplicaIds - config.brokerId).foreach( - replicaId => addRemoteReplica(topic, partitionId, replicaId, partition)) - partition + def getOrCreatePartition(topic: String, partitionId: Int): Partition = { + var partition = allPartitions.get((topic, partitionId)) + if (partition == null) { + allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this)) + partition = allPartitions.get((topic, partitionId)) } + partition } - def ensurePartitionExists(topic: String, partitionId: Int): Partition = { - val partitionOpt = allPartitions.get((topic, partitionId)) + def getPartition(topic: String, partitionId: Int): Option[Partition] = { + val partition = allPartitions.get((topic, partitionId)) + if (partition == null) + None + else + Some(partition) + } + + def leaderReplicaIfLocalOrException(topic: String, partitionId: 
Int): Replica = { + val partitionOpt = getPartition(topic, partitionId) partitionOpt match { - case Some(partition) => partition case None => - throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d".format(topic, partitionId, config.brokerId)) + throw new InvalidPartitionException("topic %s partition %d doesn't exist on %d".format(topic, partitionId, config.brokerId)) + case Some(partition) => + val leaderReplica = partition.leaderReplicaIfLocal + if (!leaderReplica.isDefined) + throw new LeaderNotAvailableException("Leader not local for topic %s partition %d on broker %d" + .format(topic, partitionId, config.brokerId)) + else + leaderReplica.get } } - def addRemoteReplica(topic: String, partitionId: Int, replicaId: Int, partition: Partition): Replica = { - val remoteReplica = new Replica(replicaId, partition, topic, time) - - val replicaAdded = partition.addReplica(remoteReplica) - if(replicaAdded) - info("added remote replica %d for topic %s partition %s".format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId)) - remoteReplica + def getOrCreateReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Replica = { + getOrCreatePartition(topic, partitionId).getOrCreateReplica(replicaId) } - def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = { - val partitionOpt = allPartitions.get((topic, partitionId)) + def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = { + val partitionOpt = getPartition(topic, partitionId) partitionOpt match { - case Some(partition) => - partition.getReplica(replicaId) - case None => - None + case None => None + case Some(partition) => partition.getReplica(replicaId) } } - def getLeaderReplica(topic: String, partitionId: Int): Option[Replica] = { - val replicasOpt = allPartitions.get((topic, partitionId)) - replicasOpt match { - case Some(replicas) => - Some(replicas.leaderReplica()) - case None => - throw new KafkaException("Getting leader replica failed. Partition replica metadata for topic " + - "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId)) + def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndISRRequest): collection.Map[(String, Int), Short] = { + info("handling leader and isr request %s".format(leaderAndISRRequest)) + val responseMap = new collection.mutable.HashMap[(String, Int), Short] + + for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){ + var errorCode = ErrorMapping.NoError + val topic = partitionInfo._1 + val partitionId = partitionInfo._2 + + val requestedLeaderId = leaderAndISR.leader + try { + if(requestedLeaderId == config.brokerId) + makeLeader(topic, partitionId, leaderAndISR) + else + makeFollower(topic, partitionId, leaderAndISR) + } catch { + case e => + error("error processing leaderAndISR request %s".format(leaderAndISRRequest), e) + errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) + } + responseMap.put(partitionInfo, errorCode) } - } - def getPartition(topic: String, partitionId: Int): Option[Partition] = - allPartitions.get((topic, partitionId)) + /** + * If IsInit flag is on, this means that the controller wants to treat topics not in the request + * as deleted. 
+ */ + if(leaderAndISRRequest.isInit == LeaderAndISRRequest.IsInit){ + startHighWaterMarksCheckPointThread + val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1) + info("init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove)) + partitionsToRemove.foreach(p => stopReplica(p._1, p._2)) + } - private def updateReplicaLeo(replica: Replica, fetchOffset: Long) { - // set the replica leo - val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId) - partition.updateReplicaLeo(replica, fetchOffset) + responseMap } - private def maybeIncrementLeaderHW(replica: Replica) { - // set the replica leo - val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId) - // set the leader HW to min of the leo of all replicas - val allLeos = partition.inSyncReplicas.map(_.logEndOffset()) - val newHw = allLeos.min - val oldHw = partition.leaderHW() - if(newHw > oldHw) { - partition.leaderHW(Some(newHw)) - }else - debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s".format(replica.topic, - replica.partition.partitionId, oldHw, newHw, allLeos.mkString(","))) - } - - def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = { - info("becoming Leader for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId)) - info("started the leader state transition for topic %s partition %d" - .format(replica.topic, replica.partition.partitionId)) - try { - // read and cache the ISR - replica.partition.leaderId(Some(replica.brokerId)) - replica.partition.updateISR(leaderAndISR.ISR.toSet) - // stop replica fetcher thread, if any - replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId) + private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = { + info("becoming Leader for topic [%s] partition [%d]".format(topic, partitionId)) + val partition = getOrCreatePartition(topic, partitionId) + if (partition.makeLeader(topic, partitionId, leaderAndISR)) { // also add this partition to the list of partitions for which the leader is the current broker - leaderReplicaLock.lock() - leaderReplicas += replica.partition - info("completed the leader state transition for topic %s partition %d".format(replica.topic, replica.partition.partitionId)) - ErrorMapping.NoError - }catch { - case e => error("failed to complete the leader state transition for topic %s partition %d".format(replica.topic, replica.partition.partitionId), e) - ErrorMapping.UnknownCode - /* TODO: add specific error code */ - }finally { - leaderReplicaLock.unlock() + leaderReplicasLock synchronized { + leaderReplicas += partition + } } + info("completed the leader state transition for topic %s partition %d".format(topic, partitionId)) } - - def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = { + private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) { val leaderBrokerId: Int = leaderAndISR.leader info("starting the follower state transition to follow leader %d for topic %s partition %d" - .format(leaderBrokerId, replica.topic, replica.partition.partitionId)) - try { - // set the leader for this partition correctly on this broker - replica.partition.leaderId(Some(leaderBrokerId)) - replica.log match { - case Some(log) => // log is already started - log.truncateTo(replica.highWatermark()) - case None => - } - debug("for partition [%s, %d], the 
leaderBroker is [%d]".format(replica.topic, replica.partition.partitionId, leaderAndISR.leader)) - // get leader for this replica - val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head - val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId) - // become follower only if it is not already following the same leader - if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) { - info("becoming follower to leader %d for topic %s partition %d".format(leaderBrokerId, replica.topic, replica.partition.partitionId)) - // stop fetcher thread to previous leader - replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId) - // start fetcher thread to current leader - replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker) - } + .format(leaderBrokerId, topic, partitionId)) + + val partition = getOrCreatePartition(topic, partitionId) + if (partition.makeFollower(topic, partitionId, leaderAndISR)) { // remove this replica's partition from the ISR expiration queue - leaderReplicaLock.lock() - leaderReplicas -= replica.partition - info("completed the follower state transition to follow leader %d for topic %s partition %d".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId)) - ErrorMapping.NoError - } catch { - case e: BrokerNotExistException => - error("failed to complete the follower state transition to follow leader %d for topic %s partition %d because the leader broker does not exist in the cluster".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId), e) - ErrorMapping.BrokerNotExistInZookeeperCode - case e => - error("failed to complete the follower state transition to follow leader %d for topic %s partition %d".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId), e) - ErrorMapping.UnknownCode - }finally { - leaderReplicaLock.unlock() + leaderReplicasLock synchronized { + leaderReplicas -= partition + } } } private def maybeShrinkISR(): Unit = { - try { - info("evaluating ISR list of partitions to see which replicas can be removed from the ISR") - leaderReplicaLock.lock() - leaderReplicas.foreach(partition => { - val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes) - if(outOfSyncReplicas.size > 0) { - val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas - assert(newInSyncReplicas.size > 0) - info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId, newInSyncReplicas.map(_.brokerId).mkString(","))) - // update ISR in zk and in memory - partition.updateISR(newInSyncReplicas.map(_.brokerId), Some(zkClient)) - } - }) - }catch { - case e1 => error("Error in ISR expiration thread. 
Shutting down due to ", e1) - }finally { - leaderReplicaLock.unlock() + trace("evaluating ISR list of partitions to see which replicas can be removed from the ISR") + leaderReplicasLock synchronized { + leaderReplicas.foreach(partition => partition.maybeShrinkISR(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes)) } } - private def checkIfISRCanBeExpanded(replica: Replica): Boolean = { - val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId) - if(partition.inSyncReplicas.contains(replica)) false - else if(partition.assignedReplicas().contains(replica)) { - val leaderHW = partition.leaderHW() - replica.logEndOffset() >= leaderHW - } - else throw new KafkaException("Replica %s is not in the assigned replicas list for ".format(replica.toString) + " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId)) - } + def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = { + val replica = getOrCreateReplica(topic, partitionId, replicaId) + replica.logEndOffset = offset + debug("Recording follower %d position %d for topic %s partition %d".format(replicaId, offset, topic, partitionId)) - def recordFollowerPosition(topic: String, partition: Int, replicaId: Int, offset: Long, zkClient: ZkClient) = { - val replicaOpt = getReplica(topic, partition, replicaId) - replicaOpt match { - case Some(replica) => - updateReplicaLeo(replica, offset) - // check if this replica needs to be added to the ISR - if(checkIfISRCanBeExpanded(replica)) { - val newISR = replica.partition.inSyncReplicas + replica - // update ISR in ZK and cache - replica.partition.updateISR(newISR.map(_.brokerId), Some(zkClient)) - } - debug("Recording follower %d position %d for topic %s partition %d".format(replicaId, offset, topic, partition)) - maybeIncrementLeaderHW(replica) - case None => - throw new KafkaException("No replica %d in replica manager on %d".format(replicaId, config.brokerId)) - } + // check if this replica needs to be added to the ISR + val partition = getOrCreatePartition(topic, partitionId) + partition.updateLeaderHWAndMaybeExpandISR(replicaId) } - def recordLeaderLogEndOffset(topic: String, partition: Int, logEndOffset: Long) = { - val replicaOpt = getReplica(topic, partition, config.brokerId) - replicaOpt match { - case Some(replica) => replica.logEndOffset(Some(logEndOffset)) - case None => - throw new KafkaException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId)) - } - } - /** * Flushes the highwatermark value for all partitions to the highwatermark file */ - def checkpointHighwaterMarks() { - val highwaterMarksForAllPartitions = allPartitions.map + def checkpointHighWatermarks() { + val highWaterarksForAllPartitions = allPartitions.map { partition => val topic = partition._1._1 val partitionId = partition._1._2 val localReplicaOpt = partition._2.getReplica(config.brokerId) val hw = localReplicaOpt match { - case Some(localReplica) => localReplica.highWatermark() + case Some(localReplica) => localReplica.highWatermark case None => - error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) + " Replica metadata doesn't exist") + error("highwatermark for topic %s partition %d doesn't exist during checkpointing" + .format(topic, partitionId)) 0L } (topic, partitionId) -> hw }.toMap - highwaterMarkCheckpoint.write(highwaterMarksForAllPartitions) - info("Checkpointed high watermark data: 
%s".format(highwaterMarksForAllPartitions)) + highWatermarkCheckpoint.write(highWaterarksForAllPartitions) + trace("Checkpointed high watermark data: %s".format(highWaterarksForAllPartitions)) } - /** - * Reads the checkpointed highWatermarks for all partitions - * @return checkpointed value of highwatermark for topic, partition. If one doesn't exist, returns 0 - */ - def readCheckpointedHighWatermark(topic: String, partition: Int): Long = highwaterMarkCheckpoint.read(topic, partition) - def shutdown() { info("shut down") replicaFetcherManager.shutdown() - checkpointHighwaterMarks() + checkpointHighWatermarks() info("shuttedd down completely") } } Index: core/src/main/scala/kafka/server/KafkaApis.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaApis.scala (revision 1374135) +++ core/src/main/scala/kafka/server/KafkaApis.scala (working copy) @@ -21,10 +21,9 @@ import kafka.admin.{CreateTopicCommand, AdminUtils} import kafka.api._ import kafka.common._ -import kafka.log._ import kafka.message._ import kafka.network._ -import kafka.utils.{ZkUtils, Pool, SystemTime, Logging} +import kafka.utils.{Pool, SystemTime, Logging} import org.apache.log4j.Logger import scala.collection._ import mutable.HashMap @@ -32,18 +31,14 @@ import kafka.network.RequestChannel.Response import java.util.concurrent.TimeUnit import kafka.metrics.KafkaMetricsGroup -import kafka.cluster.Replica +import org.I0Itec.zkclient.ZkClient - /** * Logic to handle the various Kafka requests */ -class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, - val replicaManager: ReplicaManager, val kafkaZookeeper: KafkaZooKeeper, - addReplicaCbk: (String, Int, Set[Int]) => Replica, - stopReplicaCbk: (String, Int) => Short, - becomeLeader: (Replica, LeaderAndISR) => Short, - becomeFollower: (Replica, LeaderAndISR) => Short, +class KafkaApis(val requestChannel: RequestChannel, + val replicaManager: ReplicaManager, + val zkClient: ZkClient, brokerId: Int) extends Logging { private val metricsGroup = brokerId.toString @@ -70,52 +65,13 @@ } } - def handleLeaderAndISRRequest(request: RequestChannel.Request){ - val responseMap = new HashMap[(String, Int), Short] val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer) if(requestLogger.isTraceEnabled) requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest) trace("Handling leader and isr request " + leaderAndISRRequest) - for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){ - var errorCode = ErrorMapping.NoError - val topic = partitionInfo._1 - val partition = partitionInfo._2 - - // If the partition does not exist locally, create it - if(replicaManager.getPartition(topic, partition) == None){ - trace("The partition (%s, %d) does not exist locally, check if current broker is in assigned replicas, if so, start the local replica".format(topic, partition)) - val assignedReplicas = ZkUtils.getReplicasForPartition(kafkaZookeeper.getZookeeperClient, topic, partition) - trace("Assigned replicas list for topic [%s] partition [%d] is [%s]".format(topic, partition, assignedReplicas.toString)) - if(assignedReplicas.contains(brokerId)) { - val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet) - info("Starting replica for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId)) - } - } - val replica = replicaManager.getReplica(topic, partition).get - // The command ask this broker to be new leader for P and it 
isn't the leader yet - val requestedLeaderId = leaderAndISR.leader - // If the broker is requested to be the leader and it's not current the leader (the leader id is set and not equal to broker id) - if(requestedLeaderId == brokerId && (!replica.partition.leaderId().isDefined || replica.partition.leaderId().get != brokerId)){ - info("Becoming the leader for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest)) - errorCode = becomeLeader(replica, leaderAndISR) - } - else if (requestedLeaderId != brokerId) { - info("Becoming the follower for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest)) - errorCode = becomeFollower(replica, leaderAndISR) - } - - responseMap.put(partitionInfo, errorCode) - } - - if(leaderAndISRRequest.isInit == LeaderAndISRRequest.IsInit){ - replicaManager.startHighWaterMarksCheckPointThread - val partitionsToRemove = replicaManager.allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).keySet - info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove)) - partitionsToRemove.foreach(p => stopReplicaCbk(p._1, p._2)) - } - + val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndISRRequest) val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse))) } @@ -129,9 +85,9 @@ val responseMap = new HashMap[(String, Int), Short] - for((topic, partition) <- stopReplicaRequest.stopReplicaSet){ - val errorCode = stopReplicaCbk(topic, partition) - responseMap.put((topic, partition), errorCode) + for((topic, partitionId) <- stopReplicaRequest.stopReplicaSet){ + val errorCode = replicaManager.stopReplica(topic, partitionId) + responseMap.put((topic, partitionId), errorCode) } val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) @@ -225,10 +181,9 @@ BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes) BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes) try { - kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition) - val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition) + val localReplica = replicaManager.leaderReplicaIfLocalOrException(topicData.topic, partitionData.partition) + val log = localReplica.log.get log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) - replicaManager.recordLeaderLogEndOffset(topicData.topic, partitionData.partition, log.logEndOffset) offsets(msgIndex) = log.logEndOffset errors(msgIndex) = ErrorMapping.NoError.toShort trace("%d bytes written to logs, nextAppendOffset = %d" @@ -313,9 +268,9 @@ for(offsetDetail <- fetchRequest.offsetInfo) { for(i <- 0 until offsetDetail.partitions.size) { try { - val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i)) - val available = maybeLog match { - case Some(log) => max(0, log.logEndOffset - offsetDetail.offsets(i)) + val localReplica = replicaManager.getReplica(offsetDetail.topic, offsetDetail.partitions(i)) + val available = localReplica match { + case Some(replica) => max(0, replica.log.get.logEndOffset - offsetDetail.offsets(i)) case None => 0 } totalBytes += math.min(offsetDetail.fetchSizes(i), 
available) @@ -337,8 +292,7 @@ val topic = offsetDetail.topic val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets) for( (partition, offset) <- (partitions, offsets).zipped.map((_,_))) { - replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset, - kafkaZookeeper.getZookeeperClient) + replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset) } } } @@ -354,7 +308,7 @@ val info = new mutable.ArrayBuffer[PartitionData]() val topic = offsetDetail.topic val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes) - for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ){ + for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) { val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match { case Left(err) => BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest @@ -367,23 +321,14 @@ case Right(messages) => BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes) BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes) - val leaderReplicaOpt = replicaManager.getReplica(topic, partition, brokerId) - assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d must exist on leader broker %d".format(topic, partition, brokerId)) - val leaderReplica = leaderReplicaOpt.get - fetchRequest.replicaId match { - case FetchRequest.NonFollowerId => - // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas - new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages) - case _ => // fetch request from a follower - val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId) - assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, brokerId)) - val replica = replicaOpt.get - debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]" - .format(replica.topic, replica.partition.partitionId, fetchRequest.replicaId)) - debug("Leader returning %d messages for topic %s partition %d to follower %d" - .format(messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId)) - new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages) + val leaderReplica = replicaManager.getReplica(topic, partition).get + if (fetchRequest.replicaId != FetchRequest.NonFollowerId) { + debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]" + .format(topic, partition, fetchRequest.replicaId)) + debug("Leader returning %d messages for topic %s partition %d to follower %d" + .format(messages.sizeInBytes, topic, partition, fetchRequest.replicaId)) } + new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark, messages) } info.append(partitionInfo) } @@ -399,10 +344,10 @@ var response: Either[Short, MessageSet] = null try { // check if the current broker is the leader for the partitions - kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition) + val localReplica = replicaManager.leaderReplicaIfLocalOrException(topic, partition) trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) - val log = logManager.getLog(topic, partition) - response = Right(log match { case Some(l) => 
l.read(offset, maxSize) case None => MessageSet.Empty }) + val log = localReplica.log.get + response = Right(log.read(offset, maxSize)) } catch { case e => error("error when processing request " + (topic, partition, offset, maxSize), e) @@ -422,8 +367,9 @@ var response: OffsetResponse = null try { - kafkaZookeeper.ensurePartitionLeaderOnThisBroker(offsetRequest.topic, offsetRequest.partition) - val offsets = logManager.getOffsets(offsetRequest) + // ensure leader exists + replicaManager.leaderReplicaIfLocalOrException(offsetRequest.topic, offsetRequest.partition) + val offsets = replicaManager.logManager.getOffsets(offsetRequest) response = new OffsetResponse(offsetRequest.versionId, offsets) }catch { case ioe: IOException => @@ -446,10 +392,8 @@ trace("Handling topic metadata request " + metadataRequest.toString()) val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]() - val zkClient = kafkaZookeeper.getZookeeperClient var errorCode = ErrorMapping.NoError - val config = logManager.config - + val config = replicaManager.config try { val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient) metadataRequest.topics.zip(topicMetadataList).foreach( @@ -604,52 +548,23 @@ trace("Checking producer request satisfaction for %s-%d, acksPending = %b" .format(topic, partitionId, fetchPartitionStatus.acksPending)) if (fetchPartitionStatus.acksPending) { - val leaderReplica = replicaManager.getLeaderReplica(topic, partitionId) - leaderReplica match { - case Some(leader) => { - if (leader.isLocal) { - val isr = leader.partition.inSyncReplicas - val numAcks = isr.count(r => { - if (!r.isLocal) { - r.logEndOffset() >= partitionStatus(key).requiredOffset - } - else - true /* also count the local (leader) replica */ - }) - - trace("Received %d/%d acks for producer request to %s-%d; isr size = %d".format( - numAcks, produce.requiredAcks, - topic, partitionId, isr.size)) - if ((produce.requiredAcks < 0 && numAcks >= isr.size) || - (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) { - /* - * requiredAcks < 0 means acknowledge after all replicas in ISR - * are fully caught up to the (local) leader's offset - * corresponding to this produce request. 
- */ - - fetchPartitionStatus.acksPending = false - fetchPartitionStatus.error = ErrorMapping.NoError - val topicData = - produce.data.find(_.topic == topic).get - val partitionData = - topicData.partitionDataArray.find(_.partition == partitionId).get - delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key, - durationNs, - partitionData.sizeInBytes) - maybeUnblockDelayedFetchRequests( - topic, Array(partitionData)) - } - } - else { - debug("Broker not leader for %s-%d".format(topic, partitionId)) - fetchPartitionStatus.setThisBrokerNotLeader() - } - } - case None => - debug("Broker not leader for %s-%d".format(topic, partitionId)) - fetchPartitionStatus.setThisBrokerNotLeader() + val partition = replicaManager.getOrCreatePartition(topic, partitionId) + val (hasEnough, errorCode) = partition.checkEnoughReplicasReachAnOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks) + if (errorCode != ErrorMapping.NoError) { + fetchPartitionStatus.acksPending = false + fetchPartitionStatus.error = errorCode + } else if (hasEnough) { + fetchPartitionStatus.acksPending = false + fetchPartitionStatus.error = ErrorMapping.NoError } + if (!fetchPartitionStatus.acksPending) { + val topicData = produce.data.find(_.topic == topic).get + val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get + delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key, + durationNs, + partitionData.sizeInBytes) + maybeUnblockDelayedFetchRequests(topic, Array(partitionData)) + } } // unblocked if there are no partitions with pending acks Index: core/src/main/scala/kafka/api/LeaderAndISRResponse.scala =================================================================== --- core/src/main/scala/kafka/api/LeaderAndISRResponse.scala (revision 1374135) +++ core/src/main/scala/kafka/api/LeaderAndISRResponse.scala (working copy) @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import kafka.utils.Utils import collection.mutable.HashMap -import collection.mutable.Map +import collection.Map object LeaderAndISRResponse {