diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5d5c6d6..619a9df 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -312,17 +312,23 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
     var totalBytes = 0L
     for(offsetDetail <- fetchRequest.offsetInfo) {
       for(i <- 0 until offsetDetail.partitions.size) {
+        debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
         try {
-          val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i))
-          val available = maybeLog match {
-            case Some(log) => max(0, log.logEndOffset - offsetDetail.offsets(i))
+          val available = replicaManager.getLeaderReplica(offsetDetail.topic, offsetDetail.partitions(i)) match {
+            case Some(leader) =>
+              val end = if(fetchRequest.replicaId == FetchRequest.NonFollowerId) {
+                leader.highWatermark()
+              } else {
+                leader.logEndOffset()
+              }
+              max(0, end - offsetDetail.offsets(i))
             case None => 0
           }
           totalBytes += math.min(offsetDetail.fetchSizes(i), available)
         } catch {
-          case e: InvalidPartitionException =>
-            info("Invalid partition %d in fetch request from client %d."
-              .format(offsetDetail.partitions(i), fetchRequest.clientId))
+          case e =>
+            info("No leader for topic %s partition %s on broker %s in fetch request from client %s"
+              .format(offsetDetail.topic, offsetDetail.partitions(i), logManager.config.brokerId, fetchRequest.clientId))
         }
       }
     }
@@ -354,35 +360,32 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
       val info = new mutable.ArrayBuffer[PartitionData]()
       val topic = offsetDetail.topic
       val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
-      for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ){
-        val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
+      for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
+        // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
+        val isFetchFromFollower = fetchRequest.replicaId != FetchRequest.NonFollowerId
+        val partitionInfo = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower) match {
           case Left(err) =>
             BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
             BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
-            fetchRequest.replicaId match {
-              case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
-              case _ =>
-                new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
-            }
+            new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
          case Right(messages) =>
            BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
            BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
            val leaderReplicaOpt = replicaManager.getReplica(topic, partition, brokerId)
            assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d must exist on leader broker %d".format(topic, partition, brokerId))
            val leaderReplica = leaderReplicaOpt.get
-           fetchRequest.replicaId match {
-             case FetchRequest.NonFollowerId =>
-               // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
-               new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
-             case _ => // fetch request from a follower
-               val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
-               assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, brokerId))
-               val replica = replicaOpt.get
-               debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]"
-                 .format(replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
-               debug("Leader returning %d messages for topic %s partition %d to follower %d"
-                 .format(messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
-               new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
+           if(!isFetchFromFollower) {
+             new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
+           } else {
+             val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
+             assert(replicaOpt.isDefined, "No replica %d in replica manager on %d"
+               .format(fetchRequest.replicaId, replicaManager.config.brokerId))
+             val replica = replicaOpt.get
+             debug("Leader %d for topic %s partition %d received fetch request from follower %d"
+               .format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+             debug("Leader %d returning %d messages for topic %s partition %d to follower %d"
+               .format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+             new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
            }
        }
        info.append(partitionInfo)
@@ -393,16 +396,28 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   }
 
   /**
-   * Read from a single topic/partition at the given offset
+   * Read from a single topic/partition at the given offset up to maxSize bytes
   */
-  private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Short, MessageSet] = {
+  private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int, fromFollower: Boolean): Either[Short, MessageSet] = {
     var response: Either[Short, MessageSet] = null
     try {
       // check if the current broker is the leader for the partitions
       kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition)
       trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
+
+      val leaderReplicaOpt = replicaManager.getLeaderReplica(topic, partition)
+      assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) +
+        " must exist on leader broker %d".format(logManager.config.brokerId))
+      val leader = leaderReplicaOpt.get
+
+      val actualSize = if (!fromFollower) {
+        min(leader.highWatermark() - offset, maxSize).toInt
+      } else {
+        maxSize
+      }
+
       val log = logManager.getLog(topic, partition)
-      response = Right(log match { case Some(l) => l.read(offset, maxSize) case None => MessageSet.Empty })
+      response = Right(log match { case Some(l) => l.read(offset, actualSize) case None => MessageSet.Empty })
     } catch {
       case e =>
         error("error when processing request " + (topic, partition, offset, maxSize), e)
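Editorial note on the hunk above: readMessageSet now caps a non-follower read at the leader's high watermark, while a follower keeps the full requested maxSize and may therefore read past the HW, up to the leader's log end offset. A minimal standalone sketch of that bound, using plain values rather than the Replica/Log types above (the helper name boundedFetchSize is hypothetical):

    // Sketch: choose how many bytes a fetch may read.
    // A consumer fetch is capped at highWatermark - offset (committed data only);
    // a follower fetch keeps the full requested maxSize. Clamped at zero here for safety.
    def boundedFetchSize(fromFollower: Boolean, highWatermark: Long, offset: Long, maxSize: Int): Int =
      if (fromFollower) maxSize
      else math.max(0L, math.min(highWatermark - offset, maxSize.toLong)).toInt

    // With HW = 5: a consumer asking for 10 bytes from offset 0 is capped at 5,
    // while a follower with the same request keeps the full 10.
    assert(boundedFetchSize(fromFollower = false, highWatermark = 5L, offset = 0L, maxSize = 10) == 5)
    assert(boundedFetchSize(fromFollower = true, highWatermark = 5L, offset = 0L, maxSize = 10) == 10)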
@@ -425,13 +440,14 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
       kafkaZookeeper.ensurePartitionLeaderOnThisBroker(offsetRequest.topic, offsetRequest.partition)
       val offsets = logManager.getOffsets(offsetRequest)
       response = new OffsetResponse(offsetRequest.versionId, offsets)
-    }catch {
+    } catch {
       case ioe: IOException =>
         fatal("Halting due to unrecoverable I/O error while handling producer request: " + ioe.getMessage, ioe)
         System.exit(1)
       case e =>
         warn("Error while responding to offset request", e)
-        response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
+        response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],
+          ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
     }
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7710dfe..c4f090f 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -68,7 +68,8 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
         val localReplica = new Replica(config.brokerId, partition, topic, time,
           Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
         partition.addReplica(localReplica)
-        info("adding local replica %d for topic %s partition %s on broker %d".format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
+        info("adding local replica %d for topic %s partition %s on broker %d".format(
+          localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
         retReplica = localReplica
       }
     val assignedReplicas = assignedReplicaIds.map(partition.getReplica(_).get)
@@ -111,7 +112,8 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
     partitionOpt match {
       case Some(partition) => partition
       case None =>
-        throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d".format(topic, partitionId, config.brokerId))
+        throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist " +
+          "in replica manager on %d".format(topic, partitionId, config.brokerId))
     }
   }
 
@@ -120,7 +122,8 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
 
     val replicaAdded = partition.addReplica(remoteReplica)
     if(replicaAdded)
-      info("added remote replica %d for topic %s partition %s".format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId))
+      info("added remote replica %d for topic %s partition %s".format(remoteReplica.brokerId, remoteReplica.topic,
+        remoteReplica.partition.partitionId))
     remoteReplica
   }
 
@@ -141,7 +144,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
         Some(replicas.leaderReplica())
       case None =>
         throw new KafkaException("Getting leader replica failed. Partition replica metadata for topic " +
-          "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
+                                 "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
     }
   }
 
@@ -163,15 +166,16 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
     val oldHw = partition.leaderHW()
     if(newHw > oldHw) {
       partition.leaderHW(Some(newHw))
-    }else
+    } else {
       debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s".format(replica.topic,
-      replica.partition.partitionId, oldHw, newHw, allLeos.mkString(",")))
+        replica.partition.partitionId, oldHw, newHw, allLeos.mkString(",")))
+    }
   }
 
   def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
     info("becoming Leader for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId))
     info("started the leader state transition for topic %s partition %d"
-           .format(replica.topic, replica.partition.partitionId))
+      .format(replica.topic, replica.partition.partitionId))
     try {
       // read and cache the ISR
       replica.partition.leaderId(Some(replica.brokerId))
@@ -183,11 +187,12 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
       leaderReplicas += replica.partition
       info("completed the leader state transition for topic %s partition %d".format(replica.topic, replica.partition.partitionId))
       ErrorMapping.NoError
-    }catch {
-      case e => error("failed to complete the leader state transition for topic %s partition %d".format(replica.topic, replica.partition.partitionId), e)
+    } catch {
+      case e => error("failed to complete the leader state transition for topic %s partition %d".format(
+        replica.topic, replica.partition.partitionId), e)
       ErrorMapping.UnknownCode
       /* TODO: add specific error code */
-    }finally {
+    } finally {
       leaderReplicaLock.unlock()
     }
   }
@@ -196,7 +201,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
   def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
     val leaderBrokerId: Int = leaderAndISR.leader
     info("starting the follower state transition to follow leader %d for topic %s partition %d"
-           .format(leaderBrokerId, replica.topic, replica.partition.partitionId))
+      .format(leaderBrokerId, replica.topic, replica.partition.partitionId))
     try {
       // set the leader for this partition correctly on this broker
       replica.partition.leaderId(Some(leaderBrokerId))
@@ -205,13 +210,15 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
           log.truncateTo(replica.highWatermark())
         case None =>
       }
-      debug("for partition [%s, %d], the leaderBroker is [%d]".format(replica.topic, replica.partition.partitionId, leaderAndISR.leader))
+      debug("for partition [%s, %d], the leaderBroker is [%d]"
+        .format(replica.topic, replica.partition.partitionId, leaderAndISR.leader))
       // get leader for this replica
       val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
       val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId)
       // become follower only if it is not already following the same leader
       if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) {
-        info("becoming follower to leader %d for topic %s partition %d".format(leaderBrokerId, replica.topic, replica.partition.partitionId))
+        info("becoming follower to leader %d for topic %s partition %d"
+          .format(leaderBrokerId, replica.topic, replica.partition.partitionId))
         // stop fetcher thread to previous leader
         replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
         // start fetcher thread to current leader
@@ -220,16 +227,18 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
       // remove this replica's partition from the ISR expiration queue
       leaderReplicaLock.lock()
       leaderReplicas -= replica.partition
-      info("completed the follower state transition to follow leader %d for topic %s partition %d".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId))
+      info("completed the follower state transition to follow leader %d for topic %s partition %d"
+        .format(leaderAndISR.leader, replica.topic, replica.partition.partitionId))
       ErrorMapping.NoError
     } catch {
       case e: BrokerNotExistException =>
         error("failed to complete the follower state transition to follow leader %d for topic %s partition %d because the leader broker does not exist in the cluster".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId), e)
         ErrorMapping.BrokerNotExistInZookeeperCode
       case e =>
-        error("failed to complete the follower state transition to follow leader %d for topic %s partition %d".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId), e)
+        error("failed to complete the follower state transition to follow leader %d for topic %s partition %d".format(
+          leaderAndISR.leader, replica.topic, replica.partition.partitionId), e)
         ErrorMapping.UnknownCode
-    }finally {
+    } finally {
       leaderReplicaLock.unlock()
     }
   }
@@ -243,14 +252,15 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
           if(outOfSyncReplicas.size > 0) {
             val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas
             assert(newInSyncReplicas.size > 0)
-            info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
+            info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId,
+              newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in zk and in memory
             partition.updateISR(newInSyncReplicas.map(_.brokerId), Some(zkClient))
           }
       })
-    }catch {
+    } catch {
       case e1 => error("Error in ISR expiration thread. Shutting down due to ", e1)
-    }finally {
+    } finally {
       leaderReplicaLock.unlock()
     }
   }
@@ -262,7 +272,8 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
       val leaderHW = partition.leaderHW()
       replica.logEndOffset() >= leaderHW
     }
-    else throw new KafkaException("Replica %s is not in the assigned replicas list for ".format(replica.toString) + " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
+    else throw new KafkaException("Replica %s is not in the assigned replicas list for ".format(replica.toString) +
+      " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
   }
 
   def recordFollowerPosition(topic: String, partition: Int, replicaId: Int, offset: Long, zkClient: ZkClient) = {
@@ -286,7 +297,11 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
   def recordLeaderLogEndOffset(topic: String, partition: Int, logEndOffset: Long) = {
     val replicaOpt = getReplica(topic, partition, config.brokerId)
     replicaOpt match {
-      case Some(replica) => replica.logEndOffset(Some(logEndOffset))
+      case Some(replica) =>
+        replica.logEndOffset(Some(logEndOffset))
+        if(replica.partition.assignedReplicas().size <= 1) {
+          replica.highWatermark(Some(logEndOffset))
+        }
       case None =>
         throw new KafkaException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId))
     }
@@ -296,19 +311,19 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
   * Flushes the highwatermark value for all partitions to the highwatermark file
   */
  def checkpointHighwaterMarks() {
-    val highwaterMarksForAllPartitions = allPartitions.map
-    { partition =>
-      val topic = partition._1._1
-      val partitionId = partition._1._2
-      val localReplicaOpt = partition._2.getReplica(config.brokerId)
-      val hw = localReplicaOpt match {
-        case Some(localReplica) => localReplica.highWatermark()
-        case None =>
-          error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) + " Replica metadata doesn't exist")
-          0L
-      }
-      (topic, partitionId) -> hw
-    }.toMap
+    val highwaterMarksForAllPartitions = allPartitions.map{ partition =>
+      val topic = partition._1._1
+      val partitionId = partition._1._2
+      val localReplicaOpt = partition._2.getReplica(config.brokerId)
+      val hw = localReplicaOpt match {
+        case Some(localReplica) => localReplica.highWatermark()
+        case None =>
+          error("Error while checkpointing highwatermark for topic %s partition %d. Replica metadata doesn't exist"
+            .format(topic, partitionId))
+          0L
+      }
+      (topic, partitionId) -> hw
+    }.toMap
     highwaterMarkCheckpoint.write(highwaterMarksForAllPartitions)
     info("Checkpointed high watermark data: %s".format(highwaterMarksForAllPartitions))
   }
diff --git a/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala b/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
index 255fdb6..851daa9 100644
--- a/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
@@ -20,18 +20,17 @@ package kafka.integration
 
 import junit.framework.Assert._
 import java.util.Properties
+import kafka.admin.CreateTopicCommand
 import kafka.api.{FetchRequestBuilder, OffsetRequest}
 import kafka.consumer.SimpleConsumer
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.scalatest.junit.JUnit3Suite
-import kafka.admin.CreateTopicCommand
 
 class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness {
 
   val topic = "MagicByte0"
-  val group = "default_group"
-  val testConsumer = "consumer"
+  val partition = 0
   val kafkaProps = new Properties
   val host = "localhost"
   val port = TestUtils.choosePort
@@ -57,15 +56,15 @@ class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness
 
   // test for reading data with magic byte 0
   def testProtocolVersion0() {
     CreateTopicCommand.createTopic(zkClient, topic, 0, 1, configs.head.brokerId.toString)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
-    val lastOffset = simpleConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 1)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 500)
+    val lastOffset = simpleConsumer.getOffsetsBefore(topic, partition, OffsetRequest.LatestTime, 1)
     var fetchOffset: Long = 0L
     var messageCount: Int = 0
     while(fetchOffset < lastOffset(0)) {
-      val fetched = simpleConsumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, fetchOffset, 10000).build())
+      val fetched = simpleConsumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, fetchOffset, 10000).build())
       val fetchedMessages = fetched.messageSet(topic, 0)
-      fetchedMessages.foreach(m => fetchOffset = m.offset)
+      fetchOffset = fetchedMessages.last.offset
       messageCount += fetchedMessages.size
     }
     assertEquals(100, messageCount)
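One caveat worth noting on the test loop above: fetchedMessages.last throws a NoSuchElementException if a fetch ever returns an empty message set, whereas the replaced foreach simply left fetchOffset unchanged. A hedged alternative for that single line, assuming an empty result is possible and the message set behaves as a standard Scala collection:

    // Advance the fetch offset only when the fetch actually returned messages;
    // lastOption avoids the exception that .last would throw on an empty set.
    fetchOffset = fetchedMessages.lastOption.map(_.offset).getOrElse(fetchOffset)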
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
new file mode 100644
index 0000000..e1812aa
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.nio.ByteBuffer
+import kafka.api.{FetchRequest, FetchRequestBuilder}
+import kafka.cluster.{Partition, Replica}
+import kafka.log.Log
+import kafka.message.{ByteBufferMessageSet, Message}
+import kafka.network.{BoundedByteBufferReceive, RequestChannel}
+import kafka.utils.{Time, TestUtils, MockTime}
+import org.easymock.EasyMock
+import org.I0Itec.zkclient.ZkClient
+import org.scalatest.junit.JUnit3Suite
+
+class SimpleFetchTest extends JUnit3Suite {
+
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+    override val replicaMaxLagTimeMs = 100L
+    override val replicaMaxLagBytes = 10L
+  })
+  val topic = "foo"
+  val partitionId = 0
+
+  /**
+   * The scenario for this test is that there is one topic, "foo", on broker "0" that has
+   * one partition with one follower replica on broker "1". The leader replica on "0"
+   * has HW of "5" and LEO of "20". The follower on broker "1" has a local replica
+   * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
+   * but is still in ISR (hasn't yet expired from ISR).
+   *
+   * When a normal consumer fetches data, it should only see data up to the HW of the leader,
+   * in this case up to an offset of "5".
+   */
+  def testNonReplicaSeesHwWhenFetching() {
+    /* setup */
+    val time = new MockTime
+    val leo = 20
+    val hw = 5
+    val messages = new Message("test-message".getBytes())
+
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.replay(zkClient)
+
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
+    EasyMock.expect(log.read(0, hw)).andReturn(new ByteBufferMessageSet(messages))
+    EasyMock.replay(log)
+
+    val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw)
+    partition.getReplica(configs(1).brokerId).get.logEndOffset(Some(leo-5L))
+
+    val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
+    EasyMock.expect(replicaManager.getLeaderReplica(topic, partitionId))
+      .andReturn(Some(partition.leaderReplica())).anyTimes()
+    EasyMock.expect(replicaManager.getReplica(topic, partitionId, configs.head.brokerId))
+      .andReturn(Some(partition.leaderReplica())).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    val kafkaZooKeeper = EasyMock.createMock(classOf[kafka.server.KafkaZooKeeper])
+    EasyMock.expect(kafkaZooKeeper.ensurePartitionLeaderOnThisBroker(topic, partitionId)).once()
+    EasyMock.replay(kafkaZooKeeper)
+
+    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    EasyMock.expect(logManager.getLog(topic, partitionId)).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.config).andReturn(configs.head).anyTimes()
+    EasyMock.replay(logManager)
+
+    // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary)
+    // don't provide replica or leader callbacks since they will not be tested here
+    val requestChannel = new RequestChannel(2, 5)
+    val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZooKeeper,
+      null, null, null, null, configs.head.brokerId)
+
+    // This request (from a non-replica consumer) wants to read up to 2*HW but should only get back up to HW bytes into the log
+    val goodFetch = new FetchRequestBuilder()
+      .replicaId(FetchRequest.NonFollowerId)
+      .addFetch(topic, partitionId, 0, hw*2)
+      .build()
+    val goodFetchBB = ByteBuffer.allocate(goodFetch.sizeInBytes)
+    goodFetch.writeTo(goodFetchBB)
+    goodFetchBB.rewind()
+
+    val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
+    EasyMock.expect(receivedRequest.buffer).andReturn(goodFetchBB)
+    EasyMock.replay(receivedRequest)
+
+    // send the request
+    apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, request=receivedRequest, start=1))
+
+    // make sure the log only reads bytes between 0->HW (5)
+    EasyMock.verify(log)
+  }
+
+  /**
+   * The scenario for this test is that there is one topic, "foo", on broker "0" that has
+   * one partition with one follower replica on broker "1". The leader replica on "0"
+   * has HW of "5" and LEO of "20". The follower on broker "1" has a local replica
+   * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
+   * but is still in ISR (hasn't yet expired from ISR).
+   *
+   * When the follower from broker "1" fetches data, it should see data up to the log end offset ("20")
+   */
+  def testReplicaSeesLeoWhenFetching() {
+    /* setup */
+    val time = new MockTime
+    val leo = 20
+    val hw = 5
+
+    val messages = new Message("test-message".getBytes())
+
+    val followerReplicaId = configs(1).brokerId
+    val followerLEO = 15
+
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.replay(zkClient)
+
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
+    EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE)).andReturn(new ByteBufferMessageSet(messages))
+    EasyMock.replay(log)
+
+    val partition = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, hw)
+    partition.getReplica(followerReplicaId).get.logEndOffset(Some(followerLEO.asInstanceOf[Long]))
+
+    val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
+    EasyMock.expect(replicaManager.recordFollowerPosition(topic, 0, followerReplicaId, followerLEO, zkClient))
+    EasyMock.expect(replicaManager.getReplica(topic, 0, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId))
+    EasyMock.expect(replicaManager.getLeaderReplica(topic, 0)).andReturn(Some(partition.leaderReplica())).anyTimes()
+    EasyMock.expect(replicaManager.getReplica(topic, 0, configs.head.brokerId)).andReturn(Some(partition.leaderReplica())).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    val kafkaZooKeeper = EasyMock.createMock(classOf[kafka.server.KafkaZooKeeper])
+    EasyMock.expect(kafkaZooKeeper.getZookeeperClient).andReturn(zkClient)
+    EasyMock.expect(kafkaZooKeeper.ensurePartitionLeaderOnThisBroker(topic, 0)).once()
+    EasyMock.replay(kafkaZooKeeper)
+
+    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    EasyMock.expect(logManager.getLog(topic, 0)).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.config).andReturn(configs.head).anyTimes()
+    EasyMock.replay(logManager)
+
+    val requestChannel = new RequestChannel(2, 5)
+    val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZooKeeper,
+      null, null, null, null, configs.head.brokerId)
+
+    /**
+     * This fetch, coming from a replica, requests all data starting at offset "15". Because the request is coming
+     * from a follower, the leader should oblige and read beyond the HW.
+     */
+    val bigFetch = new FetchRequestBuilder()
+      .replicaId(followerReplicaId)
+      .addFetch(topic, partitionId, followerLEO, Integer.MAX_VALUE)
+      .build()
+
+    val fetchRequest = ByteBuffer.allocate(bigFetch.sizeInBytes)
+    bigFetch.writeTo(fetchRequest)
+    fetchRequest.rewind()
+
+    val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
+    EasyMock.expect(receivedRequest.buffer).andReturn(fetchRequest)
+    EasyMock.replay(receivedRequest)
+
+    // send the request
+    apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1))
+
+    /**
+     * Make sure the log satisfies the fetch from a follower by reading data beyond the HW, namely all bytes from
+     * offset 15 onwards
+     */
+    EasyMock.verify(log)
+  }
+
+  private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
+                                               localLog: Log, leaderHW: Long): Partition = {
+    val partition = new Partition(topic, partitionId, time)
+    val leaderReplica = new Replica(leaderId, partition, topic, time, Some(leaderHW), Some(localLog))
+
+    val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
+    partition.assignedReplicas(Some(allReplicas.toSet))
+    // set in sync replicas for this partition to all the assigned replicas
+    partition.inSyncReplicas = allReplicas.toSet
+    // set the leader and its hw and the hw update time
+    partition.leaderId(Some(leaderId))
+    partition.leaderHW(Some(leaderHW))
+    partition
+  }
+
+  private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
+    configs.filter(_.brokerId != leaderId).map { config =>
+      new Replica(config.brokerId, partition, topic, time)
+    }
+  }
+
+}
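For readers skimming the new test, the only functional difference between the two scenarios is the replica id stamped on the fetch request. A short illustrative sketch of the two request shapes, using the same FetchRequestBuilder calls as in the tests (topic, partition, offsets and sizes here are example values):

    // External consumer: replica id is NonFollowerId (-1), so the leader serves data
    // only up to its high watermark.
    val consumerFetch = new FetchRequestBuilder()
      .replicaId(FetchRequest.NonFollowerId)
      .addFetch("foo", 0, 0L, 10000)
      .build()

    // Follower replica: the replica id names the follower broker, so the leader serves
    // data up to its log end offset and records the follower's position.
    val followerFetch = new FetchRequestBuilder()
      .replicaId(1)
      .addFetch("foo", 0, 15L, Integer.MAX_VALUE)
      .build()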