Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala =================================================================== --- core/src/test/scala/unit/kafka/utils/TestUtils.scala (revision 1350782) +++ core/src/test/scala/unit/kafka/utils/TestUtils.scala (working copy) @@ -397,6 +397,18 @@ } } + def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = { + val startTime = System.currentTimeMillis() + while (true) { + if (condition()) + return true + if (System.currentTimeMillis() > startTime + waitTime) + return false + Thread.sleep(100) + } + // should never hit here + throw new RuntimeException("unexpected error") + } } object ControllerTestUtils{ @@ -442,9 +454,6 @@ } } - - - object TestZKUtils { val zookeeperConnect = "127.0.0.1:2182" } Index: core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala =================================================================== --- core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (revision 1350782) +++ core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (working copy) @@ -23,11 +23,8 @@ import kafka.producer.ProducerData import kafka.serializer.StringEncoder import kafka.admin.CreateTopicCommand -import kafka.cluster.{Replica, Partition, Broker} -import kafka.utils.{MockTime, TestUtils} +import kafka.utils.TestUtils import junit.framework.Assert._ -import java.io.File -import kafka.log.Log class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(2) @@ -41,16 +38,13 @@ } override def tearDown() { - super.tearDown() brokers.foreach(_.shutdown()) + super.tearDown() } def testReplicaFetcherThread() { val partition = 0 val testMessageList = List("test1", "test2", "test3", "test4") - val leaderBrokerId = configs.head.brokerId - val followerBrokerId = configs.last.brokerId - val leaderBroker = new Broker(leaderBrokerId, "localhost", "localhost", configs.head.port) // create a topic and partition CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":")) @@ -59,26 +53,13 @@ val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder) producer.send(new ProducerData[String, String](topic, "test", testMessageList)) - // create a tmp directory - val tmpLogDir = TestUtils.tempDir() - val replicaLogDir = new File(tmpLogDir, topic + "-" + partition) - replicaLogDir.mkdirs() - val replicaLog = new Log(replicaLogDir, 500, 500, false) + def condition(): Boolean = { + brokers.foldLeft(true) { (total, item) => total & (60L == item.getLogManager().getLog(topic, partition).get.logEndOffset) } + } - // create replica fetch thread - val time = new MockTime - val testPartition = new Partition(topic, partition, time) - testPartition.leaderId(Some(leaderBrokerId)) - val testReplica = new Replica(followerBrokerId, testPartition, topic, Some(replicaLog)) - val replicaFetchThread = new ReplicaFetcherThread("replica-fetcher", testReplica, leaderBroker, configs.last) + val result = waitUntilTrue(condition, 6000) - // start a replica fetch thread to the above broker - replicaFetchThread.start() - - Thread.sleep(700) - replicaFetchThread.shutdown() - - assertEquals(60L, testReplica.log.get.logEndOffset) - replicaLog.close() + producer.close() + assertTrue(result) } } \ No newline at end of file Index: core/src/main/scala/kafka/log/Log.scala =================================================================== --- core/src/main/scala/kafka/log/Log.scala (revision 1350782) +++ core/src/main/scala/kafka/log/Log.scala (working copy) @@ -382,6 +382,19 @@ ret } + /** + * Truncate all segments in the log and start a new segment on a new offset + */ + def truncateAndStartWithNewOffset(newOffset: Long) { + lock synchronized { + val deletedSegments = segments.trunc(segments.view.size) + val newFile = new File(dir, Log.nameFromOffset(newOffset)) + debug("tuncate and start log '" + name + "' to " + newFile.getName()) + segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset)) + deleteSegments(deletedSegments) + } + } + /* Attemps to delete all provided segments from a log and returns how many it was able to */ def deleteSegments(segments: Seq[LogSegment]): Int = { var total = 0 Index: core/src/main/scala/kafka/cluster/Replica.scala =================================================================== --- core/src/main/scala/kafka/cluster/Replica.scala (revision 1350782) +++ core/src/main/scala/kafka/cluster/Replica.scala (working copy) @@ -18,7 +18,6 @@ package kafka.cluster import kafka.log.Log -import kafka.server.{KafkaConfig, ReplicaFetcherThread} import java.lang.IllegalStateException import kafka.utils.Logging @@ -28,7 +27,6 @@ var log: Option[Log] = None, var leoUpdateTime: Long = -1L) extends Logging { private var logEndOffset: Long = -1L - private var replicaFetcherThread: ReplicaFetcherThread = null def logEndOffset(newLeo: Option[Long] = None): Long = { isLocal match { @@ -88,32 +86,6 @@ } } - def startReplicaFetcherThread(leaderBroker: Broker, config: KafkaConfig) { - val name = "Replica-Fetcher-%d-%s-%d".format(brokerId, topic, partition.partitionId) - replicaFetcherThread = new ReplicaFetcherThread(name, this, leaderBroker, config) - replicaFetcherThread.setDaemon(true) - replicaFetcherThread.start() - } - - def stopReplicaFetcherThread() { - if(replicaFetcherThread != null) { - replicaFetcherThread.shutdown() - replicaFetcherThread = null - } - } - - def getIfFollowerAndLeader(): (Boolean, Int) = { - replicaFetcherThread != null match { - case true => (true, replicaFetcherThread.getLeader().id) - case false => (false, -1) - } - } - - def close() { - if(replicaFetcherThread != null) - replicaFetcherThread.shutdown() - } - override def equals(that: Any): Boolean = { if(!(that.isInstanceOf[Replica])) return false Index: core/src/main/scala/kafka/common/ErrorMapping.scala =================================================================== --- core/src/main/scala/kafka/common/ErrorMapping.scala (revision 1350782) +++ core/src/main/scala/kafka/common/ErrorMapping.scala (working copy) @@ -60,6 +60,8 @@ def maybeThrowException(code: Short) = if(code != 0) throw codeToException(code).newInstance() + + def exceptionFor(code: Short) : Throwable = codeToException(code).newInstance() } class InvalidTopicException(message: String) extends RuntimeException(message) { Index: core/src/main/scala/kafka/server/AbstractFetcherManager.scala =================================================================== --- core/src/main/scala/kafka/server/AbstractFetcherManager.scala (revision 0) +++ core/src/main/scala/kafka/server/AbstractFetcherManager.scala (revision 0) @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import scala.collection.mutable +import kafka.utils.Logging +import kafka.cluster.Broker + +abstract class AbstractFetcherManager(name: String, numReplicaFetchers: Int = 1) extends Logging { + // map of (source brokerid, fetcher Id per source broker) => fetcher + private val fetcherRunnableMap = new mutable.HashMap[Tuple2[Int, Int], AbstractFetcher] + private val mapLock = new Object + this.logIdent = name + " " + + private def getFetcherId(topic: String, partitionId: Int) : Int = { + (topic.hashCode() + 31 * partitionId) % numReplicaFetchers + } + + // to be defined in subclass to create a specific fetcher + def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcher + + def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) { + info("adding fetcher on topic %s, partion %d, initOffset %d to broker %d".format(topic, partitionId, initialOffset, sourceBroker.id)) + mapLock synchronized { + var fetcher: AbstractFetcher = null + val key = (sourceBroker.id, getFetcherId(topic, partitionId)) + fetcherRunnableMap.get(key) match { + case Some(f) => fetcher = f + case None => + fetcher = createFetcherThread(key._2, sourceBroker) + fetcherRunnableMap.put(key, fetcher) + fetcher.start + } + fetcher.addPartition(topic, partitionId, initialOffset) + } + } + + def removeFetcher(topic: String, partitionId: Int) { + info("%s removing fetcher on topic %s, partition %d".format(name, topic, partitionId)) + mapLock synchronized { + for ((key, fetcher) <- fetcherRunnableMap) { + fetcher.removePartition(topic, partitionId) + if (fetcher.partitionCount <= 0) { + fetcher.shutdown + fetcherRunnableMap.remove(key) + } + } + } + } + + def fetcherSourceBroker(topic: String, partitionId: Int): Option[Int] = { + mapLock synchronized { + for ( ((sourceBrokerId, _), fetcher) <- fetcherRunnableMap) + if (fetcher.hasPartition(topic, partitionId)) + return Some(sourceBrokerId) + } + None + } + + def shutdown() = { + info("shutting down") + for ( (_, fetcher) <- fetcherRunnableMap) { + fetcher.shutdown + } + info("shutdown completes") + } +} \ No newline at end of file Index: core/src/main/scala/kafka/server/KafkaConfig.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaConfig.scala (revision 1350782) +++ core/src/main/scala/kafka/server/KafkaConfig.scala (working copy) @@ -136,4 +136,7 @@ val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4086) + /* number of fetcher threads used to replicate messages from a source broker. + * Increasing this value can increase the degree of I/O parallelism in the follower broker. */ + val numReplicaFetchers = Utils.getInt(props, "replica.fetchers", 1) } Index: core/src/main/scala/kafka/server/AbstractFetcher.scala =================================================================== --- core/src/main/scala/kafka/server/AbstractFetcher.scala (revision 0) +++ core/src/main/scala/kafka/server/AbstractFetcher.scala (revision 0) @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.CountDownLatch +import kafka.cluster.Broker +import kafka.consumer.SimpleConsumer +import java.util.concurrent.atomic.AtomicBoolean +import kafka.utils.Logging +import kafka.common.ErrorMapping +import kafka.api.{PartitionData, FetchRequestBuilder} +import scala.collection.mutable + +/** + * Abstract class for fetching data from multiple partitions from the same broker. + */ +abstract class AbstractFetcher(val name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int, + fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1) + extends Thread(name) with Logging { + private val isRunning: AtomicBoolean = new AtomicBoolean(true) + private val shutdownLatch = new CountDownLatch(1) + private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map + private val fetchMapLock = new Object + val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize) + this.logIdent = name + " " + + // callbacks to be defined in subclass + + // process fetched data and return the new fetch offset + def processPartitionData(topic: String, partitionData: PartitionData): Long + + // handle a partition whose offset is out of range and return a new fetch offset + def handleOffsetOutOfRange(topic: String, partitionId: Int): Long + + // any logic for partitions whose leader has changed + def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit + + override def run() { + try { + while(isRunning.get()) { + val builder = new FetchRequestBuilder(). + clientId(name). + replicaId(fetcherBrokerId). + maxWait(maxWait). + minBytes(minBytes) + + fetchMapLock synchronized { + for ( ((topic, partitionId), offset) <- fetchMap ) + builder.addFetch(topic, partitionId, offset.longValue, fetchSize) + } + + val fetchRequest = builder.build() + val response = simpleConsumer.fetch(fetchRequest) + trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) + + var partitionsWithNewLeader : List[Tuple2[String, Int]] = Nil + // process fetched data + fetchMapLock synchronized { + for ( topicData <- response.data ) { + for ( partitionData <- topicData.partitionData) { + val topic = topicData.topic + val partitionId = partitionData.partition + val key = (topic, partitionId) + val currentOffset = fetchMap.get(key) + if (!currentOffset.isEmpty) { + partitionData.error match { + case ErrorMapping.NoError => + val newOffset = processPartitionData(topic, partitionData) + fetchMap.put(key, newOffset) + case ErrorMapping.OffsetOutOfRangeCode => + val newOffset = handleOffsetOutOfRange(topic, partitionId) + fetchMap.put(key, newOffset) + warn("current offset %d for topic %s partition %d out of range; reset offset to %d" + .format(currentOffset.get, topic, partitionId, newOffset)) + case ErrorMapping.NotLeaderForPartitionCode => + partitionsWithNewLeader ::= key + case _ => + error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host), + ErrorMapping.exceptionFor(partitionData.error)) + } + } + } + } + } + if (partitionsWithNewLeader.size > 0) { + debug("changing leaders for %s".format(partitionsWithNewLeader)) + handlePartitionsWithNewLeader(partitionsWithNewLeader) + } + } + } catch { + case e: InterruptedException => info("replica fetcher runnable interrupted. Shutting down") + case e1 => error("error in replica fetcher runnable", e1) + } + shutdownComplete() + } + + def addPartition(topic: String, partitionId: Int, initialOffset: Long) { + fetchMapLock synchronized { + fetchMap.put((topic, partitionId), initialOffset) + } + } + + def removePartition(topic: String, partitionId: Int) { + fetchMapLock synchronized { + fetchMap.remove((topic, partitionId)) + } + } + + def hasPartition(topic: String, partitionId: Int): Boolean = { + fetchMap.get((topic, partitionId)) != null + } + + def partitionCount() = fetchMap.size + + private def shutdownComplete() = { + simpleConsumer.close() + shutdownLatch.countDown + } + + def shutdown() { + info("shutting down") + isRunning.set(false) + interrupt() + shutdownLatch.await() + info("shutdown completed") + } +} \ No newline at end of file Index: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala =================================================================== --- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (revision 1350782) +++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (working copy) @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.CountDownLatch -import kafka.api.FetchRequestBuilder -import kafka.utils.Logging -import kafka.cluster.{Broker, Replica} -import kafka.consumer.SimpleConsumer - -class ReplicaFetcherThread(name: String, replica: Replica, leaderBroker: Broker, config: KafkaConfig) - extends Thread(name) with Logging { - val isRunning: AtomicBoolean = new AtomicBoolean(true) - private val shutdownLatch = new CountDownLatch(1) - private val replicaConsumer = new SimpleConsumer(leaderBroker.host, leaderBroker.port, - config.replicaSocketTimeoutMs, config.replicaSocketBufferSize) - - override def run() { - try { - info("Starting replica fetch thread %s for topic %s partition %d".format(name, replica.topic, replica.partition.partitionId)) - while(isRunning.get()) { - val builder = new FetchRequestBuilder(). - clientId(name). - replicaId(replica.brokerId). - maxWait(config.replicaMaxWaitTimeMs). - minBytes(config.replicaMinBytes) - - // TODO: KAFKA-339 Keep this simple single fetch for now. Change it to fancier multi fetch when message - // replication actually works - val fetchOffset = replica.logEndOffset() - trace("Follower %d issuing fetch request for topic %s partition %d to leader %d from offset %d" - .format(replica.brokerId, replica.topic, replica.partition.partitionId, leaderBroker.id, fetchOffset)) - builder.addFetch(replica.topic, replica.partition.partitionId, fetchOffset, config.replicaFetchSize) - - val fetchRequest = builder.build() - val response = replicaConsumer.fetch(fetchRequest) - // TODO: KAFKA-339 Check for error. Don't blindly read the messages - // append messages to local log - replica.log.get.append(response.messageSet(replica.topic, replica.partition.partitionId)) - // record the hw sent by the leader for this partition - val followerHighWatermark = replica.logEndOffset().min(response.data.head.partitionData.head.hw) - replica.highWatermark(Some(followerHighWatermark)) - trace("Follower %d set replica highwatermark for topic %s partition %d to %d" - .format(replica.brokerId, replica.topic, replica.partition.partitionId, replica.highWatermark())) - } - }catch { - case e: InterruptedException => warn("Replica fetcher thread %s interrupted. Shutting down".format(name)) - case e1 => error("Error in replica fetcher thread. Shutting down due to ", e1) - } - shutdownComplete() - } - - private def shutdownComplete() = { - replicaConsumer.close() - shutdownLatch.countDown - } - - def getLeader(): Broker = leaderBroker - - def shutdown() { - info("Shutting down replica fetcher thread") - isRunning.set(false) - interrupt() - shutdownLatch.await() - info("Replica fetcher thread shutdown completed") - } -} \ No newline at end of file Index: core/src/main/scala/kafka/server/ReplicaManager.scala =================================================================== --- core/src/main/scala/kafka/server/ReplicaManager.scala (revision 1350782) +++ core/src/main/scala/kafka/server/ReplicaManager.scala (working copy) @@ -32,6 +32,8 @@ private var leaderReplicas = new ListBuffer[Partition]() private val leaderReplicaLock = new ReentrantLock() private var isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true) + private val replicaFetcherManager = new ReplicaFetcherManager(config, this) + // start ISR expiration thread isrExpirationScheduler.startUp isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.keepInSyncTimeMs) @@ -139,7 +141,7 @@ def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) { // stop replica fetcher thread, if any - replica.stopReplicaFetcherThread() + replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId) // read and cache the ISR replica.partition.leaderId(Some(replica.brokerId)) replica.partition.updateISR(currentISRInZk.toSet) @@ -153,7 +155,7 @@ } def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) { - info("Broker %d becoming follower to leader %d for topic %s partition %d" + info("broker %d intending to follow leader %d for topic %s partition %d" .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId)) // remove this replica's partition from the ISR expiration queue try { @@ -169,13 +171,15 @@ } // get leader for this replica val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head - val isReplicaAFollower = replica.getIfFollowerAndLeader() + val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId) // Become follower only if it is not already following the same leader - if(!(isReplicaAFollower._1 && (isReplicaAFollower._2 == leaderBroker.id))) { + if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) { + info("broker %d becoming follower to leader %d for topic %s partition %d" + .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId)) // stop fetcher thread to previous leader - replica.stopReplicaFetcherThread() + replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId) // start fetcher thread to current leader - replica.startReplicaFetcherThread(leaderBroker, config) + replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker) } } @@ -244,6 +248,6 @@ def close() { isrExpirationScheduler.shutdown() - allReplicas.foreach(_._2.assignedReplicas().foreach(_.close())) + replicaFetcherManager.shutdown() } } Index: core/src/main/scala/kafka/server/ReplicaFetcherManager.scala =================================================================== --- core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (revision 0) +++ core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (revision 0) @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.cluster.Broker + +class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager) + extends AbstractFetcherManager("ReplicaFetcherManager", brokerConfig.numReplicaFetchers) { + + def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcher = { + new ReplicaFetcher("ReplicaFetcher-%d-%d".format(sourceBroker.id, fetcherId), sourceBroker, brokerConfig, replicaMgr) + } + +} \ No newline at end of file Index: core/src/main/scala/kafka/server/ReplicaFetcher.scala =================================================================== --- core/src/main/scala/kafka/server/ReplicaFetcher.scala (revision 0) +++ core/src/main/scala/kafka/server/ReplicaFetcher.scala (revision 0) @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.api.{OffsetRequest, PartitionData} +import kafka.cluster.Broker + +class ReplicaFetcher(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager) + extends AbstractFetcher(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs, + socketBufferSize = brokerConfig.replicaSocketBufferSize, fetchSize = brokerConfig.replicaFetchSize, + fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs, + minBytes = brokerConfig.replicaMinBytes) { + + // process fetched data and return the new fetch offset + def processPartitionData(topic: String, partitionData: PartitionData): Long = { + val partitionId = partitionData.partition + val replica = replicaMgr.getReplica(topic, partitionId).get + val messageSet = partitionData.messages + + replica.log.get.append(messageSet) + replica.highWatermark(Some(partitionData.hw)) + trace("follower %d set replica highwatermark for topic %s partition %d to %d" + .format(replica.brokerId, topic, partitionId, partitionData.hw)) + replica.logEndOffset() + } + + // handle a partition whose offset is out of range and return a new fetch offset + def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = { + // This means the local replica is out of date. Truncate the log and catch up from beginning. + val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, OffsetRequest.EarliestTime, 1) + val replica = replicaMgr.getReplica(topic, partitionId).get + replica.log.get.truncateAndStartWithNewOffset(offsets(0)) + return offsets(0) + } + + // any logic for partitions whose leader has changed + def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit = { + // no handler needed since the controller will make the changes accordingly + } + + +} \ No newline at end of file