diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index 55a5982..c0f80e8 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -17,16 +17,16 @@ package kafka.api -import java.nio.ByteBuffer import kafka.utils.nonthreadsafe import kafka.api.ApiUtils._ -import scala.collection.immutable.Map import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.consumer.ConsumerConfig -import java.util.concurrent.atomic.AtomicInteger import kafka.network.RequestChannel import kafka.message.MessageSet +import java.util.concurrent.atomic.AtomicInteger +import java.nio.ByteBuffer +import scala.collection.immutable.Map case class PartitionFetchInfo(offset: Long, fetchSize: Int) @@ -147,7 +147,7 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { val fetchResponsePartitionData = requestInfo.map { case (topicAndPartition, data) => - (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty)) + (topicAndPartition, PartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty)) } val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse))) diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index d117f10..40e9ca9 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -19,20 +19,21 @@ package kafka.api import java.nio.ByteBuffer import java.nio.channels.GatheringByteChannel + import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.message.{MessageSet, ByteBufferMessageSet} import kafka.network.{MultiSend, Send} import kafka.api.ApiUtils._ -object FetchResponsePartitionData { - def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = { +object PartitionData { + def readFrom(buffer: ByteBuffer): PartitionData = { val error = buffer.getShort val hw = buffer.getLong val messageSetSize = buffer.getInt val messageSetBuffer = buffer.slice() messageSetBuffer.limit(messageSetSize) buffer.position(buffer.position + messageSetSize) - new FetchResponsePartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer)) + new PartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer)) } val headerSize = @@ -41,18 +42,18 @@ object FetchResponsePartitionData { 4 /* messageSetSize */ } -case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError, hw: Long = -1L, messages: MessageSet) { - val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes +case class PartitionData(error: Short = ErrorMapping.NoError, hw: Long = -1L, messages: MessageSet) { + val sizeInBytes = PartitionData.headerSize + messages.sizeInBytes } // SENDS class PartitionDataSend(val partitionId: Int, - val partitionData: FetchResponsePartitionData) extends Send { + val partitionData: PartitionData) extends Send { private val messageSize = partitionData.messages.sizeInBytes private var messagesSentSize = 0 - private val buffer = ByteBuffer.allocate( 4 /** partitionId **/ + FetchResponsePartitionData.headerSize) + private val buffer = ByteBuffer.allocate( 4 /** 
partitionId **/ + PartitionData.headerSize) buffer.putInt(partitionId) buffer.putShort(partitionData.error) buffer.putLong(partitionData.hw) @@ -80,7 +81,7 @@ object TopicData { val partitionCount = buffer.getInt val topicPartitionDataPairs = (1 to partitionCount).map(_ => { val partitionId = buffer.getInt - val partitionData = FetchResponsePartitionData.readFrom(buffer) + val partitionData = PartitionData.readFrom(buffer) (partitionId, partitionData) }) TopicData(topic, Map(topicPartitionDataPairs:_*)) @@ -91,7 +92,7 @@ object TopicData { 4 /* partition count */ } -case class TopicData(topic: String, partitionData: Map[Int, FetchResponsePartitionData]) { +case class TopicData(topic: String, partitionData: Map[Int, PartitionData]) { val sizeInBytes = TopicData.headerSize(topic) + partitionData.values.foldLeft(0)(_ + _.sizeInBytes + 4) @@ -151,7 +152,7 @@ object FetchResponse { case class FetchResponse(correlationId: Int, - data: Map[TopicAndPartition, FetchResponsePartitionData]) { + data: Map[TopicAndPartition, PartitionData]) { /** * Partitions the data into a map of maps (one for each topic). @@ -167,7 +168,7 @@ case class FetchResponse(correlationId: Int, folded + topicData.sizeInBytes }) - private def partitionDataFor(topic: String, partition: Int): FetchResponsePartitionData = { + private def partitionDataFor(topic: String, partition: Int): PartitionData = { val topicAndPartition = TopicAndPartition(topic, partition) data.get(topicAndPartition) match { case Some(partitionData) => partitionData diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 134aef9..edc8079 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -21,7 +21,7 @@ import kafka.admin.AdminUtils import kafka.utils._ import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig -import kafka.server.{OffsetManager, ReplicaManager} +import kafka.server.{TopicPartitionRequestKey, LogOffsetMetadata, OffsetManager, ReplicaManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import kafka.message.ByteBufferMessageSet @@ -29,7 +29,8 @@ import kafka.message.ByteBufferMessageSet import java.io.IOException import java.util.concurrent.locks.ReentrantReadWriteLock import kafka.utils.Utils.{inReadLock,inWriteLock} -import scala.collection._ +import scala.Some +import scala.collection.immutable.Set import com.yammer.metrics.core.Gauge @@ -39,18 +40,18 @@ import com.yammer.metrics.core.Gauge */ class Partition(val topic: String, val partitionId: Int, - var replicationFactor: Int, time: Time, - val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup { + replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup { private val localBrokerId = replicaManager.config.brokerId private val logManager = replicaManager.logManager private val zkClient = replicaManager.zkClient - var leaderReplicaIdOpt: Option[Int] = None - var inSyncReplicas: Set[Replica] = Set.empty[Replica] - private val assignedReplicaMap = new Pool[Int,Replica] + private val assignedReplicaMap = new Pool[Int, Replica] + // The read lock is only required when multiple reads are executed and needs to be in a consistent manner private val leaderIsrUpdateLock = new ReentrantReadWriteLock() private var zkVersion: Int = LeaderAndIsr.initialZKVersion - private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1 + @volatile private var leaderEpoch: Int = 
LeaderAndIsr.initialLeaderEpoch - 1 + @volatile var leaderReplicaIdOpt: Option[Int] = None + @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica] /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup. * One way of doing that is through the controller's start replica state change command. When a new broker starts up * the controller sends it a start replica command containing the leader for each partition that the broker hosts. @@ -58,7 +59,6 @@ class Partition(val topic: String, * each partition. */ private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1 this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId) - private val stateChangeLogger = KafkaController.stateChangeLogger private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId) @@ -72,13 +72,11 @@ class Partition(val topic: String, ) def isUnderReplicated(): Boolean = { - inReadLock(leaderIsrUpdateLock) { - leaderReplicaIfLocal() match { - case Some(_) => - inSyncReplicas.size < assignedReplicas.size - case None => - false - } + leaderReplicaIfLocal() match { + case Some(_) => + inSyncReplicas.size < assignedReplicas.size + case None => + false } } @@ -114,15 +112,13 @@ class Partition(val topic: String, } def leaderReplicaIfLocal(): Option[Replica] = { - inReadLock(leaderIsrUpdateLock) { - leaderReplicaIdOpt match { - case Some(leaderReplicaId) => - if (leaderReplicaId == localBrokerId) - getReplica(localBrokerId) - else - None - case None => None - } + leaderReplicaIdOpt match { + case Some(leaderReplicaId) => + if (leaderReplicaId == localBrokerId) + getReplica(localBrokerId) + else + None + case None => None } } @@ -155,9 +151,7 @@ class Partition(val topic: String, } def getLeaderEpoch(): Int = { - inReadLock(leaderIsrUpdateLock) { - return this.leaderEpoch - } + return this.leaderEpoch } /** @@ -179,14 +173,17 @@ class Partition(val topic: String, val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet // remove assigned replicas that have been removed by the controller (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_)) - // reset LogEndOffset for remote replicas - assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset) inSyncReplicas = newInSyncReplicas leaderEpoch = leaderAndIsr.leaderEpoch zkVersion = leaderAndIsr.zkVersion leaderReplicaIdOpt = Some(localBrokerId) + // construct the high watermark metadata for the new leader replica + val newLeaderReplica = getReplica().get + newLeaderReplica.convertHWToLocalOffsetMetadata() + // reset log end offset for remote replicas + assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = LogOffsetMetadata.UnknownOffsetMetadata) // we may need to increment high watermark since ISR could be down to 1 - maybeIncrementLeaderHW(getReplica().get) + maybeIncrementLeaderHW(newLeaderReplica) if (topic == OffsetManager.OffsetsTopicName) offsetManager.loadOffsetsFromLog(partitionId) true @@ -233,18 +230,8 @@ class Partition(val topic: String, } } - def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) { + def updateLeaderHWAndMaybeExpandIsr(replicaId: Int) { inWriteLock(leaderIsrUpdateLock) { - debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId)) - val replicaOpt = getReplica(replicaId) - if(!replicaOpt.isDefined) { - throw new 
NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d for partition [%s,%d] since the replica %d" + - " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId, - offset, topic, partitionId, replicaId, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId)) - } - val replica = replicaOpt.get - replica.logEndOffset = offset - // check if this replica needs to be added to the ISR leaderReplicaIfLocal() match { case Some(leaderReplica) => @@ -253,8 +240,10 @@ class Partition(val topic: String, // For a replica to get added back to ISR, it has to satisfy 3 conditions- // 1. It is not already in the ISR // 2. It is part of the assigned replica list. See KAFKA-1097 - // 3. It's log end offset >= leader's highwatermark - if (!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && replica.logEndOffset >= leaderHW) { + // 3. It's log end offset >= leader's high watermark + if (!inSyncReplicas.contains(replica) && + assignedReplicas.map(_.brokerId).contains(replicaId) && + replica.logEndOffset.messageDiff(leaderHW) >= 0) { // expand ISR val newInSyncReplicas = inSyncReplicas + replica info("Expanding ISR for partition [%s,%d] from %s to %s" @@ -270,29 +259,29 @@ class Partition(val topic: String, } def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = { - inReadLock(leaderIsrUpdateLock) { - leaderReplicaIfLocal() match { - case Some(_) => - val numAcks = inSyncReplicas.count(r => { - if (!r.isLocal) - r.logEndOffset >= requiredOffset - else - true /* also count the local (leader) replica */ - }) - trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId)) - if ((requiredAcks < 0 && numAcks >= inSyncReplicas.size) || - (requiredAcks > 0 && numAcks >= requiredAcks)) { - /* - * requiredAcks < 0 means acknowledge after all replicas in ISR - * are fully caught up to the (local) leader's offset - * corresponding to this produce request. - */ - (true, ErrorMapping.NoError) - } else - (false, ErrorMapping.NoError) - case None => - (false, ErrorMapping.NotLeaderForPartitionCode) - } + leaderReplicaIfLocal() match { + case Some(leaderReplica) => + // keep the current immutable replica list reference + val curInSyncReplicas = inSyncReplicas + val numAcks = curInSyncReplicas.count(r => { + if (!r.isLocal) + r.logEndOffset.messageOffset >= requiredOffset + else + true /* also count the local (leader) replica */ + }) + trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId)) + if ((requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) || + (requiredAcks > 0 && numAcks >= requiredAcks)) { + /* + * requiredAcks < 0 means acknowledge after all replicas in ISR + * are fully caught up to the (local) leader's offset + * corresponding to this produce request. 
+ */ + (true, ErrorMapping.NoError) + } else + (false, ErrorMapping.NoError) + case None => + (false, ErrorMapping.NotLeaderForPartitionCode) } } @@ -302,15 +291,19 @@ class Partition(val topic: String, */ private def maybeIncrementLeaderHW(leaderReplica: Replica) { val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset) - val newHighWatermark = allLogEndOffsets.min + val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering) val oldHighWatermark = leaderReplica.highWatermark - if(newHighWatermark > oldHighWatermark) { + if(oldHighWatermark.precedes(newHighWatermark)) { leaderReplica.highWatermark = newHighWatermark - debug("Highwatermark for partition [%s,%d] updated to %d".format(topic, partitionId, newHighWatermark)) + debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark)) + // some delayed requests may be unblocked after HW changed + val requestKey = new TopicPartitionRequestKey(this.topic, this.partitionId) + replicaManager.unblockDelayedFetchRequests(requestKey) + replicaManager.unblockDelayedProduceRequests(requestKey) + } else { + debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s" + .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(","))) } - else - debug("Old hw for partition [%s,%d] is %d. New hw is %d. All leo's are %s" - .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(","))) } def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) { @@ -349,7 +342,9 @@ class Partition(val topic: String, if(stuckReplicas.size > 0) debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(","))) // Case 2 above - val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages) + val slowReplicas = candidateReplicas.filter(r => + r.logEndOffset.messageOffset >= 0 && + leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages) if(slowReplicas.size > 0) debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(","))) stuckReplicas ++ slowReplicas @@ -362,6 +357,8 @@ class Partition(val topic: String, case Some(leaderReplica) => val log = leaderReplica.log.get val info = log.append(messages, assignOffsets = true) + // probably unblock some follower fetch requests since log end offset has been updated + replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId)) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) info @@ -399,14 +396,12 @@ class Partition(val topic: String, } override def toString(): String = { - inReadLock(leaderIsrUpdateLock) { - val partitionString = new StringBuilder - partitionString.append("Topic: " + topic) - partitionString.append("; Partition: " + partitionId) - partitionString.append("; Leader: " + leaderReplicaIdOpt) - partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(",")) - partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(",")) - partitionString.toString() - } + val partitionString = new StringBuilder + partitionString.append("Topic: " + topic) + partitionString.append("; Partition: " + partitionId) + partitionString.append("; Leader: " + 
leaderReplicaIdOpt) + partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(",")) + partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(",")) + partitionString.toString() } } diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 5e659b4..e9a2ea5 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -19,8 +19,9 @@ package kafka.cluster import kafka.log.Log import kafka.utils.{SystemTime, Time, Logging} +import kafka.server.LogOffsetMetadata import kafka.common.KafkaException -import kafka.server.ReplicaManager + import java.util.concurrent.atomic.AtomicLong class Replica(val brokerId: Int, @@ -28,33 +29,17 @@ class Replica(val brokerId: Int, time: Time = SystemTime, initialHighWatermarkValue: Long = 0L, val log: Option[Log] = None) extends Logging { - //only defined in local replica - private[this] var highWatermarkValue: AtomicLong = new AtomicLong(initialHighWatermarkValue) - // only used for remote replica; logEndOffsetValue for local replica is kept in log - private[this] var logEndOffsetValue = new AtomicLong(ReplicaManager.UnknownLogEndOffset) - private[this] var logEndOffsetUpdateTimeMsValue: AtomicLong = new AtomicLong(time.milliseconds) + // the high watermark offset value, in non-leader replicas only its message offsets are kept + @volatile private[this] var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(initialHighWatermarkValue) + // the log end offset value, kept in all replicas; + // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch + @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata + // the time when log offset is updated + private[this] val logEndOffsetUpdateTimeMsValue = new AtomicLong(time.milliseconds) + val topic = partition.topic val partitionId = partition.partitionId - def logEndOffset_=(newLogEndOffset: Long) { - if (!isLocal) { - logEndOffsetValue.set(newLogEndOffset) - logEndOffsetUpdateTimeMsValue.set(time.milliseconds) - trace("Setting log end offset for replica %d for partition [%s,%d] to %d" - .format(brokerId, topic, partitionId, logEndOffsetValue.get())) - } else - throw new KafkaException("Shouldn't set logEndOffset for replica %d partition [%s,%d] since it's local" - .format(brokerId, topic, partitionId)) - - } - - def logEndOffset = { - if (isLocal) - log.get.logEndOffset - else - logEndOffsetValue.get() - } - def isLocal: Boolean = { log match { case Some(l) => true @@ -62,24 +47,45 @@ class Replica(val brokerId: Int, } } - def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get() - - def highWatermark_=(newHighWatermark: Long) { + def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) { if (isLocal) { - trace("Setting hw for replica %d partition [%s,%d] on broker %d to %d" - .format(brokerId, topic, partitionId, brokerId, newHighWatermark)) - highWatermarkValue.set(newHighWatermark) - } else - throw new KafkaException("Unable to set highwatermark for replica %d partition [%s,%d] since it's not local" - .format(brokerId, topic, partitionId)) + throw new KafkaException("Should not set log end offset on partition [%s,%d]'s local replica %d".format(topic, partitionId, brokerId)) + } else { + logEndOffsetMetadata = newLogEndOffset + logEndOffsetUpdateTimeMsValue.set(time.milliseconds) + trace("Setting log end offset for replica 
%d for partition [%s,%d] to [%s]" + .format(brokerId, topic, partitionId, logEndOffsetMetadata)) + } } - def highWatermark = { + def logEndOffset = if (isLocal) - highWatermarkValue.get() + log.get.logEndOffsetMetadata else - throw new KafkaException("Unable to get highwatermark for replica %d partition [%s,%d] since it's not local" - .format(brokerId, topic, partitionId)) + logEndOffsetMetadata + + def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get() + + def highWatermark_=(newHighWatermark: LogOffsetMetadata) { + if (isLocal) { + highWatermarkMetadata = newHighWatermark + trace("Setting high watermark for replica %d partition [%s,%d] on broker %d to [%s]" + .format(brokerId, topic, partitionId, brokerId, newHighWatermark)) + } else { + throw new KafkaException("Should not set high watermark on partition [%s,%d]'s non-local replica %d".format(topic, partitionId, brokerId)) + } + } + + def highWatermark = highWatermarkMetadata + + def convertHWToLocalOffsetMetadata() = { + if (isLocal) { + highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset) + if (highWatermarkMetadata == LogOffsetMetadata.UnknownOffsetMetadata) + highWatermarkMetadata = log.get.logEndOffsetMetadata + } else { + throw new KafkaException("Should not construct complete high watermark on partition [%s,%d]'s non-local replica %d".format(topic, partitionId, brokerId)) + } } override def equals(that: Any): Boolean = { diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index f8c1b4e..57b51c3 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -20,7 +20,7 @@ package kafka.consumer import kafka.cluster.Broker import kafka.server.AbstractFetcherThread import kafka.message.ByteBufferMessageSet -import kafka.api.{Request, OffsetRequest, FetchResponsePartitionData} +import kafka.api.{Request, OffsetRequest, PartitionData} import kafka.common.TopicAndPartition @@ -41,7 +41,7 @@ class ConsumerFetcherThread(name: String, isInterruptible = true) { // process fetched data - def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) { + def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PartitionData) { val pti = partitionMap(topicAndPartition) if (pti.getFetchOffset != fetchOffset) throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d" diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 0e64632..8db9203 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -71,7 +71,7 @@ class SimpleConsumer(val host: String, response = blockingChannel.receive() } catch { case e : Throwable => - info("Reconnect due to socket error: %s".format(e.getMessage)) + info("Reconnect due to socket error: %s".format(e.toString)) // retry once try { reconnect() diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index b7bc5ff..87e2595 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -21,7 +21,7 @@ import kafka.utils._ import kafka.message._ import kafka.common._ import kafka.metrics.KafkaMetricsGroup -import 
kafka.server.BrokerTopicStats +import kafka.server.{LogOffsetMetadata, FetchDataInfo, BrokerTopicStats} import java.io.{IOException, File} import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} @@ -51,11 +51,11 @@ import com.yammer.metrics.core.Gauge class Log(val dir: File, @volatile var config: LogConfig, @volatile var recoveryPoint: Long = 0L, - val scheduler: Scheduler, + scheduler: Scheduler, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { import kafka.log.Log._ - + /* A lock that guards all modifications to the log */ private val lock = new Object @@ -67,7 +67,7 @@ class Log(val dir: File, loadSegments() /* Calculate the offset of the next message */ - private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset()) + @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt) val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name) @@ -167,6 +167,10 @@ class Log(val dir: File, for (s <- logSegments) s.index.sanityCheck() } + + private def updateLogEndOffset(messageOffset: Long) { + nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size.toInt) + } private def recoverLog() { // if we have the clean shutdown marker, skip recovery @@ -246,14 +250,14 @@ class Log(val dir: File, try { // they are valid, insert them in the log lock synchronized { - appendInfo.firstOffset = nextOffset.get + appendInfo.firstOffset = nextOffsetMetadata.messageOffset // maybe roll the log if this segment is full val segment = maybeRoll() if(assignOffsets) { // assign offsets to the message set - val offset = new AtomicLong(nextOffset.get) + val offset = new AtomicLong(nextOffsetMetadata.messageOffset) try { validMessages = validMessages.assignOffsets(offset, appendInfo.codec) } catch { @@ -262,7 +266,7 @@ class Log(val dir: File, appendInfo.lastOffset = offset.get - 1 } else { // we are taking the offsets we are given - if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get) + if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset) throw new IllegalArgumentException("Out of order offsets found in " + messages) } @@ -282,10 +286,10 @@ class Log(val dir: File, segment.append(appendInfo.firstOffset, validMessages) // increment the log end offset - nextOffset.set(appendInfo.lastOffset + 1) + updateLogEndOffset(appendInfo.lastOffset + 1) trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s" - .format(this.name, appendInfo.firstOffset, nextOffset.get(), validMessages)) + .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages)) if(unflushedMessages >= config.flushInterval) flush() @@ -307,7 +311,7 @@ class Log(val dir: File, * @param offsetsMonotonic Are the offsets in this message set monotonically increasing */ case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean) - + /** * Validate the following: *
    @@ -392,15 +396,15 @@ class Log(val dir: File, * @param maxOffset -The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set). * * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment. - * @return The messages read + * @return The fetch data information including offset metadata and messages read */ - def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = { + def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = { trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size)) // check if the offset is valid and in range - val next = nextOffset.get + val next = nextOffsetMetadata.messageOffset if(startOffset == next) - return MessageSet.Empty + return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty) var entry = segments.floorEntry(startOffset) @@ -412,15 +416,26 @@ class Log(val dir: File, // but if that segment doesn't contain any messages with an offset greater than that // continue to read from successive segments until we get some messages or we reach the end of the log while(entry != null) { - val messages = entry.getValue.read(startOffset, maxOffset, maxLength) - if(messages == null) + val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength) + if(fetchInfo == null) { entry = segments.higherEntry(entry.getKey) - else - return messages + } else { + return fetchInfo + } } - // okay we are beyond the end of the last segment but less than the log end offset - MessageSet.Empty + // okay we are beyond the end of the last segment with no data fetched although the start offset is in range, + // this can happen when all messages with offset larger than start offsets have been deleted. 
+ // In this case, we will return the empty set with log end offset metadata + FetchDataInfo(nextOffsetMetadata, MessageSet.Empty) + } + + /** + * Given a message offset, find its log metadata + */ + def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = { + val fetchDataInfo = read(offset, 1) + fetchDataInfo.fetchOffset } /** @@ -433,7 +448,7 @@ class Log(val dir: File, // find any segments that match the user-supplied predicate UNLESS it is the final segment // and it is empty (since we would just end up re-creating it val lastSegment = activeSegment - var deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0)) + val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0)) val numToDelete = deletable.size if(numToDelete > 0) { lock synchronized { @@ -458,9 +473,14 @@ class Log(val dir: File, def logStartOffset: Long = logSegments.head.baseOffset /** + * The offset metadata of the next message that will be appended to the log + */ + def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata + + /** * The offset of the next message that will be appended to the log */ - def logEndOffset: Long = nextOffset.get + def logEndOffset: Long = nextOffsetMetadata.messageOffset /** * Roll the log over to a new empty log segment if necessary @@ -582,7 +602,7 @@ class Log(val dir: File, val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset) deletable.foreach(deleteSegment(_)) activeSegment.truncateTo(targetOffset) - this.nextOffset.set(targetOffset) + updateLogEndOffset(targetOffset) this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) } } @@ -602,7 +622,7 @@ class Log(val dir: File, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, time = time)) - this.nextOffset.set(newOffset) + updateLogEndOffset(newOffset) this.recoveryPoint = math.min(newOffset, this.recoveryPoint) } } diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index afbeffc..c20de4a 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -17,20 +17,22 @@ package kafka.log +import kafka.common._ +import kafka.message._ +import kafka.utils._ +import kafka.metrics.KafkaMetricsGroup + import scala.collection._ import scala.math import java.nio._ import java.util.Date import java.io.File -import kafka.common._ -import kafka.message._ -import kafka.utils._ -import kafka.metrics.KafkaMetricsGroup -import com.yammer.metrics.core.Gauge import java.lang.IllegalStateException import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit +import com.yammer.metrics.core.Gauge + /** * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy. * A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'. 
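
The retention rule stated in the cleaner's comment above (a record with key K at offset O is obsolete once a later record with the same key exists at offset O' > O) can be illustrated with a small, self-contained sketch. The map type and helper below are simplified stand-ins chosen for illustration only, not the cleaner's actual OffsetMap API:

import scala.collection.mutable

// Simplified stand-in for the cleaner's key -> latest-offset map.
class LatestOffsetMap {
  private val latest = mutable.Map.empty[String, Long]

  // Record the offset of a message with the given key, keeping only the largest offset seen.
  def put(key: String, offset: Long): Unit =
    latest(key) = math.max(offset, latest.getOrElse(key, -1L))

  def latestOffset(key: String): Long = latest.getOrElse(key, -1L)
}

// A message with key `key` at offset `offset` survives cleaning only if no later
// message with the same key has been seen, i.e. offset >= the latest offset for that key.
def shouldRetain(map: LatestOffsetMap, key: String, offset: Long): Boolean =
  offset >= map.latestOffset(key)

For example, after put("k", 5) and put("k", 9), shouldRetain(map, "k", 5) is false while shouldRetain(map, "k", 9) is true, which is exactly the dedupe behavior described above.
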
@@ -325,7 +327,6 @@ private[log] class Cleaner(val id: Int, * @param log The log being cleaned * @param segments The group of segments being cleaned * @param map The offset map to use for cleaning segments - * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet * @param deleteHorizonMs The time to retain delete tombstones */ private[log] def cleanSegments(log: Log, diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 0d6926e..8d42404 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -14,15 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package kafka.log +package kafka.log -import scala.math._ -import java.io.File import kafka.message._ import kafka.common._ import kafka.utils._ +import kafka.server.{LogOffsetMetadata, FetchDataInfo} -/** +import scala.math._ +import java.io.File + + + /** * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in @@ -86,7 +89,7 @@ class LogSegment(val log: FileMessageSet, * Find the physical file position for the first message with offset >= the requested offset. * * The lowerBound argument is an optimization that can be used if we already know a valid starting position - * in the file higher than the greast-lower-bound from the index. + * in the file higher than the greatest-lower-bound from the index. * * @param offset The offset we want to translate * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and @@ -99,7 +102,7 @@ class LogSegment(val log: FileMessageSet, val mapping = index.lookup(offset) log.searchFor(offset, max(mapping.position, startingFilePosition)) } - + /** * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified. @@ -108,22 +111,26 @@ class LogSegment(val log: FileMessageSet, * @param maxSize The maximum number of bytes to include in the message set we read * @param maxOffset An optional maximum offset for the message set we read * - * @return The message set read or null if the startOffset is larger than the largest offset in this log. 
+ * @return The fetched data information or null if the startOffset is larger than the largest offset in this log */ @threadsafe - def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): MessageSet = { + def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = { if(maxSize < 0) throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize)) - if(maxSize == 0) - return MessageSet.Empty - + val logSize = log.sizeInBytes // this may change, need to save a consistent copy val startPosition = translateOffset(startOffset) - + // if the start position is already off the end of the log, return null if(startPosition == null) return null - + + val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position) + + // if the size is zero, still return a log segment but with zero size + if(maxSize == 0) + return FetchDataInfo(offsetMetadata, MessageSet.Empty) + // calculate the length of the message set to read based on whether or not they gave us a maxOffset val length = maxOffset match { @@ -143,7 +150,7 @@ class LogSegment(val log: FileMessageSet, min(endPosition - startPosition.position, maxSize) } } - log.read(startPosition.position, length) + FetchDataInfo(offsetMetadata, log.read(startPosition.position, length)) } /** @@ -222,7 +229,7 @@ class LogSegment(val log: FileMessageSet, if(ms == null) { baseOffset } else { - ms.lastOption match { + ms.messageSet.lastOption match { case None => baseOffset case Some(last) => last.nextOffset } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 3b15254..5859ba8 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -18,21 +18,22 @@ package kafka.server import kafka.cluster.Broker -import collection.mutable -import scala.collection.Set -import scala.collection.Map -import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset} -import kafka.metrics.KafkaMetricsGroup -import com.yammer.metrics.core.Gauge import kafka.utils.{Pool, ShutdownableThread} import kafka.consumer.{PartitionTopicInfo, SimpleConsumer} -import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder} +import kafka.api.{FetchRequest, FetchResponse, PartitionData, FetchRequestBuilder} import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping} import kafka.utils.Utils.inLock +import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset} +import kafka.metrics.KafkaMetricsGroup + +import scala.collection.mutable +import scala.collection.Set +import scala.collection.Map import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.atomic.AtomicLong +import com.yammer.metrics.core.Gauge /** * Abstract class for fetching data from multiple partitions from the same broker. 
@@ -59,7 +60,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke // process fetched data def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, - partitionData: FetchResponsePartitionData) + partitionData: PartitionData) // handle a partition whose offset is out of range and return a new fetch offset def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long @@ -92,12 +93,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke val partitionsWithError = new mutable.HashSet[TopicAndPartition] var response: FetchResponse = null try { - trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) + trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) response = simpleConsumer.fetch(fetchRequest) } catch { case t: Throwable => if (isRunning.get) { - warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.getMessage)) + warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString)) partitionMapLock synchronized { partitionsWithError ++= partitionMap.keys } diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala new file mode 100644 index 0000000..bb56336 --- /dev/null +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.network.RequestChannel +import kafka.api.{FetchResponse, FetchRequest} +import kafka.common.{UnknownTopicOrPartitionException, NotLeaderForPartitionException, TopicAndPartition} + +import scala.collection.immutable.Map +import scala.collection.Seq + +/** + * A delayed fetch request, which is satisfied (or more + * accurately, unblocked) -- if: + * Case A: This broker is no longer the leader for some partitions it tries to fetch + * - should return whatever data is available for the rest partitions. + * Case B: This broker is does not know of some partitions it tries to fetch + * - should return whatever data is available for the rest partitions. + * Case C: The fetch offset locates not on the last segment of the log + * - should return all the data on that segment. + * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes + * - should return whatever data is available. 
+ */ + +class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey], + override val request: RequestChannel.Request, + override val delayMs: Long, + val fetch: FetchRequest, + private val partitionFetchOffsets: Map[TopicAndPartition, LogOffsetMetadata]) + extends DelayedRequest(keys, request, delayMs) { + + def isSatisfied(replicaManager: ReplicaManager) : Boolean = { + var accumulatedSize = 0 + val fromFollower = fetch.isFromFollower + partitionFetchOffsets.foreach { + case (topicAndPartition, fetchOffset) => + try { + if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) { + val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition) + val endOffset = + if (fromFollower) + replica.logEndOffset + else + replica.highWatermark + + if (endOffset.offsetOnOlderSegment(fetchOffset)) { + // Case C, this can happen when the new follower replica fetching on a truncated leader + debug("Satisfying fetch request %s since it is fetching later segments of partition %s.".format(fetch, topicAndPartition)) + } else if (fetchOffset.offsetOnOlderSegment(endOffset)) { + // Case C, this can happen when the folloer replica is lagging too much + debug("Satisfying fetch request %s immediately since it is fetching older segments.".format(fetch)) + return true + } else if (fetchOffset.precedes(endOffset)) { + accumulatedSize += endOffset.positionDiff(fetchOffset) + } + } + } catch { + case utpe: UnknownTopicOrPartitionException => // Case A + debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetch)) + return true + case nle: NotLeaderForPartitionException => // Case B + debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetch)) + return true + } + } + + // Case D + accumulatedSize >= fetch.minBytes + } + + def respond(replicaManager: ReplicaManager): FetchResponse = { + val topicData = replicaManager.readMessageSets(fetch) + FetchResponse(fetch.correlationId, topicData.mapValues(_.data)) + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala new file mode 100644 index 0000000..9481508 --- /dev/null +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.api._ +import kafka.common.ErrorMapping +import kafka.common.TopicAndPartition +import kafka.utils.Logging +import kafka.network.RequestChannel + +import scala.Some +import scala.collection.immutable.Map +import scala.collection.Seq + +/** A delayed produce request, which is satisfied (or more + * accurately, unblocked) -- if for every partition it produce to: + * Case A: This broker is not the leader: unblock - should return error. + * Case B: This broker is the leader: + * B.1 - If there was a localError (when writing to the local log): unblock - should return error + * B.2 - else, at least requiredAcks replicas should be caught up to this request. + */ + +class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey], + override val request: RequestChannel.Request, + override val delayMs: Long, + val produce: ProducerRequest, + val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus], + val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None) + extends DelayedRequest(keys, request, delayMs) with Logging { + + // first update the acks pending variable according to the error code + partitionStatus foreach { case (topicAndPartition, delayedStatus) => + if (delayedStatus.responseStatus.error == ErrorMapping.NoError) { + // Timeout error state will be cleared when required acks are received + delayedStatus.acksPending = true + delayedStatus.responseStatus.error = ErrorMapping.RequestTimedOutCode + } else { + delayedStatus.acksPending = false + } + + trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus)) + } + + def respond(offsetManager: OffsetManager): RequestOrResponse = { + val responseStatus = partitionStatus.mapValues(status => status.responseStatus) + + val errorCode = responseStatus.find { case (_, status) => + status.error != ErrorMapping.NoError + }.map(_._2.error).getOrElse(ErrorMapping.NoError) + + if (errorCode == ErrorMapping.NoError) { + offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) ) + } + + val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, offsetManager.config.maxMetadataSize)) + .getOrElse(ProducerResponse(produce.correlationId, responseStatus)) + + response + } + + def isSatisfied(replicaManager: ReplicaManager) = { + // check for each partition if it still has pending acks + partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) => + trace("Checking producer request satisfaction for %s, acksPending = %b" + .format(topicAndPartition, fetchPartitionStatus.acksPending)) + // skip those partitions that have already been satisfied + if (fetchPartitionStatus.acksPending) { + val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) + val (hasEnough, errorCode) = partitionOpt match { + case Some(partition) => + partition.checkEnoughReplicasReachOffset( + fetchPartitionStatus.requiredOffset, + produce.requiredAcks) + case None => + (false, ErrorMapping.UnknownTopicOrPartitionCode) + } + if (errorCode != ErrorMapping.NoError) { + fetchPartitionStatus.acksPending = false + fetchPartitionStatus.responseStatus.error = errorCode + } else if (hasEnough) { + fetchPartitionStatus.acksPending = false + fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError + } + } + } + + // unblocked if there are no partitions with pending acks + val satisfied = ! 
partitionStatus.exists(p => p._2.acksPending) + satisfied + } +} + +case class DelayedProduceResponseStatus(val requiredOffset: Long, + val responseStatus: ProducerResponseStatus) { + @volatile var acksPending = false + + override def toString = + "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format( + acksPending, responseStatus.error, responseStatus.offset, requiredOffset) +} diff --git a/core/src/main/scala/kafka/server/DelayedRequestKey.scala b/core/src/main/scala/kafka/server/DelayedRequestKey.scala new file mode 100644 index 0000000..628ef59 --- /dev/null +++ b/core/src/main/scala/kafka/server/DelayedRequestKey.scala @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.common.TopicAndPartition + +/** + * Keys used for delayed request metrics recording + */ +trait DelayedRequestKey { + def keyLabel: String +} + +object DelayedRequestKey { + val globalLabel = "All" +} + +case class TopicPartitionRequestKey(topic: String, partition: Int) extends DelayedRequestKey { + + def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition) + + override def keyLabel = "%s-%d".format(topic, partition) +} diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala new file mode 100644 index 0000000..26f278f --- /dev/null +++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.message.MessageSet + +case class FetchDataInfo(fetchOffset: LogOffsetMetadata, messageSet: MessageSet) diff --git a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala new file mode 100644 index 0000000..ed13188 --- /dev/null +++ b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.metrics.KafkaMetricsGroup +import kafka.network.RequestChannel +import kafka.api.FetchResponseSend + +import java.util.concurrent.TimeUnit + +/** + * The purgatory holding delayed fetch requests + */ +class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel) + extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) { + this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId) + + private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup { + private val metricPrefix = if (forFollower) "Follower" else "Consumer" + + val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) + } + + private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true) + private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false) + + private def recordDelayedFetchExpired(forFollower: Boolean) { + val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics + else aggregateNonFollowerFetchRequestMetrics + + metrics.expiredRequestMeter.mark() + } + + /** + * Check if a specified delayed fetch request is satisfied + */ + def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied(replicaManager) + + /** + * When a delayed fetch request expires just answer it with whatever data is present + */ + def expire(delayedFetch: DelayedFetch) { + debug("Expiring fetch request %s.".format(delayedFetch.fetch)) + val fromFollower = delayedFetch.fetch.isFromFollower + recordDelayedFetchExpired(fromFollower) + respond(delayedFetch) + } + + // TODO: purgatory should not be responsible for sending back the responses + def respond(delayedFetch: DelayedFetch) { + val response = delayedFetch.respond(replicaManager) + requestChannel.sendResponse(new RequestChannel.Response(delayedFetch.request, new FetchResponseSend(response))) + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index fd5f12e..bb94673 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ 
b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -23,13 +23,10 @@ import kafka.log._ import kafka.message._ import kafka.network._ import kafka.admin.AdminUtils -import kafka.metrics.KafkaMetricsGroup import kafka.network.RequestChannel.Response import kafka.controller.KafkaController -import kafka.utils.{Pool, SystemTime, Logging} +import kafka.utils.{SystemTime, Logging} -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic._ import scala.collection._ import org.I0Itec.zkclient.ZkClient @@ -45,11 +42,10 @@ class KafkaApis(val requestChannel: RequestChannel, val config: KafkaConfig, val controller: KafkaController) extends Logging { - private val producerRequestPurgatory = - new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests) - private val fetchRequestPurgatory = - new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) - private val delayedRequestMetrics = new DelayedRequestMetrics + val producerRequestPurgatory = new ProducerRequestPurgatory(replicaManager, offsetManager, requestChannel) + val fetchRequestPurgatory = new FetchRequestPurgatory(replicaManager, requestChannel) + // TODO: the following line will be removed in 0.9 + replicaManager.initWithRequestPurgatory(producerRequestPurgatory, fetchRequestPurgatory) var metadataCache = new MetadataCache this.logIdent = "[KafkaApi-%d] ".format(brokerId) @@ -127,22 +123,6 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse))) } - /** - * Check if a partitionData from a produce request can unblock any - * DelayedFetch requests. - */ - def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messageSizeInBytes: Int) { - val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), messageSizeInBytes) - trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size)) - - // send any newly unblocked responses - for(fetchReq <- satisfied) { - val topicData = readMessageSets(fetchReq.fetch) - val response = FetchResponse(fetchReq.fetch.correlationId, topicData) - requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response))) - } - } - private def producerRequestFromOffsetCommit(offsetCommitRequest: OffsetCommitRequest) = { val msgs = offsetCommitRequest.filterLargeMetadata(config.offsetMetadataMaxSize).map { case (topicAndPartition, offset) => @@ -171,27 +151,21 @@ class KafkaApis(val requestChannel: RequestChannel, * Handle a produce request or offset commit request (which is really a specialized producer request) */ def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) { - - val (produceRequest, offsetCommitRequestOpt) = if (request.requestId == RequestKeys.OffsetCommitKey) { - val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] - (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest)) - } - else { - (request.requestObj.asInstanceOf[ProducerRequest], None) - } + val (produceRequest, offsetCommitRequestOpt) = + if (request.requestId == RequestKeys.OffsetCommitKey) { + val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] + (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest)) + } else { + (request.requestObj.asInstanceOf[ProducerRequest], None) + } val sTime = SystemTime.milliseconds val 
localProduceResults = appendToLocalLog(produceRequest) debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) + val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError) val numPartitionsInError = localProduceResults.count(_.error.isDefined) - produceRequest.data.foreach(partitionAndData => - maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2.sizeInBytes)) - - val allPartitionHaveReplicationFactorOne = - !produceRequest.data.keySet.exists( - m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1) if(produceRequest.requiredAcks == 0) { // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since // no response is expected by the producer the handler will send a close connection response to the socket server @@ -214,7 +188,6 @@ class KafkaApis(val requestChannel: RequestChannel, } } else if (produceRequest.requiredAcks == 1 || produceRequest.numPartitions <= 0 || - allPartitionHaveReplicationFactorOne || numPartitionsInError == produceRequest.numPartitions) { if (firstErrorCode == ErrorMapping.NoError) { @@ -229,46 +202,27 @@ class KafkaApis(val requestChannel: RequestChannel, } else { // create a list of (topic, partition) pairs to use as keys for this delayed request val producerRequestKeys = produceRequest.data.keys.map( - topicAndPartition => new RequestKey(topicAndPartition)).toSeq + topicAndPartition => new TopicPartitionRequestKey(topicAndPartition)).toSeq val statuses = localProduceResults.map(r => r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap val delayedRequest = new DelayedProduce( producerRequestKeys, request, - statuses, - produceRequest, produceRequest.ackTimeoutMs.toLong, + produceRequest, + statuses, offsetCommitRequestOpt) - producerRequestPurgatory.watch(delayedRequest) - - /* - * Replica fetch requests may have arrived (and potentially satisfied) - * delayedProduce requests while they were being added to the purgatory. - * Here, we explicitly check if any of them can be satisfied. 
- */ - var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce] - producerRequestKeys.foreach(key => - satisfiedProduceRequests ++= - producerRequestPurgatory.update(key, key)) - debug(satisfiedProduceRequests.size + - " producer requests unblocked during produce to local log.") - satisfiedProduceRequests.foreach(_.respond()) - - // we do not need the data anymore - produceRequest.emptyData() + // add the produce request for watch if it's not satisfied, otherwise send the response back + val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest) + if (satisfiedByMe) + producerRequestPurgatory.respond(delayedRequest) } - } - - case class DelayedProduceResponseStatus(requiredOffset: Long, - status: ProducerResponseStatus) { - var acksPending = false - override def toString = - "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format( - acksPending, status.error, status.offset, requiredOffset) + // we do not need the data anymore + produceRequest.emptyData() } - + case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) { def this(key: TopicAndPartition, throwable: Throwable) = this(key, -1L, -1L, Some(throwable)) @@ -288,13 +242,12 @@ class KafkaApis(val requestChannel: RequestChannel, partitionAndData.map {case (topicAndPartition, messages) => try { val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) - val info = - partitionOpt match { - case Some(partition) => partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet]) - case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" - .format(topicAndPartition, brokerId)) - - } + val info = partitionOpt match { + case Some(partition) => + partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet]) + case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" + .format(topicAndPartition, brokerId)) + } val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1) @@ -338,121 +291,58 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleFetchRequest(request: RequestChannel.Request) { val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] - if(fetchRequest.isFromFollower) { - maybeUpdatePartitionHw(fetchRequest) - // after updating HW, some delayed produce requests may be unblocked - var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce] - fetchRequest.requestInfo.foreach { - case (topicAndPartition, _) => - val key = new RequestKey(topicAndPartition) - satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key) - } - debug("Replica %d fetch unblocked %d producer requests." 
- .format(fetchRequest.replicaId, satisfiedProduceRequests.size)) - satisfiedProduceRequests.foreach(_.respond()) - } - - val dataRead = readMessageSets(fetchRequest) - val bytesReadable = dataRead.values.map(_.messages.sizeInBytes).sum + val dataRead = replicaManager.readMessageSets(fetchRequest) + + // if the fetch request comes from the follower, + // update its corresponding log end offset + if(fetchRequest.isFromFollower) + recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset)) + + // check if this fetch request can be satisfied right away + val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum + val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, dataAndOffset) => + errorIncurred || (dataAndOffset.data.error != ErrorMapping.NoError)) + // send the data immediately if 1) fetch request does not want to wait + // 2) fetch request does not require any data + // 3) has enough data to respond + // 4) some error happens while reading data if(fetchRequest.maxWait <= 0 || + fetchRequest.numPartitions <= 0 || bytesReadable >= fetchRequest.minBytes || - fetchRequest.numPartitions <= 0) { + errorReadingData) { debug("Returning fetch response %s for fetch request with correlation id %d to client %s" - .format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId)) - val response = new FetchResponse(fetchRequest.correlationId, dataRead) + .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId)) + val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data)) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } else { debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId, fetchRequest.clientId)) // create a list of (topic, partition) pairs to use as keys for this delayed request - val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_)) - val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait, bytesReadable) - fetchRequestPurgatory.watch(delayedFetch) + val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_)) + val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, + dataRead.mapValues(_.offset)) + + // add the fetch request for watch if it's not satisfied, otherwise send the response back + val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch) + if (satisfiedByMe) + fetchRequestPurgatory.respond(delayedFetch) } } - private def maybeUpdatePartitionHw(fetchRequest: FetchRequest) { - debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest)) - fetchRequest.requestInfo.foreach(info => { - val (topic, partition, offset) = (info._1.topic, info._1.partition, info._2.offset) - replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset) - }) - } + private def recordFollowerLogEndOffsets(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) { + debug("Record follower log end offsets: %s ".format(offsets)) + offsets.foreach { + case (topicAndPartition, offset) => + replicaManager.updateReplicaLEOAndPartitionHW(topicAndPartition.topic, + topicAndPartition.partition, replicaId, offset) - /** - * Read from all the offset details given and return a map of - * (topic, partition) -> 
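// Illustrative sketch, not part of the patch: the four conditions under which the new
// handleFetchRequest above answers a fetch right away instead of parking it in the fetch
// purgatory. The object name and parameter names here are hypothetical.
object FetchResponseDecision {
  def respondImmediately(maxWaitMs: Int,
                         numPartitions: Int,
                         minBytes: Int,
                         bytesReadable: Long,
                         errorReadingData: Boolean): Boolean =
    maxWaitMs <= 0 ||            // 1) the fetch does not want to wait
    numPartitions <= 0 ||        // 2) the fetch does not ask for any partition
    bytesReadable >= minBytes || // 3) enough bytes are already readable
    errorReadingData             // 4) an error occurred while reading the data

  def main(args: Array[String]): Unit = {
    // enough readable bytes: answered immediately
    println(respondImmediately(100, 1, 1, bytesReadable = 10, errorReadingData = false)) // true
    // nothing readable yet and the client is willing to wait: goes to the purgatory
    println(respondImmediately(100, 1, 1, bytesReadable = 0, errorReadingData = false))  // false
  }
}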
PartitionData - */ - private def readMessageSets(fetchRequest: FetchRequest) = { - val isFetchFromFollower = fetchRequest.isFromFollower - fetchRequest.requestInfo.map - { - case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) => - val partitionData = - try { - val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId) - BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes) - if (!isFetchFromFollower) { - new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages) - } else { - debug("Leader %d for partition [%s,%d] received fetch request from follower %d" - .format(brokerId, topic, partition, fetchRequest.replicaId)) - new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages) - } - } catch { - // NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException - // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request - // for a partition it is the leader for - case utpe: UnknownTopicOrPartitionException => - warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( - fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage)) - new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) - case nle: NotLeaderForPartitionException => - warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( - fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage)) - new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) - case t: Throwable => - BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() - BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark() - error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. 
Possible cause: %s" - .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage)) - new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) - } - (TopicAndPartition(topic, partition), partitionData) + // for producer requests with ack > 1, we need to check + // if they can be unblocked after some follower's log end offsets have moved + replicaManager.unblockDelayedProduceRequests(new TopicPartitionRequestKey(topicAndPartition)) } } /** - * Read from a single topic/partition at the given offset upto maxSize bytes - */ - private def readMessageSet(topic: String, - partition: Int, - offset: Long, - maxSize: Int, - fromReplicaId: Int): (MessageSet, Long) = { - // check if the current broker is the leader for the partitions - val localReplica = if(fromReplicaId == Request.DebuggingConsumerId) - replicaManager.getReplicaOrException(topic, partition) - else - replicaManager.getLeaderReplicaIfLocal(topic, partition) - trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) - val maxOffsetOpt = - if (Request.isValidBrokerId(fromReplicaId)) - None - else - Some(localReplica.highWatermark) - val messages = localReplica.log match { - case Some(log) => - log.read(offset, maxSize, maxOffsetOpt) - case None => - error("Leader for partition [%s,%d] on broker %d does not have a local log".format(topic, partition, brokerId)) - MessageSet.Empty - } - (messages, localReplica.highWatermark) - } - - /** * Service the offset request API */ def handleOffsetRequest(request: RequestChannel.Request) { @@ -473,7 +363,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (!offsetRequest.isFromOrdinaryClient) { allOffsets } else { - val hw = localReplica.highWatermark + val hw = localReplica.highWatermark.messageOffset if (allOffsets.exists(_ > hw)) hw +: allOffsets.dropWhile(_ > hw) else @@ -643,209 +533,5 @@ class KafkaApis(val requestChannel: RequestChannel, producerRequestPurgatory.shutdown() debug("Shut down complete.") } - - private [kafka] trait MetricKey { - def keyLabel: String - } - private [kafka] object MetricKey { - val globalLabel = "All" - } - - private [kafka] case class RequestKey(topic: String, partition: Int) - extends MetricKey { - - def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition) - - def topicAndPartition = TopicAndPartition(topic, partition) - - override def keyLabel = "%s-%d".format(topic, partition) - } - - /** - * A delayed fetch request - */ - class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long) - extends DelayedRequest(keys, request, delayMs) { - val bytesAccumulated = new AtomicLong(initialSize) - } - - /** - * A holding pen for fetch requests waiting to be satisfied - */ - class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int) - extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) { - this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId) - - /** - * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field - */ - def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = { - val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes) - accumulatedSize >= delayedFetch.fetch.minBytes - } - - /** - * When a request expires just answer it 
with whatever data is present - */ - def expire(delayed: DelayedFetch) { - debug("Expiring fetch request %s.".format(delayed.fetch)) - try { - val topicData = readMessageSets(delayed.fetch) - val response = FetchResponse(delayed.fetch.correlationId, topicData) - val fromFollower = delayed.fetch.isFromFollower - delayedRequestMetrics.recordDelayedFetchExpired(fromFollower) - requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response))) - } - catch { - case e1: LeaderNotAvailableException => - debug("Leader changed before fetch request %s expired.".format(delayed.fetch)) - case e2: UnknownTopicOrPartitionException => - debug("Replica went offline before fetch request %s expired.".format(delayed.fetch)) - } - } - } - - class DelayedProduce(keys: Seq[RequestKey], - request: RequestChannel.Request, - val partitionStatus: immutable.Map[TopicAndPartition, DelayedProduceResponseStatus], - produce: ProducerRequest, - delayMs: Long, - offsetCommitRequestOpt: Option[OffsetCommitRequest] = None) - extends DelayedRequest(keys, request, delayMs) with Logging { - - // first update the acks pending variable according to the error code - partitionStatus foreach { case (topicAndPartition, delayedStatus) => - if (delayedStatus.status.error == ErrorMapping.NoError) { - // Timeout error state will be cleared when requiredAcks are received - delayedStatus.acksPending = true - delayedStatus.status.error = ErrorMapping.RequestTimedOutCode - } else { - delayedStatus.acksPending = false - } - - trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus)) - } - - def respond() { - val responseStatus = partitionStatus.map { case (topicAndPartition, delayedStatus) => - topicAndPartition -> delayedStatus.status - } - - val errorCode = responseStatus.find { case (_, status) => - status.error != ErrorMapping.NoError - }.map(_._2.error).getOrElse(ErrorMapping.NoError) - - if (errorCode == ErrorMapping.NoError) { - offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) ) - } - - val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, config.offsetMetadataMaxSize)) - .getOrElse(ProducerResponse(produce.correlationId, responseStatus)) - - requestChannel.sendResponse(new RequestChannel.Response( - request, new BoundedByteBufferSend(response))) - } - - /** - * Returns true if this delayed produce request is satisfied (or more - * accurately, unblocked) -- this is the case if for every partition: - * Case A: This broker is not the leader: unblock - should return error. - * Case B: This broker is the leader: - * B.1 - If there was a localError (when writing to the local log): unblock - should return error - * B.2 - else, at least requiredAcks replicas should be caught up to this request. - * - * As partitions become acknowledged, we may be able to unblock - * DelayedFetchRequests that are pending on those partitions. 
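// Illustrative sketch, not part of the patch: the satisfaction rule described in the scaladoc
// above. A delayed produce request unblocks once no partition still has acks pending, whether
// the pending flag was cleared because enough replicas caught up or because an error was
// recorded for that partition. PartitionAck and the sample statuses are hypothetical stand-ins.
object DelayedProduceSatisfaction {
  case class PartitionAck(acksPending: Boolean)

  def isSatisfied(status: Map[String, PartitionAck]): Boolean =
    !status.values.exists(_.acksPending)

  def main(args: Array[String]): Unit = {
    val waiting = Map(
      "topicA-0" -> PartitionAck(acksPending = true),
      "topicA-1" -> PartitionAck(acksPending = false))
    println(isSatisfied(waiting)) // false: topicA-0 still waits for follower acknowledgements
    println(isSatisfied(waiting.updated("topicA-0", PartitionAck(acksPending = false)))) // true
  }
}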
- */ - def isSatisfied(followerFetchRequestKey: RequestKey) = { - val topic = followerFetchRequestKey.topic - val partitionId = followerFetchRequestKey.partition - val fetchPartitionStatus = partitionStatus(TopicAndPartition(topic, partitionId)) - trace("Checking producer request satisfaction for %s-%d, acksPending = %b" - .format(topic, partitionId, fetchPartitionStatus.acksPending)) - if (fetchPartitionStatus.acksPending) { - val partitionOpt = replicaManager.getPartition(topic, partitionId) - val (hasEnough, errorCode) = partitionOpt match { - case Some(partition) => - partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks) - case None => - (false, ErrorMapping.UnknownTopicOrPartitionCode) - } - if (errorCode != ErrorMapping.NoError) { - fetchPartitionStatus. acksPending = false - fetchPartitionStatus.status.error = errorCode - } else if (hasEnough) { - fetchPartitionStatus.acksPending = false - fetchPartitionStatus.status.error = ErrorMapping.NoError - } - if (!fetchPartitionStatus.acksPending) { - val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition) - maybeUnblockDelayedFetchRequests(topic, partitionId, messageSizeInBytes) - } - } - - // unblocked if there are no partitions with pending acks - val satisfied = ! partitionStatus.exists(p => p._2.acksPending) - trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied)) - satisfied - } - } - - /** - * A holding pen for produce requests waiting to be satisfied. - */ - private [kafka] class ProducerRequestPurgatory(purgeInterval: Int) - extends RequestPurgatory[DelayedProduce, RequestKey](brokerId, purgeInterval) { - this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId) - - protected def checkSatisfied(followerFetchRequestKey: RequestKey, - delayedProduce: DelayedProduce) = - delayedProduce.isSatisfied(followerFetchRequestKey) - - /** - * Handle an expired delayed request - */ - protected def expire(delayedProduce: DelayedProduce) { - for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending) - delayedRequestMetrics.recordDelayedProducerKeyExpired(RequestKey(topicPartition.topic, topicPartition.partition)) - - delayedProduce.respond() - } - } - - private class DelayedRequestMetrics { - private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup { - val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) - } - - - private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup { - private val metricPrefix = if (forFollower) "Follower" else "Consumer" - - val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) - } - - private val producerRequestMetricsForKey = { - val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-") - new Pool[MetricKey, DelayedProducerRequestMetrics](Some(valueFactory)) - } - - private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics - - private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true) - private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false) - - def recordDelayedProducerKeyExpired(key: MetricKey) { - val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key) - List(keyMetrics, 
aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark()) - } - - def recordDelayedFetchExpired(forFollower: Boolean) { - val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics - else aggregateNonFollowerFetchRequestMetrics - - metrics.expiredRequestMeter.mark() - } - } } diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala new file mode 100644 index 0000000..be5afed --- /dev/null +++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import org.apache.kafka.common.KafkaException + +object LogOffsetMetadata { + val UnknownOffsetMetadata = new LogOffsetMetadata(-1, 0, 0) + val UnknownSegBaseOffset = -1L + val UnknownFilePosition = -1 + + class OffsetOrdering extends Ordering[LogOffsetMetadata] { + override def compare(x: LogOffsetMetadata , y: LogOffsetMetadata ): Int = { + return x.messageDiff(y).toInt + } + } + +} + +/* + * A log offset structure, including: + * 1. the message offset + * 2. the base message offset of the located segment + * 3. 
the physical position on the located segment + */ +case class LogOffsetMetadata(messageOffset: Long, + segmentBaseOffset: Long = LogOffsetMetadata.UnknownSegBaseOffset, + relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) { + + // check if this offset is already on an older segment compared with the given offset + def offsetOnOlderSegment(that: LogOffsetMetadata): Boolean = { + if (messageOffsetOnly()) + throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that)) + + this.segmentBaseOffset < that.segmentBaseOffset + } + + // check if this offset is on the same segment with the given offset + def offsetOnSameSegment(that: LogOffsetMetadata): Boolean = { + if (messageOffsetOnly()) + throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that)) + + this.segmentBaseOffset == that.segmentBaseOffset + } + + // check if this offset is before the given offset + def precedes(that: LogOffsetMetadata): Boolean = this.messageOffset < that.messageOffset + + // compute the number of messages between this offset to the given offset + def messageDiff(that: LogOffsetMetadata): Long = { + this.messageOffset - that.messageOffset + } + + // compute the number of bytes between this offset to the given offset + // if they are on the same segment and this offset precedes the given offset + def positionDiff(that: LogOffsetMetadata): Int = { + if(!offsetOnSameSegment(that)) + throw new KafkaException("%s cannot compare its segment position with %s since they are not on the same segment".format(this, that)) + if(messageOffsetOnly()) + throw new KafkaException("%s cannot compare its segment position with %s since it only has message offset info".format(this, that)) + + this.relativePositionInSegment - that.relativePositionInSegment + } + + // decide if the offset metadata only contains message offset info + def messageOffsetOnly(): Boolean = { + segmentBaseOffset == LogOffsetMetadata.UnknownSegBaseOffset && relativePositionInSegment == LogOffsetMetadata.UnknownFilePosition + } + + override def toString = messageOffset.toString + " [" + segmentBaseOffset + " : " + relativePositionInSegment + "]" + +} diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 0e22897..43eb2a3 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -17,26 +17,29 @@ package kafka.server +import org.apache.kafka.common.protocol.types.{Struct, Schema, Field} +import org.apache.kafka.common.protocol.types.Type.STRING +import org.apache.kafka.common.protocol.types.Type.INT32 +import org.apache.kafka.common.protocol.types.Type.INT64 + import kafka.utils._ import kafka.common._ -import java.nio.ByteBuffer -import java.util.Properties import kafka.log.{FileMessageSet, LogConfig} -import org.I0Itec.zkclient.ZkClient -import scala.collection._ import kafka.message._ -import java.util.concurrent.TimeUnit import kafka.metrics.KafkaMetricsGroup -import com.yammer.metrics.core.Gauge -import scala.Some import kafka.common.TopicAndPartition import kafka.tools.MessageFormatter + +import scala.Some +import scala.collection._ import java.io.PrintStream -import org.apache.kafka.common.protocol.types.{Struct, Schema, Field} -import org.apache.kafka.common.protocol.types.Type.STRING -import org.apache.kafka.common.protocol.types.Type.INT32 -import 
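// Illustrative usage sketch for the LogOffsetMetadata class introduced above; it assumes
// kafka.server.LogOffsetMetadata from this patch is on the classpath. The offsets, segment
// base offset and byte positions below are made-up example values.
object LogOffsetMetadataExample {
  import kafka.server.LogOffsetMetadata

  def main(args: Array[String]): Unit = {
    // two offsets on the same segment (base offset 100), at byte positions 0 and 4096
    val fetched = LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 100L, relativePositionInSegment = 0)
    val logEnd  = LogOffsetMetadata(messageOffset = 150L, segmentBaseOffset = 100L, relativePositionInSegment = 4096)

    println(fetched.precedes(logEnd))            // true: message offset 100 is before 150
    println(fetched.offsetOnSameSegment(logEnd)) // true: both sit on the segment with base offset 100
    println(logEnd.messageDiff(fetched))         // 50 messages lie between the two offsets
    println(logEnd.positionDiff(fetched))        // 4096 bytes lie between them on that segment

    // an offset that carries only the message offset cannot answer segment-level questions
    val messageOnly = LogOffsetMetadata(messageOffset = 42L)
    println(messageOnly.messageOffsetOnly())     // true
  }
}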
org.apache.kafka.common.protocol.types.Type.INT64 import java.util.concurrent.atomic.AtomicBoolean +import java.nio.ByteBuffer +import java.util.Properties +import java.util.concurrent.TimeUnit + +import com.yammer.metrics.core.Gauge +import org.I0Itec.zkclient.ZkClient /** @@ -271,7 +274,7 @@ class OffsetManager(val config: OffsetManagerConfig, // loop breaks if leader changes at any time during the load, since getHighWatermark is -1 while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) { buffer.clear() - val messages = log.read(currOffset, config.loadBufferSize).asInstanceOf[FileMessageSet] + val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet] messages.readInto(buffer, 0) val messageSet = new ByteBufferMessageSet(buffer) messageSet.foreach { msgAndOffset => @@ -312,7 +315,7 @@ class OffsetManager(val config: OffsetManagerConfig, val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, partitionId) val hw = partitionOpt.map { partition => - partition.leaderReplicaIfLocal().map(_.highWatermark).getOrElse(-1L) + partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L) }.getOrElse(-1L) hw diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala new file mode 100644 index 0000000..d4a7d4a --- /dev/null +++ b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.Pool +import kafka.network.{BoundedByteBufferSend, RequestChannel} + +import java.util.concurrent.TimeUnit + +/** + * The purgatory holding delayed producer requests + */ +class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel) + extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) { + this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId) + + private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup { + val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) + } + + private val producerRequestMetricsForKey = { + val valueFactory = (k: DelayedRequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-") + new Pool[DelayedRequestKey, DelayedProducerRequestMetrics](Some(valueFactory)) + } + + private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics + + private def recordDelayedProducerKeyExpired(key: DelayedRequestKey) { + val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key) + List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark()) + } + + /** + * Check if a specified delayed fetch request is satisfied + */ + def checkSatisfied(delayedProduce: DelayedProduce) = delayedProduce.isSatisfied(replicaManager) + + /** + * When a delayed produce request expires answer it with possible time out error codes + */ + def expire(delayedProduce: DelayedProduce) { + debug("Expiring produce request %s.".format(delayedProduce.produce)) + for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending) + recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition)) + respond(delayedProduce) + } + + // TODO: purgatory should not be responsible for sending back the responses + def respond(delayedProduce: DelayedProduce) { + val response = delayedProduce.respond(offsetManager) + requestChannel.sendResponse(new RequestChannel.Response(delayedProduce.request, new BoundedByteBufferSend(response))) + } +} diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 75ae1e1..0d3da19 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -21,7 +21,7 @@ import kafka.admin.AdminUtils import kafka.cluster.Broker import kafka.log.LogConfig import kafka.message.ByteBufferMessageSet -import kafka.api.{OffsetRequest, FetchResponsePartitionData} +import kafka.api.{OffsetRequest, PartitionData} import kafka.common.{KafkaStorageException, TopicAndPartition} class ReplicaFetcherThread(name:String, @@ -40,23 +40,26 @@ class ReplicaFetcherThread(name:String, isInterruptible = false) { // process fetched data - def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) { + def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PartitionData) { try { val topic = topicAndPartition.topic val partitionId = topicAndPartition.partition val replica = replicaMgr.getReplica(topic, partitionId).get val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet] - if 
(fetchOffset != replica.logEndOffset) - throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset)) + if (fetchOffset != replica.logEndOffset.messageOffset) + throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset)) trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d" - .format(replica.brokerId, replica.logEndOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.hw)) + .format(replica.brokerId, replica.logEndOffset.messageOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.hw)) replica.log.get.append(messageSet, assignOffsets = false) trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s" - .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes, topicAndPartition)) - val followerHighWatermark = replica.logEndOffset.min(partitionData.hw) - replica.highWatermark = followerHighWatermark - trace("Follower %d set replica highwatermark for partition [%s,%d] to %d" + .format(replica.brokerId, replica.logEndOffset.messageOffset, messageSet.sizeInBytes, topicAndPartition)) + val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.hw) + // for the follower replica, we do not need to keep + // its segment base offset the physical position, + // these values will be computed upon making the leader + replica.highWatermark = new LogOffsetMetadata(followerHighWatermark) + trace("Follower %d set replica high watermark for partition [%s,%d] to %s" .format(replica.brokerId, topic, partitionId, followerHighWatermark)) } catch { case e: KafkaStorageException => @@ -82,7 +85,7 @@ class ReplicaFetcherThread(name:String, * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now. */ val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId) - if (leaderEndOffset < replica.logEndOffset) { + if (leaderEndOffset < replica.logEndOffset.messageOffset) { // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election. // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise, // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration. @@ -91,13 +94,13 @@ class ReplicaFetcherThread(name:String, // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur. 
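// Illustrative sketch, not part of the patch: the rule processPartitionData above applies when
// the follower advances its high watermark after appending fetched messages. The helper name
// and the sample offsets are hypothetical.
object FollowerHighWatermark {
  // a follower can expose no more than it has appended locally (its log end offset) and no
  // more than the leader has declared committed (the hw field of the fetched partition data)
  def followerHighWatermark(followerLogEndOffset: Long, leaderHighWatermark: Long): Long =
    math.min(followerLogEndOffset, leaderHighWatermark)

  def main(args: Array[String]): Unit = {
    println(followerHighWatermark(followerLogEndOffset = 120L, leaderHighWatermark = 100L)) // 100
    println(followerHighWatermark(followerLogEndOffset = 80L,  leaderHighWatermark = 100L)) // 80
  }
}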
fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) + " Current leader %d's latest offset %d is less than replica %d's latest offset %d" - .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset)) + .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset)) Runtime.getRuntime.halt(1) } replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset)) warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d" - .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderEndOffset)) + .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset)) leaderEndOffset } else { /** @@ -109,7 +112,7 @@ class ReplicaFetcherThread(name:String, val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId) replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset) warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d" - .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderStartOffset)) + .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset)) leaderStartOffset } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 897783c..2715945 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -16,29 +16,39 @@ */ package kafka.server -import collection._ -import mutable.HashMap -import kafka.cluster.{Broker, Partition, Replica} +import kafka.api._ +import kafka.common._ import kafka.utils._ +import kafka.cluster.{Broker, Partition, Replica} import kafka.log.LogManager import kafka.metrics.KafkaMetricsGroup -import kafka.common._ -import kafka.api.{UpdateMetadataRequest, StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest} import kafka.controller.KafkaController -import org.I0Itec.zkclient.ZkClient -import com.yammer.metrics.core.Gauge +import kafka.common.TopicAndPartition +import kafka.message.MessageSet + import java.util.concurrent.atomic.AtomicBoolean import java.io.{IOException, File} import java.util.concurrent.TimeUnit +import scala.Predef._ +import scala.collection._ +import scala.collection.mutable.HashMap +import scala.collection.Map +import scala.collection.Set +import scala.Some + +import org.I0Itec.zkclient.ZkClient +import com.yammer.metrics.core.Gauge object ReplicaManager { - val UnknownLogEndOffset = -1L val HighWatermarkFilename = "replication-offset-checkpoint" } -class ReplicaManager(val config: KafkaConfig, - time: Time, - val zkClient: ZkClient, +case class PartitionDataAndOffset(data: PartitionData, offset: LogOffsetMetadata) + + +class ReplicaManager(val config: KafkaConfig, + time: Time, + val zkClient: ZkClient, scheduler: Scheduler, val logManager: LogManager, val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup { @@ -54,6 +64,9 @@ class ReplicaManager(val config: KafkaConfig, this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: " val stateChangeLogger = KafkaController.stateChangeLogger + var producerRequestPurgatory: ProducerRequestPurgatory = null + var 
fetchRequestPurgatory: FetchRequestPurgatory = null + newGauge( "LeaderCount", new Gauge[Int] { @@ -87,17 +100,37 @@ class ReplicaManager(val config: KafkaConfig, } /** - * This function is only used in two places: in Partition.updateISR() and KafkaApis.handleProducerRequest(). - * In the former case, the partition should have been created, in the latter case, return -1 will put the request into purgatory + * Initialize the replica manager with the request purgatory + * + * TODO: will be removed in 0.9 where we refactor server structure */ - def getReplicationFactorForPartition(topic: String, partitionId: Int) = { - val partitionOpt = getPartition(topic, partitionId) - partitionOpt match { - case Some(partition) => - partition.replicationFactor - case None => - -1 - } + + def initWithRequestPurgatory(producerRequestPurgatory: ProducerRequestPurgatory, fetchRequestPurgatory: FetchRequestPurgatory) { + this.producerRequestPurgatory = producerRequestPurgatory + this.fetchRequestPurgatory = fetchRequestPurgatory + } + + /** + * Unblock some delayed produce requests with the request key + */ + def unblockDelayedProduceRequests(key: DelayedRequestKey) { + val satisfied = producerRequestPurgatory.update(key) + debug("Request key %s unblocked %d producer requests." + .format(key.keyLabel, satisfied.size)) + + // send any newly unblocked responses + satisfied.foreach(producerRequestPurgatory.respond(_)) + } + + /** + * Unblock some delayed fetch requests with the request key + */ + def unblockDelayedFetchRequests(key: DelayedRequestKey) { + val satisfied = fetchRequestPurgatory.update(key) + debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, satisfied.size)) + + // send any newly unblocked responses + satisfied.foreach(fetchRequestPurgatory.respond(_)) } def startup() { @@ -155,10 +188,10 @@ class ReplicaManager(val config: KafkaConfig, } } - def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = { + def getOrCreatePartition(topic: String, partitionId: Int): Partition = { var partition = allPartitions.get((topic, partitionId)) if (partition == null) { - allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, replicationFactor, time, this)) + allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this)) partition = allPartitions.get((topic, partitionId)) } partition @@ -203,6 +236,77 @@ class ReplicaManager(val config: KafkaConfig, } } + /** + * Read from all the offset details given and return a map of + * (topic, partition) -> PartitionData + */ + def readMessageSets(fetchRequest: FetchRequest) = { + val isFetchFromFollower = fetchRequest.isFromFollower + fetchRequest.requestInfo.map + { + case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) => + val partitionDataAndOffsetInfo = + try { + val (fetchInfo, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId) + BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes) + if (isFetchFromFollower) { + debug("Partition [%s,%d] received fetch request from follower %d" + .format(topic, partition, fetchRequest.replicaId)) + } + new PartitionDataAndOffset(new PartitionData(ErrorMapping.NoError, highWatermark, fetchInfo.messageSet), fetchInfo.fetchOffset) + } catch { + // NOTE: Failed fetch requests is not incremented for 
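// Illustrative sketch, not part of the patch: the unblock pattern that
// unblockDelayedProduceRequests and unblockDelayedFetchRequests above share. Purgatory is a
// hypothetical minimal stand-in for RequestPurgatory; only the call order matters here:
// collect whatever the key has newly satisfied, then respond to each of them.
object UnblockPattern {
  trait Purgatory[T] {
    def update(key: Any): Seq[T]  // the delayed requests this key has just satisfied
    def respond(delayed: T): Unit // sends the response for one satisfied request
  }

  def unblock[T](purgatory: Purgatory[T], key: Any): Int = {
    val satisfied = purgatory.update(key)
    satisfied.foreach(purgatory.respond)
    satisfied.size
  }

  def main(args: Array[String]): Unit = {
    // a toy purgatory that pretends every key unblocks two pending string "requests"
    val toy = new Purgatory[String] {
      def update(key: Any): Seq[String] = Seq(s"req-1@$key", s"req-2@$key")
      def respond(delayed: String): Unit = println(s"responded to $delayed")
    }
    println(unblock(toy, "topicA-0")) // responds to both, then prints 2
  }
}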
UnknownTopicOrPartitionException and NotLeaderForPartitionException + // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request + // for a partition it is the leader for + case utpe: UnknownTopicOrPartitionException => + warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( + fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage)) + new PartitionDataAndOffset(new PartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata) + case nle: NotLeaderForPartitionException => + warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( + fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage)) + new PartitionDataAndOffset(new PartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata) + case t: Throwable => + BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() + BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark() + error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. Possible cause: %s" + .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage)) + new PartitionDataAndOffset(new PartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata) + } + (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo) + } + } + + /** + * Read from a single topic/partition at the given offset upto maxSize bytes + */ + private def readMessageSet(topic: String, + partition: Int, + offset: Long, + maxSize: Int, + fromReplicaId: Int): (FetchDataInfo, Long) = { + // check if the current broker is the leader for the partitions + val localReplica = if(fromReplicaId == Request.DebuggingConsumerId) + getReplicaOrException(topic, partition) + else + getLeaderReplicaIfLocal(topic, partition) + trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) + val maxOffsetOpt = + if (Request.isValidBrokerId(fromReplicaId)) + None + else + Some(localReplica.highWatermark.messageOffset) + val fetchInfo = localReplica.log match { + case Some(log) => + log.read(offset, maxSize, maxOffsetOpt) + case None => + error("Leader for partition [%s,%d] does not have a local log".format(topic, partition)) + FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty) + } + (fetchInfo, localReplica.highWatermark.messageOffset) + } + def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) { replicaStateChangeLock synchronized { if(updateMetadataRequest.controllerEpoch < controllerEpoch) { @@ -243,7 +347,7 @@ class ReplicaManager(val config: KafkaConfig, // First check partition's leader epoch val partitionState = new HashMap[Partition, PartitionStateInfo]() leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) => - val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor) + val partition = getOrCreatePartition(topic, partitionId) val partitionLeaderEpoch = partition.getLeaderEpoch() // If the leader epoch is valid record the epoch of the 
controller that made the leadership decision. // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path @@ -403,7 +507,7 @@ class ReplicaManager(val config: KafkaConfig, .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId))) } - logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark)).toMap) + logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap) partitionsToMakeFollower.foreach { partition => stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " + @@ -421,7 +525,9 @@ class ReplicaManager(val config: KafkaConfig, else { // we do not need to check if the leader exists again since this has been done at the beginning of this process val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => - new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, partition.getReplica().get.logEndOffset)).toMap + new TopicAndPartition(partition) -> BrokerAndInitialOffset( + leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, + partition.getReplica().get.logEndOffset.messageOffset)).toMap replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) partitionsToMakeFollower.foreach { partition => @@ -451,12 +557,23 @@ class ReplicaManager(val config: KafkaConfig, allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages)) } - def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = { - val partitionOpt = getPartition(topic, partitionId) - if(partitionOpt.isDefined) { - partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset) - } else { - warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId)) + def updateReplicaLEOAndPartitionHW(topic: String, partitionId: Int, replicaId: Int, offset: LogOffsetMetadata) = { + getPartition(topic, partitionId) match { + case Some(partition) => + val replicaOpt = partition.getReplica(replicaId) + if(!replicaOpt.isDefined) { + throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" + + " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId, + offset.messageOffset, partition.assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId)) + } + val replica = replicaOpt.get + replica.logEndOffset = offset + + // check if we need to update HW and expand Isr + partition.updateLeaderHWAndMaybeExpandIsr(replicaId) + debug("Recorded follower %d position %d for partition [%s,%d].".format(replicaId, offset.messageOffset, topic, partitionId)) + case None => + warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId)) } } @@ -470,7 +587,7 @@ class ReplicaManager(val config: KafkaConfig, val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica} val replicasByDir = 
replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath) for((dir, reps) <- replicasByDir) { - val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap + val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark.messageOffset)).toMap try { highWatermarkCheckpoints(dir).write(hwms) } catch { diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index 3d0ff1e..ce06d2c 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -17,13 +17,15 @@ package kafka.server -import scala.collection._ -import java.util.concurrent._ -import java.util.concurrent.atomic._ import kafka.network._ import kafka.utils._ import kafka.metrics.KafkaMetricsGroup + import java.util +import java.util.concurrent._ +import java.util.concurrent.atomic._ +import scala.collection._ + import com.yammer.metrics.core.Gauge @@ -45,8 +47,10 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de * * For us the key is generally a (topic, partition) pair. * By calling - * watch(delayedRequest) - * we will add triggers for each of the given keys. It is up to the user to then call + * val isSatisfiedByMe = checkAndMaybeWatch(delayedRequest) + * we will check if a request is satisfied already, and if not add the request for watch on all its keys. + * + * It is up to the user to then call * val satisfied = update(key, request) * when a request relevant to the given key occurs. This triggers bookeeping logic and returns back any requests satisfied by this * new request. @@ -61,18 +65,23 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de * this function handles delayed requests that have hit their time limit without being satisfied. * */ -abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 1000) +abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000) extends Logging with KafkaMetricsGroup { /* a list of requests watching each key */ private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) - private val requestCounter = new AtomicInteger(0) + /* the number of requests being watched, duplicates added on different watchers are also counted */ + private val watched = new AtomicInteger(0) + + /* background thread expiring requests that have been waiting too long */ + private val expiredRequestReaper = new ExpiredRequestReaper + private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false) newGauge( "PurgatorySize", new Gauge[Int] { - def value = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests + def value = watched.get() + expiredRequestReaper.numRequests } ) @@ -83,41 +92,50 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge } ) - /* background thread expiring requests that have been waiting too long */ - private val expiredRequestReaper = new ExpiredRequestReaper - private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false) expirationThread.start() /** - * Add a new delayed request watching the contained keys + * Try to add the request for watch on all keys. Return true iff the request is + * satisfied and the satisfaction is done by the caller. 
+ * + * Requests can be watched on only a few of the keys if it is found satisfied when + * trying to add it to each one of the keys. In this case the request is still treated as satisfied + * and hence no longer watched. Those already added elements will be later purged by the expire reaper. */ - def watch(delayedRequest: T) { - requestCounter.getAndIncrement() - + def checkAndMaybeWatch(delayedRequest: T): Boolean = { for(key <- delayedRequest.keys) { - var lst = watchersFor(key) - lst.add(delayedRequest) + val lst = watchersFor(key) + if(!lst.checkAndMaybeAdd(delayedRequest)) { + if(delayedRequest.satisfied.compareAndSet(false, true)) + return true + else + return false + } } + + // if it is indeed watched, add to the expire queue also expiredRequestReaper.enqueue(delayedRequest) + + false } /** * Update any watchers and return a list of newly satisfied requests. */ - def update(key: Any, request: R): Seq[T] = { + def update(key: Any): Seq[T] = { val w = watchersForKey.get(key) if(w == null) Seq.empty else - w.collectSatisfiedRequests(request) + w.collectSatisfiedRequests() } private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) /** - * Check if this request satisfied this delayed request + * Check if this delayed request is already satisfied */ - protected def checkSatisfied(request: R, delayed: T): Boolean + protected def checkSatisfied(request: T): Boolean /** * Handle an expired delayed request @@ -125,7 +143,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge protected def expire(delayed: T) /** - * Shutdown the expirey thread + * Shutdown the expire reaper thread */ def shutdown() { expiredRequestReaper.shutdown() @@ -136,17 +154,26 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge * bookkeeping logic. */ private class Watchers { + private val requests = new util.ArrayList[T] - private val requests = new util.LinkedList[T] - - def numRequests = requests.size - - def add(t: T) { + // potentially add the element to watch if it is not satisfied yet + def checkAndMaybeAdd(t: T): Boolean = { synchronized { + // if it is already satisfied, do not add to the watch list + if (t.satisfied.get) + return false + // synchronize on the delayed request to avoid any race condition + // with expire and update threads on client-side. + if(t synchronized checkSatisfied(t)) { + return false + } requests.add(t) + watched.getAndIncrement() + return true } } + // traverse the list and purge satisfied elements def purgeSatisfied(): Int = { synchronized { val iter = requests.iterator() @@ -155,6 +182,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge val curr = iter.next if(curr.satisfied.get()) { iter.remove() + watched.getAndDecrement() purged += 1 } } @@ -162,7 +190,8 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge } } - def collectSatisfiedRequests(request: R): Seq[T] = { + // traverse the list and try to satisfy watched elements + def collectSatisfiedRequests(): Seq[T] = { val response = new mutable.ArrayBuffer[T] synchronized { val iter = requests.iterator() @@ -174,9 +203,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge } else { // synchronize on curr to avoid any race condition with expire // on client-side. 
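// Illustrative sketch, not part of the patch: the "check, then maybe watch" handshake that
// checkAndMaybeWatch and Watchers.checkAndMaybeAdd implement above, reduced to its core.
// DelayedOp, tryAdd and the toy watcher map are hypothetical; the point is that the
// AtomicBoolean compareAndSet lets exactly one thread win the right to respond.
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

class DelayedOp(val keys: Seq[String]) {
  val satisfied = new AtomicBoolean(false)
  def isSatisfiedNow: Boolean = false // stand-in for the real, state-dependent check
}

object CheckAndMaybeWatch {
  private val watchers = mutable.Map.empty[String, mutable.ListBuffer[DelayedOp]]

  // returns true iff the operation was found satisfied and this caller won the CAS,
  // i.e. this caller (and nobody else) should send the response
  def checkAndMaybeWatch(op: DelayedOp): Boolean = {
    for (key <- op.keys) {
      val added = tryAdd(key, op) // false if the op was found satisfied instead of added
      if (!added)
        return op.satisfied.compareAndSet(false, true)
    }
    false // parked on every key, not yet satisfied
  }

  private def tryAdd(key: String, op: DelayedOp): Boolean = watchers.synchronized {
    if (op.satisfied.get || op.isSatisfiedNow) false
    else {
      watchers.getOrElseUpdate(key, mutable.ListBuffer.empty[DelayedOp]) += op
      true
    }
  }

  def main(args: Array[String]): Unit = {
    val op = new DelayedOp(Seq("topicA-0", "topicA-1"))
    println(checkAndMaybeWatch(op))          // false: parked on both keys
    println(watchers.values.map(_.size).sum) // 2 watch list entries
  }
}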
- val satisfied = curr synchronized checkSatisfied(request, curr) + val satisfied = curr synchronized checkSatisfied(curr) if(satisfied) { iter.remove() + watched.getAndDecrement() val updated = curr.satisfied.compareAndSet(false, true) if(updated == true) { response += curr @@ -215,13 +245,12 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge expire(curr) } } - if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge + if (watched.get + numRequests >= purgeInterval) { // see if we need to force a full purge debug("Beginning purgatory purge") - requestCounter.set(0) val purged = purgeSatisfied() debug("Purged %d requests from delay queue.".format(purged)) val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum - debug("Purged %d (watcher) requests.".format(numPurgedFromWatchers)) + debug("Purged %d requests from watch lists.".format(numPurgedFromWatchers)) } } catch { case e: Exception => @@ -266,10 +295,12 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge } /** - * Delete all expired events from the delay queue + * Delete all satisfied events from the delay queue and the watcher lists */ private def purgeSatisfied(): Int = { var purged = 0 + + // purge the delayed queue val iter = delayed.iterator() while(iter.hasNext) { val curr = iter.next() @@ -278,6 +309,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge purged += 1 } } + purged } } diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index af47836..0353295 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -199,7 +199,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa initialOffsetTime: Long, reportInterval: Long) extends Logging { private val fetchOffsetMap = new Pool[TopicAndPartition, Long] - private val messageSetCache = new Pool[TopicAndPartition, Pool[Int, FetchResponsePartitionData]] + private val messageSetCache = new Pool[TopicAndPartition, Pool[Int, PartitionData]] private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers)) private val verificationBarrier = new AtomicReference(new CountDownLatch(1)) @volatile private var lastReportTime = SystemTime.milliseconds @@ -222,7 +222,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa private def initialize() { for (topicAndPartition <- expectedReplicasPerTopicAndPartition.keySet) - messageSetCache.put(topicAndPartition, new Pool[Int, FetchResponsePartitionData]) + messageSetCache.put(topicAndPartition, new Pool[Int, PartitionData]) setInitialOffsets() } @@ -248,7 +248,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } } - def addFetchedData(topicAndPartition: TopicAndPartition, replicaId: Int, partitionData: FetchResponsePartitionData) { + def addFetchedData(topicAndPartition: TopicAndPartition, replicaId: Int, partitionData: PartitionData) { messageSetCache.get(topicAndPartition).put(replicaId, partitionData) } @@ -372,7 +372,7 @@ private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartiti } } else { for (topicAndPartition <- topicAndPartitions) - replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, new FetchResponsePartitionData(messages = MessageSet.Empty)) + 
replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, new PartitionData(messages = MessageSet.Empty)) } fetcherBarrier.countDown() diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala index 5f8f6bc..6c6c8e7 100644 --- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala +++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala @@ -17,15 +17,17 @@ package kafka.tools +import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer} + +import kafka.consumer._ + import java.util.Properties import java.util.Arrays -import kafka.consumer._ -import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer} object TestEndToEndLatency { def main(args: Array[String]) { - if (args.length != 4) { - System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages") + if (args.length != 6) { + System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks") System.exit(1) } @@ -33,31 +35,37 @@ object TestEndToEndLatency { val zkConnect = args(1) val topic = args(2) val numMessages = args(3).toInt + val consumerFetchMaxWait = args(4).toInt + val producerAcks = args(5).toInt val consumerProps = new Properties() consumerProps.put("group.id", topic) consumerProps.put("auto.commit.enable", "false") consumerProps.put("auto.offset.reset", "largest") consumerProps.put("zookeeper.connect", zkConnect) - consumerProps.put("fetch.wait.max.ms", "1") + consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString) consumerProps.put("socket.timeout.ms", 1201000.toString) val config = new ConsumerConfig(consumerProps) val connector = Consumer.create(config) - var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head + val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head val iter = stream.iterator val producerProps = new Properties() producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") + producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString) val producer = new KafkaProducer(producerProps) + // make sure the consumer fetcher has started before sending data + Thread.sleep(5000) + val message = "hello there beautiful".getBytes var totalTime = 0.0 val latencies = new Array[Long](numMessages) for (i <- 0 until numMessages) { - var begin = System.nanoTime + val begin = System.nanoTime producer.send(new ProducerRecord(topic, message)) val received = iter.next val elapsed = System.nanoTime - begin diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index 8fcd068..e19b8b2 100644 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -91,7 +91,7 @@ object StressTestLog { @volatile var offset = 0 override def work() { try { - log.read(offset, 1024, Some(offset+1)) match { + log.read(offset, 1024, Some(offset+1)).messageSet match { case read: FileMessageSet if read.sizeInBytes > 0 => { val first = read.head require(first.offset == offset, "We should either read nothing or the message we asked for.") diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index cd16ced..4bb243c 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -40,10 +40,10 @@ object SerializationTestUtils { private val isr1 = List(0, 1, 2) private val leader2 = 0 private val isr2 = List(0, 2, 3) - private val partitionDataFetchResponse0 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes))) - private val partitionDataFetchResponse1 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("second message".getBytes))) - private val partitionDataFetchResponse2 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("third message".getBytes))) - private val partitionDataFetchResponse3 = new FetchResponsePartitionData(messages = new ByteBufferMessageSet(new Message("fourth message".getBytes))) + private val partitionDataFetchResponse0 = new PartitionData(messages = new ByteBufferMessageSet(new Message("first message".getBytes))) + private val partitionDataFetchResponse1 = new PartitionData(messages = new ByteBufferMessageSet(new Message("second message".getBytes))) + private val partitionDataFetchResponse2 = new PartitionData(messages = new ByteBufferMessageSet(new Message("third message".getBytes))) + private val partitionDataFetchResponse3 = new PartitionData(messages = new ByteBufferMessageSet(new Message("fourth message".getBytes))) private val partitionDataFetchResponseMap = Map((0, partitionDataFetchResponse0), (1, partitionDataFetchResponse1), (2, partitionDataFetchResponse2), (3, partitionDataFetchResponse3)) private val topicDataFetchResponse = { diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 9f04bd3..a5386a0 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -74,7 +74,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with val replica = servers.head.replicaManager.getReplica(topic, 0).get assertTrue("HighWatermark should equal logEndOffset with just 1 replica", - replica.logEndOffset > 0 && replica.logEndOffset == replica.highWatermark) + replica.logEndOffset.messageOffset > 0 && replica.logEndOffset.equals(replica.highWatermark)) val request = new FetchRequestBuilder() .clientId("test-client") @@ -248,13 +248,13 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with "Published messages should be in the log") val replicaId = servers.head.config.brokerId - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark == 2 }, + TestUtils.waitUntilTrue(() => { 
servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") // test if the consumer received the messages in the correct order when producer has enabled request pipelining diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 7d4c70c..59bd8a9 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -94,7 +94,7 @@ class LogManagerTest extends JUnit3Suite { assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length) - assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes) + assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).messageSet.sizeInBytes) try { log.read(0, 1024) @@ -137,7 +137,7 @@ class LogManagerTest extends JUnit3Suite { assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length) - assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes) + assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).messageSet.sizeInBytes) try { log.read(0, 1024) fail("Should get exception from fetching earlier.") diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 6b76037..7b97e6a 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -78,7 +78,7 @@ class LogSegmentTest extends JUnit3Suite { val seg = createSegment(40) val ms = messages(50, "hello", "there", "little", "bee") seg.append(50, ms) - val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None) + val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).messageSet assertEquals(ms.toList, read.toList) } @@ -94,7 +94,7 @@ class LogSegmentTest extends JUnit3Suite { seg.append(baseOffset, ms) def validate(offset: Long) = assertEquals(ms.filter(_.offset == offset).toList, - seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).toList) + seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).messageSet.toList) validate(50) validate(51) validate(52) @@ -109,7 +109,7 @@ class LogSegmentTest extends JUnit3Suite { val ms = messages(50, "hello", "there") seg.append(50, ms) val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None) - assertNull("Read beyond the last offset in the segment should give null", null) + assertNull("Read beyond the last offset in the segment should give null", read) } /** @@ -124,7 +124,7 @@ class LogSegmentTest extends JUnit3Suite { val ms2 = messages(60, "alpha", "beta") seg.append(60, ms2) val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None) - assertEquals(ms2.toList, 
read.toList) + assertEquals(ms2.toList, read.messageSet.toList) } /** @@ -142,12 +142,12 @@ class LogSegmentTest extends JUnit3Suite { seg.append(offset+1, ms2) // check that we can read back both messages val read = seg.read(offset, None, 10000) - assertEquals(List(ms1.head, ms2.head), read.toList) + assertEquals(List(ms1.head, ms2.head), read.messageSet.toList) // now truncate off the last message seg.truncateTo(offset + 1) val read2 = seg.read(offset, None, 10000) - assertEquals(1, read2.size) - assertEquals(ms1.head, read2.head) + assertEquals(1, read2.messageSet.size) + assertEquals(ms1.head, read2.messageSet.head) offset += 1 } } @@ -204,7 +204,7 @@ class LogSegmentTest extends JUnit3Suite { TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt) seg.recover(64*1024) for(i <- 0 until 100) - assertEquals(i, seg.read(i, Some(i+1), 1024).head.offset) + assertEquals(i, seg.read(i, Some(i+1), 1024).messageSet.head.offset) } /** diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 1da1393..577d102 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -131,11 +131,11 @@ class LogTest extends JUnitSuite { for(i <- 0 until messages.length) log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i))) for(i <- 0 until messages.length) { - val read = log.read(i, 100, Some(i+1)).head + val read = log.read(i, 100, Some(i+1)).messageSet.head assertEquals("Offset read should match order appended.", i, read.offset) assertEquals("Message should match appended.", messages(i), read.message) } - assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).size) + assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).messageSet.size) } /** @@ -153,7 +153,7 @@ class LogTest extends JUnitSuite { log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false) for(i <- 50 until messageIds.max) { val idx = messageIds.indexWhere(_ >= i) - val read = log.read(i, 100, None).head + val read = log.read(i, 100, None).messageSet.head assertEquals("Offset read should match message id.", messageIds(idx), read.offset) assertEquals("Message should match appended.", messages(idx), read.message) } @@ -176,7 +176,7 @@ class LogTest extends JUnitSuite { // now manually truncate off all but one message from the first segment to create a gap in the messages log.logSegments.head.truncateTo(1) - assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).head.offset) + assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).messageSet.head.offset) } /** @@ -188,7 +188,7 @@ class LogTest extends JUnitSuite { def testReadOutOfRange() { createEmptyLogs(logDir, 1024) val log = new Log(logDir, logConfig.copy(segmentSize = 1024), recoveryPoint = 0L, time.scheduler, time = time) - assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes) + assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).messageSet.sizeInBytes) try { log.read(0, 1024) fail("Expected exception on invalid read.") @@ -219,12 +219,12 @@ class LogTest extends JUnitSuite { /* do successive reads to ensure all our messages are there */ var 
offset = 0L for(i <- 0 until numMessages) { - val messages = log.read(offset, 1024*1024) + val messages = log.read(offset, 1024*1024).messageSet assertEquals("Offsets not equal", offset, messages.head.offset) assertEquals("Messages not equal at offset " + offset, messageSets(i).head.message, messages.head.message) offset = messages.head.offset + 1 } - val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)) + val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).messageSet assertEquals("Should be no more messages", 0, lastRead.size) // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure @@ -245,7 +245,7 @@ class LogTest extends JUnitSuite { log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes))) - def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message) + def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).messageSet.head.message) /* we should always get the first message in the compressed set when reading any offset in the set */ assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset) @@ -363,7 +363,7 @@ class LogTest extends JUnitSuite { log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) for(i <- 0 until numMessages) - assertEquals(i, log.read(i, 100, None).head.offset) + assertEquals(i, log.read(i, 100, None).messageSet.head.offset) log.close() } @@ -575,15 +575,15 @@ class LogTest extends JUnitSuite { @Test def testAppendMessageWithNullPayload() { - var log = new Log(logDir, + val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, time.scheduler, time) log.append(new ByteBufferMessageSet(new Message(bytes = null))) - val ms = log.read(0, 4096, None) - assertEquals(0, ms.head.offset) - assertTrue("Message payload should be null.", ms.head.message.isNull) + val messageSet = log.read(0, 4096, None).messageSet + assertEquals(0, messageSet.head.offset) + assertTrue("Message payload should be null.", messageSet.head.message.isNull) } @Test diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala index 6db245c..dd8847f 100644 --- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala @@ -31,7 +31,7 @@ trait BaseMessageSetTestCases extends JUnitSuite { def createMessageSet(messages: Seq[Message]): MessageSet @Test - def testWrittenEqualsRead { + def testWrittenEqualsRead() { val messageSet = createMessageSet(messages) checkEquals(messages.iterator, messageSet.map(m => m.message).iterator) } diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index e532c28..03a424d 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -58,7 +58,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { replicaManager.checkpointHighWatermarks() var 
fooPartition0Hw = hwmFor(replicaManager, topic, 0) assertEquals(0L, fooPartition0Hw) - val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1) + val partition0 = replicaManager.getOrCreatePartition(topic, 0) // create leader and follower replicas val log0 = logManagers(0).createLog(TopicAndPartition(topic, 0), LogConfig()) val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0)) @@ -67,18 +67,12 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { partition0.addReplicaIfNotExists(followerReplicaPartition0) replicaManager.checkpointHighWatermarks() fooPartition0Hw = hwmFor(replicaManager, topic, 0) - assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw) - try { - followerReplicaPartition0.highWatermark - fail("Should fail with KafkaException") - }catch { - case e: KafkaException => // this is ok - } - // set the highwatermark for local replica - partition0.getReplica().get.highWatermark = 5L + assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw) + // set the high watermark for local replica + partition0.getReplica().get.highWatermark = new LogOffsetMetadata(5L) replicaManager.checkpointHighWatermarks() fooPartition0Hw = hwmFor(replicaManager, topic, 0) - assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw) + assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw) EasyMock.verify(zkClient) } @@ -97,7 +91,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { replicaManager.checkpointHighWatermarks() var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) assertEquals(0L, topic1Partition0Hw) - val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1) + val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0) // create leader log val topic1Log0 = logManagers(0).createLog(TopicAndPartition(topic1, 0), LogConfig()) // create a local replica for topic1 @@ -105,15 +99,15 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0) replicaManager.checkpointHighWatermarks() topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) - assertEquals(leaderReplicaTopic1Partition0.highWatermark, topic1Partition0Hw) - // set the highwatermark for local replica - topic1Partition0.getReplica().get.highWatermark = 5L + assertEquals(leaderReplicaTopic1Partition0.highWatermark.messageOffset, topic1Partition0Hw) + // set the high watermark for local replica + topic1Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(5L) replicaManager.checkpointHighWatermarks() topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) - assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark) + assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark.messageOffset) assertEquals(5L, topic1Partition0Hw) // add another partition and set highwatermark - val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1) + val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0) // create leader log val topic2Log0 = logManagers(0).createLog(TopicAndPartition(topic2, 0), LogConfig()) // create a local replica for topic2 @@ -121,13 +115,13 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0) replicaManager.checkpointHighWatermarks() var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0) - assertEquals(leaderReplicaTopic2Partition0.highWatermark, topic2Partition0Hw) + 
assertEquals(leaderReplicaTopic2Partition0.highWatermark.messageOffset, topic2Partition0Hw) // set the highwatermark for local replica - topic2Partition0.getReplica().get.highWatermark = 15L - assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark) + topic2Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(15L) + assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark.messageOffset) // change the highwatermark for topic1 - topic1Partition0.getReplica().get.highWatermark = 10L - assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark) + topic1Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(10L) + assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark.messageOffset) replicaManager.checkpointHighWatermarks() // verify checkpointed hw for topic 2 topic2Partition0Hw = hwmFor(replicaManager, topic2, 0) diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 2cd3a3f..cd302aa 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -46,7 +46,7 @@ class IsrExpirationTest extends JUnit3Suite { val leaderReplica = partition0.getReplica(configs.head.brokerId).get // let the follower catch up to 10 - (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 10) + (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new LogOffsetMetadata(10L)) var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -69,7 +69,7 @@ class IsrExpirationTest extends JUnit3Suite { assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId)) val leaderReplica = partition0.getReplica(configs.head.brokerId).get // set remote replicas leo to something low, like 4 - (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 4L) + (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new LogOffsetMetadata(4L)) // now follower (broker id 1) has caught up to only 4, while the leader is at 15. Since the gap it larger than // replicaMaxLagBytes, the follower is out of sync. 
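[Editor's note] The test changes above replace bare Long offsets with LogOffsetMetadata when setting highWatermark and logEndOffset, and the assertions now read the messageOffset component. The following is a minimal, illustrative sketch of such a value type and its use, not the actual kafka.server.LogOffsetMetadata class; only messageOffset is confirmed by this patch, the other field names and defaults are assumptions inferred from the one- and three-argument constructor calls seen here.

// Illustrative stand-in only; field names other than messageOffset are assumptions.
case class LogOffsetMetadata(messageOffset: Long,
                             segmentBaseOffset: Long = -1L,
                             relativePositionInSegment: Int = 0)

object LogOffsetMetadataExample {
  def main(args: Array[String]): Unit = {
    // replica fields such as highWatermark and logEndOffset now hold this richer type
    var highWatermark = new LogOffsetMetadata(5L)
    assert(highWatermark.messageOffset == 5L)

    // the three-argument form mirrors calls like new LogOffsetMetadata(leo - 5L, 0L, leo.toInt - 5)
    val followerEnd = new LogOffsetMetadata(15L, 0L, 15)
    highWatermark = new LogOffsetMetadata(math.min(highWatermark.messageOffset, followerEnd.messageOffset))
    println(highWatermark)
  }
}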
@@ -83,7 +83,7 @@ class IsrExpirationTest extends JUnit3Suite { localLog: Log): Partition = { val leaderId=config.brokerId val replicaManager = new ReplicaManager(config, time, null, null, null, new AtomicBoolean(false)) - val partition = replicaManager.getOrCreatePartition(topic, partitionId, 1) + val partition = replicaManager.getOrCreatePartition(topic, partitionId) val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog)) val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica @@ -97,7 +97,7 @@ class IsrExpirationTest extends JUnit3Suite { private def getLogWithLogEndOffset(logEndOffset: Long, expectedCalls: Int): Log = { val log1 = EasyMock.createMock(classOf[kafka.log.Log]) - EasyMock.expect(log1.logEndOffset).andReturn(logEndOffset).times(expectedCalls) + EasyMock.expect(log1.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(logEndOffset)).times(expectedCalls) EasyMock.replay(log1) log1 diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 0ec120a..d5d351c 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -85,7 +85,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // give some time for the follower 1 to record leader HW TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages, + server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == numMessages, "Failed to update high watermark for follower after timeout") servers.foreach(server => server.replicaManager.checkpointHighWatermarks()) @@ -134,7 +134,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // give some time for follower 1 to record leader HW of 60 TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw, "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(server => server.shutdown()) @@ -147,7 +147,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val hw = 20L // give some time for follower 1 to record leader HW of 600 TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw, "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(server => server.shutdown()) @@ -165,7 +165,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // allow some time for the follower to get the leader HW TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw, "Failed to update high watermark for follower after timeout") // kill the server hosting the preferred replica server1.shutdown() @@ -191,7 +191,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // allow some time for the follower to get the leader HW TestUtils.waitUntilTrue(() => - server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + server1.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw, 
"Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(server => server.shutdown()) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 9abf219..a9c4ddc 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -39,7 +39,7 @@ class ReplicaManagerTest extends JUnit3Suite { val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) - val partition = rm.getOrCreatePartition(topic, 1, 1) + val partition = rm.getOrCreatePartition(topic, 1) partition.getOrCreateReplica(1) rm.checkpointHighWatermarks() } @@ -53,7 +53,7 @@ class ReplicaManagerTest extends JUnit3Suite { val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) - val partition = rm.getOrCreatePartition(topic, 1, 1) + val partition = rm.getOrCreatePartition(topic, 1) partition.getOrCreateReplica(1) rm.checkpointHighWatermarks() } diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala index 4f61f84..168712d 100644 --- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala @@ -46,17 +46,17 @@ class RequestPurgatoryTest extends JUnit3Suite { def testRequestSatisfaction() { val r1 = new DelayedRequest(Array("test1"), null, 100000L) val r2 = new DelayedRequest(Array("test2"), null, 100000L) - assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1", producerRequest1).size) - purgatory.watch(r1) - assertEquals("Still nothing satisfied", 0, purgatory.update("test1", producerRequest1).size) - purgatory.watch(r2) - assertEquals("Still nothing satisfied", 0, purgatory.update("test2", producerRequest2).size) + assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1").size) + assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1)) + assertEquals("Still nothing satisfied", 0, purgatory.update("test1").size) + assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2)) + assertEquals("Still nothing satisfied", 0, purgatory.update("test2").size) purgatory.satisfied += r1 - assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1", producerRequest1)) - assertEquals("Nothing satisfied", 0, purgatory.update("test1", producerRequest2).size) + assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1")) + assertEquals("Nothing satisfied", 0, purgatory.update("test1").size) purgatory.satisfied += r2 - assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2", producerRequest2)) - assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size) + assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2")) + assertEquals("Nothing satisfied", 0, purgatory.update("test2").size) } @Test @@ -65,8 +65,8 @@ class RequestPurgatoryTest extends 
JUnit3Suite { val r1 = new DelayedRequest(Array("test1"), null, expiration) val r2 = new DelayedRequest(Array("test1"), null, 200000L) val start = System.currentTimeMillis - purgatory.watch(r1) - purgatory.watch(r2) + assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1)) + assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2)) purgatory.awaitExpiration(r1) val elapsed = System.currentTimeMillis - start assertTrue("r1 expired", purgatory.expired.contains(r1)) @@ -74,7 +74,7 @@ class RequestPurgatoryTest extends JUnit3Suite { assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration) } - class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] { + class MockRequestPurgatory extends RequestPurgatory[DelayedRequest] { val satisfied = mutable.Set[DelayedRequest]() val expired = mutable.Set[DelayedRequest]() def awaitExpiration(delayed: DelayedRequest) = { @@ -82,7 +82,7 @@ class RequestPurgatoryTest extends JUnit3Suite { delayed.wait() } } - def checkSatisfied(request: ProducerRequest, delayed: DelayedRequest): Boolean = satisfied.contains(delayed) + def checkSatisfied(delayed: DelayedRequest): Boolean = satisfied.contains(delayed) def expire(delayed: DelayedRequest) { expired += delayed delayed synchronized { diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index b1c4ce9..f98af71 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -16,17 +16,19 @@ */ package kafka.server +import kafka.api._ import kafka.cluster.{Partition, Replica} +import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.log.Log import kafka.message.{ByteBufferMessageSet, Message} import kafka.network.RequestChannel import kafka.utils.{ZkUtils, Time, TestUtils, MockTime} + +import scala.Some + import org.easymock.EasyMock import org.I0Itec.zkclient.ZkClient import org.scalatest.junit.JUnit3Suite -import kafka.api._ -import scala.Some -import kafka.common.TopicAndPartition class SimpleFetchTest extends JUnit3Suite { @@ -45,13 +47,13 @@ class SimpleFetchTest extends JUnit3Suite { * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync * but is still in ISR (hasn't yet expired from ISR). * - * When a normal consumer fetches data, it only should only see data upto the HW of the leader, + * When a normal consumer fetches data, it should only see data up to the HW of the leader, * in this case up an offset of "5". 
*/ def testNonReplicaSeesHwWhenFetching() { /* setup */ val time = new MockTime - val leo = 20 + val leo = 20L val hw = 5 val fetchSize = 100 val messages = new Message("test-message".getBytes()) @@ -64,7 +66,11 @@ class SimpleFetchTest extends JUnit3Suite { val log = EasyMock.createMock(classOf[kafka.log.Log]) EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() EasyMock.expect(log) - EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(new ByteBufferMessageSet(messages)) + EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn( + new FetchDataInfo( + new LogOffsetMetadata(0L, 0L, leo.toInt), + new ByteBufferMessageSet(messages) + )).anyTimes() EasyMock.replay(log) val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) @@ -76,14 +82,26 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.expect(replicaManager.logManager).andReturn(logManager) EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) + EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ + val fetchInfo = log.read(0, fetchSize, Some(hw)) + val partitionData = new PartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) + Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) + }).anyTimes() EasyMock.replay(replicaManager) val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager) - partition.getReplica(configs(1).brokerId).get.logEndOffset = leo - 5L + partition.getReplica(configs(1).brokerId).get.logEndOffset = new LogOffsetMetadata(leo - 5L, 0L, leo.toInt - 5) EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes() EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes() + EasyMock.expect(replicaManager.initWithRequestPurgatory(EasyMock.anyObject(), EasyMock.anyObject())) + EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ + val fetchInfo = log.read(0, fetchSize, Some(hw)) + val partitionData = new PartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) + Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) + }).anyTimes() + EasyMock.replay(replicaManager) val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager]) @@ -138,7 +156,11 @@ class SimpleFetchTest extends JUnit3Suite { val log = EasyMock.createMock(classOf[kafka.log.Log]) EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() - EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn(new ByteBufferMessageSet(messages)) + EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn( + new FetchDataInfo( + new LogOffsetMetadata(followerLEO, 0L, followerLEO), + new ByteBufferMessageSet(messages) + )).anyTimes() EasyMock.replay(log) val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) @@ -150,16 +172,28 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.expect(replicaManager.logManager).andReturn(logManager) EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) + 
EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ + val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None) + val partitionData = new PartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) + Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) + }).anyTimes() EasyMock.replay(replicaManager) val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager) - partition.getReplica(followerReplicaId).get.logEndOffset = followerLEO.asInstanceOf[Long] + partition.getReplica(followerReplicaId).get.logEndOffset = new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO) EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes() - EasyMock.expect(replicaManager.recordFollowerPosition(topic, partitionId, followerReplicaId, followerLEO)) + EasyMock.expect(replicaManager.updateReplicaLEOAndPartitionHW(topic, partitionId, followerReplicaId, new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO))) EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId)) EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes() + EasyMock.expect(replicaManager.initWithRequestPurgatory(EasyMock.anyObject(), EasyMock.anyObject())) + EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ + val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None) + val partitionData = new PartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) + Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) + }).anyTimes() + EasyMock.expect(replicaManager.unblockDelayedProduceRequests(EasyMock.anyObject())).anyTimes() EasyMock.replay(replicaManager) val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager]) @@ -195,7 +229,7 @@ class SimpleFetchTest extends JUnit3Suite { private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int, localLog: Log, leaderHW: Long, replicaManager: ReplicaManager): Partition = { - val partition = new Partition(topic, partitionId, 2, time, replicaManager) + val partition = new Partition(topic, partitionId, time, replicaManager) val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog)) val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica @@ -204,7 +238,7 @@ class SimpleFetchTest extends JUnit3Suite { partition.inSyncReplicas = allReplicas.toSet // set the leader and its hw and the hw update time partition.leaderReplicaIdOpt = Some(leaderId) - leaderReplica.highWatermark = leaderHW + leaderReplica.highWatermark = new LogOffsetMetadata(leaderHW) partition }
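[Editor's note] For context on the API reshaping exercised by the RequestPurgatoryTest changes above, here is a condensed, self-contained sketch of the new single-type-parameter contract: checkAndMaybeWatch returns true only when the caller itself satisfied the request while trying to watch it, and checkSatisfied now takes just the delayed request. These are not the Kafka classes themselves; DelayedOp, SimplePurgatory and ManualPurgatory are illustrative names, and the bookkeeping is deliberately simplified (per-request checks are collapsed into one pass).

import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

// a delayed operation carries the keys it waits on and a satisfied flag
class DelayedOp(val keys: Seq[Any]) {
  val satisfied = new AtomicBoolean(false)
}

// condensed purgatory: one type parameter, checkSatisfied(delayed) instead of
// checkSatisfied(incomingRequest, delayed)
abstract class SimplePurgatory[T <: DelayedOp] {
  private val watchersForKey = mutable.Map[Any, mutable.ArrayBuffer[T]]()

  protected def checkSatisfied(delayed: T): Boolean

  // true iff the request was found satisfied here and this caller won the CAS;
  // otherwise it is left on the watch lists and false is returned
  def checkAndMaybeWatch(delayed: T): Boolean = synchronized {
    if (delayed.satisfied.get || checkSatisfied(delayed)) {
      delayed.satisfied.compareAndSet(false, true)
    } else {
      delayed.keys.foreach { key =>
        watchersForKey.getOrElseUpdate(key, mutable.ArrayBuffer.empty[T]) += delayed
      }
      false
    }
  }

  // re-check everything watching `key` and collect the requests newly marked satisfied
  def update(key: Any): Seq[T] = synchronized {
    watchersForKey.get(key) match {
      case None => Seq.empty
      case Some(watchers) =>
        val ready = watchers.filter(d => checkSatisfied(d) && d.satisfied.compareAndSet(false, true))
        watchers --= ready
        ready.toList
    }
  }
}

// usage in the spirit of MockRequestPurgatory in the test above
object PurgatorySketch {
  class ManualPurgatory extends SimplePurgatory[DelayedOp] {
    val completed = mutable.Set[DelayedOp]()
    protected def checkSatisfied(delayed: DelayedOp): Boolean = completed.contains(delayed)
  }

  def main(args: Array[String]): Unit = {
    val purgatory = new ManualPurgatory
    val r1 = new DelayedOp(Seq("test1"))
    assert(!purgatory.checkAndMaybeWatch(r1)) // not satisfied yet, so it is watched
    purgatory.completed += r1
    assert(purgatory.update("test1") == List(r1)) // now collected as newly satisfied
  }
}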