diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index cc3b03c..971de48 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -255,17 +255,23 @@ class KafkaApis(val requestChannel: RequestChannel, var totalBytes = 0L for(offsetDetail <- fetchRequest.offsetInfo) { for(i <- 0 until offsetDetail.partitions.size) { + debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i))) try { - debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i))) - val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i)) - val available = maybeLog match { - case Some(log) => max(0, log.logEndOffset - offsetDetail.offsets(i)) + val available = replicaManager.getLeaderReplica(offsetDetail.topic, offsetDetail.partitions(i)) match { + case Some(leader) => + val end = if(fetchRequest.replicaId == FetchRequest.NonFollowerId) { + leader.highWatermark() + } else { + leader.log.map(_.logEndOffset).getOrElse(0L) + } + max(0, end - offsetDetail.offsets(i)) case None => 0 } totalBytes += math.min(offsetDetail.fetchSizes(i), available) } catch { - case e: InvalidPartitionException => - info("Invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'") + case e => + info("No leader topic %s partition %s on broker %s in fetch request from client %s" + .format(offsetDetail.topic, offsetDetail.partitions(i), logManager.config.brokerId, fetchRequest.clientId)) } } } @@ -297,15 +303,13 @@ class KafkaApis(val requestChannel: RequestChannel, val topic = offsetDetail.topic val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes) for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) { - val partitionInfo = readMessageSet(topic, 
partition, offset, fetchSize) match { + // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas + val isFetchFromFollower = fetchRequest.replicaId != FetchRequest.NonFollowerId + val partitionInfo = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower) match { case Left(err) => BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest - fetchRequest.replicaId match { - case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty) - case _ => - new PartitionData(partition, err, offset, -1L, MessageSet.Empty) - } + new PartitionData(partition, err, offset, -1L, MessageSet.Empty) case Right(messages) => BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes) BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes) @@ -313,19 +317,18 @@ class KafkaApis(val requestChannel: RequestChannel, assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) + " must exist on leader broker %d".format(logManager.config.brokerId)) val leaderReplica = leaderReplicaOpt.get - fetchRequest.replicaId match { - case FetchRequest.NonFollowerId => // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas - new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages) - case _ => // fetch request from a follower - val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId) - assert(replicaOpt.isDefined, "No replica %d in replica manager on %d" - .format(fetchRequest.replicaId, replicaManager.config.brokerId)) - val replica = replicaOpt.get - debug("Leader %d for topic %s partition %d received fetch request from follower %d" - .format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId)) - debug("Leader 
%d returning %d messages for topic %s partition %d to follower %d" - .format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId)) - new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages) + if(!isFetchFromFollower) { + new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages) + } else { + val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId) + assert(replicaOpt.isDefined, "No replica %d in replica manager on %d" + .format(fetchRequest.replicaId, replicaManager.config.brokerId)) + val replica = replicaOpt.get + debug("Leader %d for topic %s partition %d received fetch request from follower %d" + .format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId)) + debug("Leader %d returning %d messages for topic %s partition %d to follower %d" + .format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId)) + new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages) } } info.append(partitionInfo) @@ -338,14 +341,26 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Read from a single topic/partition at the given offset */ - private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Short, MessageSet] = { + private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int, fromFollower: Boolean): Either[Short, MessageSet] = { var response: Either[Short, MessageSet] = null try { // check if the current broker is the leader for the partitions kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition) trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) + + val leaderReplicaOpt = replicaManager.getLeaderReplica(topic, 
partition) + assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) + + " must exist on leader broker %d".format(logManager.config.brokerId)) + val leader = leaderReplicaOpt.get + + val actualSize = if (!fromFollower) { + min(leader.highWatermark() - offset, maxSize).toInt + } else { + maxSize + } + val log = logManager.getLog(topic, partition) - response = Right(log match { case Some(l) => l.read(offset, maxSize) case None => MessageSet.Empty }) + response = Right(log match { case Some(l) => l.read(offset, actualSize) case None => MessageSet.Empty }) } catch { case e => error("error when processing request " + (topic, partition, offset, maxSize), e) diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala new file mode 100644 index 0000000..004e249 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -0,0 +1,233 @@ +package kafka.server + +import java.nio.ByteBuffer +import kafka.api.{FetchResponseSend, FetchResponse, FetchRequest, FetchRequestBuilder} +import kafka.cluster.{Partition, Replica} +import kafka.log.Log +import kafka.message.{ByteBufferMessageSet, Message} +import kafka.network.{BoundedByteBufferSend, BoundedByteBufferReceive, RequestChannel} +import kafka.utils.{Time, TestUtils, MockTime} +import org.easymock.EasyMock +import org.I0Itec.zkclient.ZkClient +import org.junit.Assert +import org.scalatest.junit.JUnit3Suite + +/** + * + * @author pmenon + * @version + */ + +class SimpleFetchTest extends JUnit3Suite { + + val configs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_) { + override val replicaMaxLagTimeMs = 100L + override val replicaMaxLagBytes = 10L + }) + val topic = "foo" + + /** + * Scenario is topic "test-topic" on broker "0" has replica + * with two followers on broker "1" and "2". Leader on "0" + * has HW of "5" and LEO of "20". 
Broker "1" replica has + * HW matching leader ("5") and LEO of "20", meaning it's in sync. + * Broker "2" replica has HW matching leader ("5") and LEO of "15" meaning + * not in sync but still in ISR (hasn't expired from ISR). When a normal + * consumer fetches data, it only sees data to "5" offset. + */ + def testNonReplicaSeesHwWhenFetching() { + /* setup */ + val time = new MockTime + val leo = 20 + val hw = 5 + val messages = new Message("test-message".getBytes()) + + val zkClient = EasyMock.createMock(classOf[ZkClient]) + EasyMock.replay(zkClient) + + val log = EasyMock.createMock(classOf[kafka.log.Log]) + EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() + EasyMock.expect(log.read(0, hw)).andReturn(new ByteBufferMessageSet(messages)) + EasyMock.replay(log) + + val partition = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, hw) + + val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager]) + EasyMock.expect(replicaManager.getLeaderReplica(topic, 0)).andReturn(Some(partition.leaderReplica())).anyTimes() + EasyMock.expect(replicaManager.getReplica(topic, 0, configs.head.brokerId)).andReturn(Some(partition.leaderReplica())).anyTimes() + EasyMock.replay(replicaManager) + + val kafkaZooKeeper = EasyMock.createMock(classOf[kafka.server.KafkaZooKeeper]) + EasyMock.expect(kafkaZooKeeper.ensurePartitionLeaderOnThisBroker(topic, 0)).once() + EasyMock.replay(kafkaZooKeeper) + + val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) + EasyMock.expect(logManager.getLog(topic, 0)).andReturn(Some(log)).anyTimes() + EasyMock.expect(logManager.config).andReturn(configs.head).anyTimes() + EasyMock.replay(logManager) + + val requestChannel = new RequestChannel(2, 5) + val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZooKeeper) + + /* test */ + val bigFetch = new FetchRequestBuilder() + .clientId("test-consumer") + .minBytes(hw*2) + .maxWait(10000) + 
.replicaId(FetchRequest.NonFollowerId) + .addFetch(topic, 0, 0, 10).build() + val goodFetch = new FetchRequestBuilder() + .clientId("test-consumer") + .minBytes(hw) + .maxWait(10000) + .replicaId(FetchRequest.NonFollowerId) + .addFetch(topic, 0, 0, hw*2).build() + val bigFetchBB = ByteBuffer.allocate(bigFetch.sizeInBytes) + bigFetch.writeTo(bigFetchBB) + bigFetchBB.rewind() + + val goodFetchBB = ByteBuffer.allocate(bigFetch.sizeInBytes) + goodFetch.writeTo(goodFetchBB) + goodFetchBB.rewind() + + val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive]) + EasyMock.expect(receivedRequest.buffer).andReturn(bigFetchBB) + EasyMock.expect(receivedRequest.buffer).andReturn(goodFetchBB) + EasyMock.replay(receivedRequest) + + // send the request which should be put in purgatory + apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1)) + + // there should be no response + val noResponse = requestChannel.receiveResponse(0) + Assert.assertNull(noResponse) + + apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, request=receivedRequest, start=1)) + + // now there should be a response + val goodResponse = requestChannel.receiveResponse(1) + Assert.assertNotNull(goodResponse) + + val fetchResponse = goodResponse.response.asInstanceOf[FetchResponseSend].fetchResponse + Assert.assertNotNull(fetchResponse) + Assert.assertNotNull(fetchResponse.data) + Assert.assertEquals(1, fetchResponse.data.size) + + val data = fetchResponse.data(0) + Assert.assertNotNull(data.partitionDataArray) + Assert.assertEquals(1, data.partitionDataArray.size) + + val messageResponseData = data.partitionDataArray(0) + Assert.assertEquals(hw, messageResponseData.hw.toInt) + Assert.assertEquals(new ByteBufferMessageSet(messages), messageResponseData.messages.asInstanceOf[ByteBufferMessageSet]) + } + + /** + * Scenario is topic "test-topic" on broker "0" has replica + * with two followers on broker "1" and "2". 
Leader on "0" + * has HW of "5" and LEO of "20". Broker "1" replica has + * HW matching leader ("5") and LEO of "20", meaning it's in sync. + * Broker "2" replica has HW matching leader ("5") and LEO of "15" meaning + * not in sync but still in ISR (hasn't expired from ISR). When a follower + * fetches data, it should see data up to the log end offset ("20"). + */ + def testReplicaSeesLeoWhenFetching() { + /* setup */ + val time = new MockTime + val leo = 20 + val hw = 5 + + val messages = new Message("test-message".getBytes()) + + val followerReplicaId = configs(1).brokerId + val followerOffset = 15 + + val zkClient = EasyMock.createMock(classOf[ZkClient]) + EasyMock.replay(zkClient) + + val log = EasyMock.createMock(classOf[kafka.log.Log]) + EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() + EasyMock.expect(log.read(followerOffset, Integer.MAX_VALUE)).andReturn(new ByteBufferMessageSet(messages)) + EasyMock.replay(log) + + val partition = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, hw) + + val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager]) + EasyMock.expect(replicaManager.recordFollowerPosition(topic, 0, followerReplicaId, followerOffset, zkClient)) + EasyMock.expect(replicaManager.getReplica(topic, 0, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId)) + EasyMock.expect(replicaManager.getLeaderReplica(topic, 0)).andReturn(Some(partition.leaderReplica())).anyTimes() + EasyMock.expect(replicaManager.getReplica(topic, 0, configs.head.brokerId)).andReturn(Some(partition.leaderReplica())).anyTimes() + EasyMock.replay(replicaManager) + + val kafkaZooKeeper = EasyMock.createMock(classOf[kafka.server.KafkaZooKeeper]) + EasyMock.expect(kafkaZooKeeper.getZookeeperClient).andReturn(zkClient) + EasyMock.expect(kafkaZooKeeper.ensurePartitionLeaderOnThisBroker(topic, 0)).once() + EasyMock.replay(kafkaZooKeeper) + + val logManager = 
EasyMock.createMock(classOf[kafka.log.LogManager]) + EasyMock.expect(logManager.getLog(topic, 0)).andReturn(Some(log)).anyTimes() + EasyMock.expect(logManager.config).andReturn(configs.head).anyTimes() + EasyMock.replay(logManager) + + val requestChannel = new RequestChannel(2, 5) + val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZooKeeper) + + /* test */ + val bigFetch = new FetchRequestBuilder() + .clientId("test-consumer") + .minBytes(0) + .maxWait(10000) + .replicaId(followerReplicaId) + .addFetch(topic, 0, followerOffset, Integer.MAX_VALUE).build() + + val bigFetchBB = ByteBuffer.allocate(bigFetch.sizeInBytes) + bigFetch.writeTo(bigFetchBB) + bigFetchBB.rewind() + + val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive]) + EasyMock.expect(receivedRequest.buffer).andReturn(bigFetchBB) + EasyMock.replay(receivedRequest) + + // send the request + apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1)) + + // there should be a response + val goodResponse = requestChannel.receiveResponse(0) + Assert.assertNotNull(goodResponse) + + val fetchResponse = goodResponse.response.asInstanceOf[FetchResponseSend].fetchResponse + Assert.assertNotNull(fetchResponse) + Assert.assertNotNull(fetchResponse.data) + Assert.assertEquals(1, fetchResponse.data.size) + + val data = fetchResponse.data(0) + Assert.assertNotNull(data.partitionDataArray) + Assert.assertEquals(1, data.partitionDataArray.size) + + val messageResponseData = data.partitionDataArray(0) + Assert.assertEquals(hw, messageResponseData.hw.toInt) + Assert.assertEquals(new ByteBufferMessageSet(messages), messageResponseData.messages.asInstanceOf[ByteBufferMessageSet]) + } + + private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int, + localLog: Log, leaderHW: Long): Partition = { + val partition = new Partition(topic, partitionId, time) + val leaderReplica = new 
Replica(leaderId, partition, topic, time, Some(leaderHW), Some(localLog)) + + val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica + partition.assignedReplicas(Some(allReplicas.toSet)) + // set in sync replicas for this partition to all the assigned replicas + partition.inSyncReplicas = allReplicas.toSet + // set the leader and its hw and the hw update time + partition.leaderId(Some(leaderId)) + partition.leaderHW(Some(leaderHW)) + partition + } + + private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = { + configs.filter(_.brokerId != leaderId).map { config => + new Replica(config.brokerId, partition, topic, time) + } + } + +}