diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d9cd6c6..294269a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -238,8 +238,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
       })
     })
-    debug("Replica %d fetch unblocked %d producer requests."
-      .format(fetchRequest.replicaId, satisfiedProduceRequests.size))
+    debug("Replica %d fetch unblocked %d producer requests.".format(fetchRequest.replicaId, satisfiedProduceRequests.size))
     satisfiedProduceRequests.foreach(_.respond())
   }
 
@@ -269,17 +268,23 @@ class KafkaApis(val requestChannel: RequestChannel,
     var totalBytes = 0L
     for(offsetDetail <- fetchRequest.offsetInfo) {
      for(i <- 0 until offsetDetail.partitions.size) {
+        debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
         try {
-          val localReplica = replicaManager.getReplica(offsetDetail.topic, offsetDetail.partitions(i))
-          val available = localReplica match {
-            case Some(replica) => max(0, replica.log.get.logEndOffset - offsetDetail.offsets(i))
-            case None => 0
+          val leader = replicaManager.getLeaderReplicaIfLocal(offsetDetail.topic, offsetDetail.partitions(i))
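+          // a regular consumer may only see committed data, i.e. up to the leader's
+          // high watermark; a follower replica may read up to the leader's log end offset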
+          val end = if(fetchRequest.replicaId == FetchRequest.NonFollowerId) {
+            leader.highWatermark
+          } else {
+            leader.logEndOffset
           }
+          val available = max(0, end - offsetDetail.offsets(i))
           totalBytes += math.min(offsetDetail.fetchSizes(i), available)
         } catch {
           case e: UnknownTopicOrPartitionException =>
             info("Invalid partition %d in fetch request from client %d."
               .format(offsetDetail.partitions(i), fetchRequest.clientId))
+          case e =>
+            error("Error determining available fetch bytes for topic %s partition %s on broker %s for client %s"
+              .format(offsetDetail.topic, offsetDetail.partitions(i), brokerId, fetchRequest.clientId), e)
         }
       }
     }
@@ -311,26 +316,24 @@ class KafkaApis(val requestChannel: RequestChannel,
       val topic = offsetDetail.topic
       val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
       for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
-        val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
+        val isFetchFromFollower = fetchRequest.replicaId != FetchRequest.NonFollowerId
+        val partitionInfo = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower) match {
           case Left(err) =>
             BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
             BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
-            fetchRequest.replicaId match {
-              case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
-              case _ =>
-                new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
-            }
-          case Right(messages) =>
+            new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
+          case Right((messages, highWatermark)) =>
             BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
             BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
-            val leaderReplica = replicaManager.getReplica(topic, partition).get
-            if (fetchRequest.replicaId != FetchRequest.NonFollowerId) {
-              debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]"
-                .format(topic, partition, fetchRequest.replicaId))
-              debug("Leader returning %d messages for topic %s partition %d to follower %d"
-                .format(messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
+            if(!isFetchFromFollower) {
+              new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
+            } else {
+              debug("Leader %d for topic %s partition %d received fetch request from follower %d"
+                .format(brokerId, topic, partition, fetchRequest.replicaId))
+              debug("Leader %d returning %d messages for topic %s partition %d to follower %d"
+                .format(brokerId, messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
+              new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
             }
-            new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark, messages)
         }
         info.append(partitionInfo)
       }
@@ -340,16 +343,31 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   /**
-   * Read from a single topic/partition at the given offset
+   * Read from a single topic/partition at the given offset up to maxSize bytes
    */
-  private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Short, MessageSet] = {
-    var response: Either[Short, MessageSet] = null
+  private def readMessageSet(topic: String, partition: Int, offset: Long,
+                             maxSize: Int, fromFollower: Boolean): Either[Short, (MessageSet, Long)] = {
+    var response: Either[Short, (MessageSet, Long)] = null
     try {
       // check if the current broker is the leader for the partitions
-      val localReplica = replicaManager.getLeaderReplicaIfLocal(topic, partition)
+      val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition)
       trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
-      val log = localReplica.log.get
-      response = Right(log.read(offset, maxSize))
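+      // bound a consumer's read at the high watermark so that uncommitted messages are
+      // never handed out; a follower is allowed to read up to the full requested maxSize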
+      val actualSize = if (!fromFollower) {
+        min(leader.highWatermark - offset, maxSize).toInt
+      } else {
+        maxSize
+      }
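+      // a consumer fetching at an offset past the HW yields a negative actualSize here;
+      // in that case return an empty message set rather than reading past the HW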
+      val messages = leader.log match {
+        case Some(log) =>
+          if(actualSize >= 0)
+            log.read(offset, actualSize)
+          else
+            MessageSet.Empty
+        case None =>
+          error("Leader for topic %s partition %d on broker %d does not have a local log".format(topic, partition, brokerId))
+          MessageSet.Empty
+      }
+      response = Right(messages, leader.highWatermark)
     } catch {
       case e =>
         error("error when processing request " + (topic, partition, offset, maxSize), e)
@@ -373,13 +391,14 @@ class KafkaApis(val requestChannel: RequestChannel,
         replicaManager.getLeaderReplicaIfLocal(offsetRequest.topic, offsetRequest.partition)
       val offsets = replicaManager.logManager.getOffsets(offsetRequest)
       response = new OffsetResponse(offsetRequest.versionId, offsets)
-    }catch {
+    } catch {
       case ioe: IOException =>
         fatal("Halting due to unrecoverable I/O error while handling producer request: " + ioe.getMessage, ioe)
         System.exit(1)
       case e =>
         warn("Error while responding to offset request", e)
-        response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
+        response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],
+          ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
     }
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
@@ -420,7 +439,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
           }
         })
-      }catch {
+      } catch {
         case e =>
           error("Error while retrieving topic metadata", e)
           // convert exception type to error code
          errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
new file mode 100644
index 0000000..aa86392
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.nio.ByteBuffer
+import kafka.api.{FetchRequest, FetchRequestBuilder}
+import kafka.cluster.{Partition, Replica}
+import kafka.log.Log
+import kafka.message.{ByteBufferMessageSet, Message}
+import kafka.network.{BoundedByteBufferReceive, RequestChannel}
+import kafka.utils.{Time, TestUtils, MockTime}
+import org.easymock.EasyMock
+import org.I0Itec.zkclient.ZkClient
+import org.scalatest.junit.JUnit3Suite
+
+class SimpleFetchTest extends JUnit3Suite {
+
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+    override val replicaMaxLagTimeMs = 100L
+    override val replicaMaxLagBytes = 10L
+  })
+  val topic = "foo"
+  val partitionId = 0
+
+  /**
+   * The scenario for this test is that there is one topic, "foo", on broker "0" that has
+   * one partition with one follower replica on broker "1".  The leader replica on "0"
+   * has HW of "5" and LEO of "20".  The follower on broker "1" has a local replica
+   * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
+   * but is still in ISR (hasn't yet expired from ISR).
+   *
+   * When a normal consumer fetches data, it should only see data up to the HW of the leader,
+   * in this case up to an offset of "5".
+   */
+  def testNonReplicaSeesHwWhenFetching() {
+    /* setup */
+    val time = new MockTime
+    val leo = 20
+    val hw = 5
+    val messages = new Message("test-message".getBytes())
+
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.replay(zkClient)
+
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
+    EasyMock.expect(log.read(0, hw)).andReturn(new ByteBufferMessageSet(messages))
+    EasyMock.replay(log)
+
+    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    EasyMock.expect(logManager.getLog(topic, partitionId)).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.config).andReturn(configs.head).anyTimes()
+    EasyMock.replay(logManager)
+
+    val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
+    EasyMock.expect(replicaManager.config).andReturn(configs.head)
+    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
+    EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
+    EasyMock.expect(replicaManager.highWatermarkCheckpoint).andReturn(EasyMock.createMock(classOf[HighwaterMarkCheckpoint]))
+    EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+    EasyMock.replay(replicaManager)
+
+    val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager)
+    partition.getReplica(configs(1).brokerId).get.logEndOffset = leo - 5L
+
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary)
+    // don't provide replica or leader callbacks since they will not be tested here
+    val requestChannel = new RequestChannel(2, 5)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+
+    // This request (from a regular consumer) wants to read up to 2*HW but should only get back up to HW bytes of the log
+    val goodFetch = new FetchRequestBuilder()
+      .replicaId(FetchRequest.NonFollowerId)
+      .addFetch(topic, partitionId, 0, hw*2)
+      .build()
+    val goodFetchBB = ByteBuffer.allocate(goodFetch.sizeInBytes)
+    goodFetch.writeTo(goodFetchBB)
+    goodFetchBB.rewind()
+
+    val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
+    EasyMock.expect(receivedRequest.buffer).andReturn(goodFetchBB)
+    EasyMock.replay(receivedRequest)
+
+    // send the request
+    apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, request=receivedRequest, start=1))
+
+    // make sure the log only reads bytes between 0->HW (5)
+    EasyMock.verify(log)
+  }
+
+  /**
+   * The scenario for this test is that there is one topic, "foo", on broker "0" that has
+   * one partition with one follower replica on broker "1".  The leader replica on "0"
+   * has HW of "5" and LEO of "20".  The follower on broker "1" has a local replica
+   * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
+   * but is still in ISR (hasn't yet expired from ISR).
+   *
+   * When the follower from broker "1" fetches data, it should see data up to the log end offset ("20").
+   */
+  def testReplicaSeesLeoWhenFetching() {
+    /* setup */
+    val time = new MockTime
+    val leo = 20
+    val hw = 5
+
+    val messages = new Message("test-message".getBytes())
+
+    val followerReplicaId = configs(1).brokerId
+    val followerLEO = 15
+
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.replay(zkClient)
+
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
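+    // the mock expects an uncapped read (maxSize = Integer.MAX_VALUE) starting at the
+    // follower's LEO, which verifies that the leader serves a follower past the HW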
+    EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE)).andReturn(new ByteBufferMessageSet(messages))
+    EasyMock.replay(log)
+
+    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    EasyMock.expect(logManager.getLog(topic, 0)).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.config).andReturn(configs.head).anyTimes()
+    EasyMock.replay(logManager)
+
+    val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
+    EasyMock.expect(replicaManager.config).andReturn(configs.head)
+    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
+    EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
+    EasyMock.expect(replicaManager.highWatermarkCheckpoint).andReturn(EasyMock.createMock(classOf[HighwaterMarkCheckpoint]))
+    EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+    EasyMock.replay(replicaManager)
+
+    val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager)
+    partition.getReplica(followerReplicaId).get.logEndOffset = followerLEO.asInstanceOf[Long]
+
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.recordFollowerPosition(topic, partitionId, followerReplicaId, followerLEO))
+    EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId))
+    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    val requestChannel = new RequestChannel(2, 5)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+
+    /**
+     * This fetch, coming from a replica, requests all data starting at offset "15".  Because the request is coming
+     * from a follower, the leader should oblige and read beyond the HW.
+     */
+    val bigFetch = new FetchRequestBuilder()
+      .replicaId(followerReplicaId)
+      .addFetch(topic, partitionId, followerLEO, Integer.MAX_VALUE)
+      .build()
+
+    val fetchRequest = ByteBuffer.allocate(bigFetch.sizeInBytes)
+    bigFetch.writeTo(fetchRequest)
+    fetchRequest.rewind()
+
+    val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
+    EasyMock.expect(receivedRequest.buffer).andReturn(fetchRequest)
+    EasyMock.replay(receivedRequest)
+
+    // send the request
+    apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1))
+
+    /**
+     * Make sure the log satisfies the fetch from a follower by reading data beyond the HW, namely all bytes after
+     * offset 15
+     */
+    EasyMock.verify(log)
+  }
+
+  private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
+                                               localLog: Log, leaderHW: Long, replicaManager: ReplicaManager): Partition = {
+    val partition = new Partition(topic, partitionId, time, replicaManager)
+    val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
+
+    val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
+    allReplicas.foreach(partition.addReplicaIfNotExists(_))
+    // set in sync replicas for this partition to all the assigned replicas
+    partition.inSyncReplicas = allReplicas.toSet
+    // set the leader and its hw and the hw update time
+    partition.leaderReplicaIdOpt = Some(leaderId)
+    leaderReplica.highWatermark = leaderHW
+    partition
+  }
+
+  private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
+    configs.filter(_.brokerId != leaderId).map { config =>
+      new Replica(config.brokerId, partition, time)
+    }
+  }
+
+}