diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 39c4c39..eba0c78 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -269,17 +269,20 @@ class KafkaApis(val requestChannel: RequestChannel, var totalBytes = 0L for(offsetDetail <- fetchRequest.offsetInfo) { for(i <- 0 until offsetDetail.partitions.size) { + debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i))) try { - val localReplica = replicaManager.getReplica(offsetDetail.topic, offsetDetail.partitions(i)) - val available = localReplica match { - case Some(replica) => max(0, replica.log.get.logEndOffset - offsetDetail.offsets(i)) - case None => 0 + val leader = replicaManager.getLeaderReplicaIfLocal(offsetDetail.topic, offsetDetail.partitions(i)) + val end = if(fetchRequest.replicaId == FetchRequest.NonFollowerId) { + leader.highWatermark + } else { + leader.logEndOffset } + val available = max(0, end - offsetDetail.offsets(i)) totalBytes += math.min(offsetDetail.fetchSizes(i), available) } catch { - case e: UnknownTopicOrPartitionException => - info("Invalid partition %d in fetch request from client %d." 
- .format(offsetDetail.partitions(i), fetchRequest.clientId)) + case e => + info("No leader topic %s partition %s on broker %s in fetch request from client %s" + .format(offsetDetail.topic, offsetDetail.partitions(i), brokerId, fetchRequest.clientId)) } } } @@ -311,26 +314,32 @@ class KafkaApis(val requestChannel: RequestChannel, val topic = offsetDetail.topic val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes) for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) { - val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match { + // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas + val isFetchFromFollower = fetchRequest.replicaId != FetchRequest.NonFollowerId + val partitionInfo = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower) match { case Left(err) => BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest - fetchRequest.replicaId match { - case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty) - case _ => - new PartitionData(partition, err, offset, -1L, MessageSet.Empty) - } + new PartitionData(partition, err, offset, -1L, MessageSet.Empty) case Right(messages) => BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes) BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes) - val leaderReplica = replicaManager.getReplica(topic, partition).get - if (fetchRequest.replicaId != FetchRequest.NonFollowerId) { - debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]" - .format(topic, partition, fetchRequest.replicaId)) - debug("Leader returning %d messages for topic %s partition %d to follower %d" - .format(messages.sizeInBytes, topic, partition, fetchRequest.replicaId)) + val leaderReplicaOpt = 
replicaManager.getReplica(topic, partition, brokerId) + assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d must exist on leader broker %d".format(topic, partition, brokerId)) + val leaderReplica = leaderReplicaOpt.get + if(!isFetchFromFollower) { + new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark, messages) + } else { + val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId) + assert(replicaOpt.isDefined, "No replica %d in replica manager on %d" + .format(fetchRequest.replicaId, replicaManager.config.brokerId)) + val replica = replicaOpt.get + debug("Leader %d for topic %s partition %d received fetch request from follower %d" + .format(brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId)) + debug("Leader %d returning %d messages for topic %s partition %d to follower %d" + .format(brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId)) + new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark, messages) + } - new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark, messages) } info.append(partitionInfo) } @@ -340,16 +349,21 @@ class KafkaApis(val requestChannel: RequestChannel, } /** - * Read from a single topic/partition at the given offset + * Read from a single topic/partition at the given offset up to maxSize bytes */ - private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Short, MessageSet] = { + private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int, fromFollower: Boolean): Either[Short, MessageSet] = { var response: Either[Short, MessageSet] = null try { // check if the current broker is the leader for the partitions - val localReplica = replicaManager.getLeaderReplicaIfLocal(topic, partition) + val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition) 
trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) - val log = localReplica.log.get - response = Right(log.read(offset, maxSize)) + val actualSize = if (!fromFollower) { + min(leader.highWatermark - offset, maxSize).toInt + } else { + maxSize + } + val messages = leader.log match { case Some(log) => log.read(offset, actualSize) case None => MessageSet.Empty } + response = Right(messages) } catch { case e => error("error when processing request " + (topic, partition, offset, maxSize), e) @@ -373,13 +387,14 @@ class KafkaApis(val requestChannel: RequestChannel, replicaManager.getLeaderReplicaIfLocal(offsetRequest.topic, offsetRequest.partition) val offsets = replicaManager.logManager.getOffsets(offsetRequest) response = new OffsetResponse(offsetRequest.versionId, offsets) - }catch { + } catch { case ioe: IOException => fatal("Halting due to unrecoverable I/O error while handling producer request: " + ioe.getMessage, ioe) System.exit(1) case e => warn("Error while responding to offset request", e) - response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort) + response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long], + ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort) } requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } @@ -420,7 +435,7 @@ class KafkaApis(val requestChannel: RequestChannel, ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause) } }) - }catch { + } catch { case e => error("Error while retrieving topic metadata", e) // convert exception type to error code errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala new file mode 100644 index 0000000..46d7415 --- 
/dev/null +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.nio.ByteBuffer +import kafka.api.{FetchRequest, FetchRequestBuilder} +import kafka.cluster.{Partition, Replica} +import kafka.log.Log +import kafka.message.{ByteBufferMessageSet, Message} +import kafka.network.{BoundedByteBufferReceive, RequestChannel} +import kafka.utils.{Time, TestUtils, MockTime} +import org.easymock.EasyMock +import org.I0Itec.zkclient.ZkClient +import org.scalatest.junit.JUnit3Suite + +class SimpleFetchTest extends JUnit3Suite { + + val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { + override val replicaMaxLagTimeMs = 100L + override val replicaMaxLagBytes = 10L + }) + val topic = "foo" + val partitionId = 0 + + /** + * The scenario for this test is that there is one topic, "test-topic", on broker "0" that has + * one partition with one follower replica on broker "1". The leader replica on "0" + * has HW of "5" and LEO of "20". 
The follower on broker "1" has a local replica + * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync + * but is still in ISR (hasn't yet expired from ISR). + * + * When a normal consumer fetches data, it should only see data up to the HW of the leader, + * in this case up to an offset of "5". + */ + def testNonReplicaSeesHwWhenFetching() { + /* setup */ + val time = new MockTime + val leo = 20 + val hw = 5 + val messages = new Message("test-message".getBytes()) + + val zkClient = EasyMock.createMock(classOf[ZkClient]) + EasyMock.replay(zkClient) + + val log = EasyMock.createMock(classOf[kafka.log.Log]) + EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() + EasyMock.expect(log.read(0, hw)).andReturn(new ByteBufferMessageSet(messages)) + EasyMock.replay(log) + + val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) + EasyMock.expect(logManager.getLog(topic, partitionId)).andReturn(Some(log)).anyTimes() + EasyMock.expect(logManager.config).andReturn(configs.head).anyTimes() + EasyMock.replay(logManager) + + val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager]) + EasyMock.expect(replicaManager.config).andReturn(configs.head) + EasyMock.expect(replicaManager.logManager).andReturn(logManager) + EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) + EasyMock.expect(replicaManager.highWatermarkCheckpoint).andReturn(EasyMock.createMock(classOf[HighwaterMarkCheckpoint])) + EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) + EasyMock.replay(replicaManager) + + val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager) + partition.getReplica(configs(1).brokerId).get.logEndOffset = leo - 5L + + EasyMock.reset(replicaManager) + EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)) + .andReturn(partition.leaderReplicaIfLocal().get).anyTimes() + 
EasyMock.expect(replicaManager.getReplica(topic, partitionId, configs.head.brokerId)) + .andReturn(partition.leaderReplicaIfLocal()).anyTimes() + EasyMock.replay(replicaManager) + + val kafkaZooKeeper = EasyMock.createMock(classOf[kafka.server.KafkaZooKeeper]) +// EasyMock.expect(kafkaZooKeeper.ensurePartitionLeaderOnThisBroker(topic, partitionId)).once() + EasyMock.replay(kafkaZooKeeper) + + + // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary) + // don't provide replica or leader callbacks since they will not be tested here + val requestChannel = new RequestChannel(2, 5) + val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId) + + // This request (from a non-follower consumer) wants to read up to 2*HW but should only get back up to HW bytes into the log + val goodFetch = new FetchRequestBuilder() + .replicaId(FetchRequest.NonFollowerId) + .addFetch(topic, partitionId, 0, hw*2) + .build() + val goodFetchBB = ByteBuffer.allocate(goodFetch.sizeInBytes) + goodFetch.writeTo(goodFetchBB) + goodFetchBB.rewind() + + val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive]) + EasyMock.expect(receivedRequest.buffer).andReturn(goodFetchBB) + EasyMock.replay(receivedRequest) + + // send the request + apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, request=receivedRequest, start=1)) + + // make sure the log only reads bytes between 0->HW (5) + EasyMock.verify(log) + } + + /** + * The scenario for this test is that there is one topic, "foo", on broker "0" that has + * one partition with one follower replica on broker "1". The leader replica on "0" + * has HW of "5" and LEO of "20". The follower on broker "1" has a local replica + * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync + * but is still in ISR (hasn't yet expired from ISR). 
+ * + * When the follower from broker "1" fetches data, it should see data up to the log end offset ("20") + */ + def testReplicaSeesLeoWhenFetching() { + /* setup */ + val time = new MockTime + val leo = 20 + val hw = 5 + + val messages = new Message("test-message".getBytes()) + + val followerReplicaId = configs(1).brokerId + val followerLEO = 15 + + val zkClient = EasyMock.createMock(classOf[ZkClient]) + EasyMock.replay(zkClient) + + val log = EasyMock.createMock(classOf[kafka.log.Log]) + EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() + EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE)).andReturn(new ByteBufferMessageSet(messages)) + EasyMock.replay(log) + + val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) + EasyMock.expect(logManager.getLog(topic, 0)).andReturn(Some(log)).anyTimes() + EasyMock.expect(logManager.config).andReturn(configs.head).anyTimes() + EasyMock.replay(logManager) + + val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager]) + EasyMock.expect(replicaManager.config).andReturn(configs.head) + EasyMock.expect(replicaManager.logManager).andReturn(logManager) + EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) + EasyMock.expect(replicaManager.highWatermarkCheckpoint).andReturn(EasyMock.createMock(classOf[HighwaterMarkCheckpoint])) + EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) + EasyMock.replay(replicaManager) + + val partition = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, hw, replicaManager) + partition.getReplica(followerReplicaId).get.logEndOffset = followerLEO.asInstanceOf[Long] + + EasyMock.reset(replicaManager) + EasyMock.expect(replicaManager.recordFollowerPosition(topic, 0, followerReplicaId, followerLEO)) + EasyMock.expect(replicaManager.getReplica(topic, 0, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId)) + 
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, 0)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes() + EasyMock.expect(replicaManager.getReplica(topic, 0, configs.head.brokerId)).andReturn(partition.leaderReplicaIfLocal()).anyTimes() + EasyMock.replay(replicaManager) + + val kafkaZooKeeper = EasyMock.createMock(classOf[kafka.server.KafkaZooKeeper]) + EasyMock.expect(kafkaZooKeeper.getZookeeperClient).andReturn(zkClient) +// EasyMock.expect(kafkaZooKeeper.ensurePartitionLeaderOnThisBroker(topic, 0)).once() + EasyMock.replay(kafkaZooKeeper) + + val requestChannel = new RequestChannel(2, 5) + val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId) + + /** + * This fetch, coming from a replica, requests all data at offset "15". Because the request is coming + * from a follower, the leader should oblige and read beyond the HW. + */ + val bigFetch = new FetchRequestBuilder() + .replicaId(followerReplicaId) + .addFetch(topic, partitionId, followerLEO, Integer.MAX_VALUE) + .build() + + val fetchRequest = ByteBuffer.allocate(bigFetch.sizeInBytes) + bigFetch.writeTo(fetchRequest) + fetchRequest.rewind() + + val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive]) + EasyMock.expect(receivedRequest.buffer).andReturn(fetchRequest) + EasyMock.replay(receivedRequest) + + // send the request + apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1)) + + /** + * Make sure the log satisfies the fetch from a follower by reading data beyond the HW, mainly all bytes after + * an offset of 15 + */ + EasyMock.verify(log) + } + + private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int, + localLog: Log, leaderHW: Long, replicaManager: ReplicaManager): Partition = { + val partition = new Partition(topic, partitionId, time, replicaManager) + val leaderReplica = new Replica(leaderId, partition, time, 0, 
Some(localLog)) + + val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica + allReplicas.foreach(partition.addReplicaIfNotExists(_)) + // set in sync replicas for this partition to all the assigned replicas + partition.inSyncReplicas = allReplicas.toSet + // set the leader and its hw and the hw update time + partition.leaderReplicaIdOpt = Some(leaderId) + leaderReplica.highWatermark = leaderHW + partition + } + + private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = { + configs.filter(_.brokerId != leaderId).map { config => + new Replica(config.brokerId, partition, time) + } + } + +}