diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 55a5982..51cdccf 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -17,16 +17,16 @@
package kafka.api
-import java.nio.ByteBuffer
import kafka.utils.nonthreadsafe
import kafka.api.ApiUtils._
-import scala.collection.immutable.Map
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.ConsumerConfig
-import java.util.concurrent.atomic.AtomicInteger
import kafka.network.RequestChannel
import kafka.message.MessageSet
+import java.util.concurrent.atomic.AtomicInteger
+import java.nio.ByteBuffer
+import scala.collection.immutable.Map
case class PartitionFetchInfo(offset: Long, fetchSize: Int)
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index d117f10..af93087 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -19,6 +19,7 @@ package kafka.api
import java.nio.ByteBuffer
import java.nio.channels.GatheringByteChannel
+
import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.message.{MessageSet, ByteBufferMessageSet}
import kafka.network.{MultiSend, Send}
@@ -151,7 +152,7 @@ object FetchResponse {
case class FetchResponse(correlationId: Int,
- data: Map[TopicAndPartition, FetchResponsePartitionData]) {
+ data: Map[TopicAndPartition, FetchResponsePartitionData]) {
/**
* Partitions the data into a map of maps (one for each topic).
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 134aef9..ff106b4 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -21,7 +21,7 @@ import kafka.admin.AdminUtils
import kafka.utils._
import kafka.api.{PartitionStateInfo, LeaderAndIsr}
import kafka.log.LogConfig
-import kafka.server.{OffsetManager, ReplicaManager}
+import kafka.server.{TopicPartitionRequestKey, LogOffsetMetadata, OffsetManager, ReplicaManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
import kafka.message.ByteBufferMessageSet
@@ -29,7 +29,8 @@ import kafka.message.ByteBufferMessageSet
import java.io.IOException
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.utils.Utils.{inReadLock,inWriteLock}
-import scala.collection._
+import scala.Some
+import scala.collection.immutable.Set
import com.yammer.metrics.core.Gauge
@@ -39,18 +40,18 @@ import com.yammer.metrics.core.Gauge
*/
class Partition(val topic: String,
val partitionId: Int,
- var replicationFactor: Int,
time: Time,
- val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
+ replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
private val localBrokerId = replicaManager.config.brokerId
private val logManager = replicaManager.logManager
private val zkClient = replicaManager.zkClient
- var leaderReplicaIdOpt: Option[Int] = None
- var inSyncReplicas: Set[Replica] = Set.empty[Replica]
- private val assignedReplicaMap = new Pool[Int,Replica]
+ private val assignedReplicaMap = new Pool[Int, Replica]
+  // The read lock is only required when multiple reads need to be executed in a consistent manner
private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
private var zkVersion: Int = LeaderAndIsr.initialZKVersion
- private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
+ @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
+ @volatile var leaderReplicaIdOpt: Option[Int] = None
+ @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]
/* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
* One way of doing that is through the controller's start replica state change command. When a new broker starts up
* the controller sends it a start replica command containing the leader for each partition that the broker hosts.
@@ -58,7 +59,6 @@ class Partition(val topic: String,
* each partition. */
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
- private val stateChangeLogger = KafkaController.stateChangeLogger
private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
@@ -72,13 +72,11 @@ class Partition(val topic: String,
)
def isUnderReplicated(): Boolean = {
- inReadLock(leaderIsrUpdateLock) {
- leaderReplicaIfLocal() match {
- case Some(_) =>
- inSyncReplicas.size < assignedReplicas.size
- case None =>
- false
- }
+ leaderReplicaIfLocal() match {
+ case Some(_) =>
+ inSyncReplicas.size < assignedReplicas.size
+ case None =>
+ false
}
}
@@ -114,15 +112,13 @@ class Partition(val topic: String,
}
def leaderReplicaIfLocal(): Option[Replica] = {
- inReadLock(leaderIsrUpdateLock) {
- leaderReplicaIdOpt match {
- case Some(leaderReplicaId) =>
- if (leaderReplicaId == localBrokerId)
- getReplica(localBrokerId)
- else
- None
- case None => None
- }
+ leaderReplicaIdOpt match {
+ case Some(leaderReplicaId) =>
+ if (leaderReplicaId == localBrokerId)
+ getReplica(localBrokerId)
+ else
+ None
+ case None => None
}
}
@@ -155,9 +151,7 @@ class Partition(val topic: String,
}
def getLeaderEpoch(): Int = {
- inReadLock(leaderIsrUpdateLock) {
- return this.leaderEpoch
- }
+ return this.leaderEpoch
}
/**
@@ -179,14 +173,17 @@ class Partition(val topic: String,
val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
// remove assigned replicas that have been removed by the controller
(assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
- // reset LogEndOffset for remote replicas
- assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
inSyncReplicas = newInSyncReplicas
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion
leaderReplicaIdOpt = Some(localBrokerId)
+ // construct the high watermark metadata for the new leader replica
+ val newLeaderReplica = getReplica().get
+ newLeaderReplica.convertHWToLocalOffsetMetadata()
+ // reset log end offset for remote replicas
+ assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = LogOffsetMetadata.UnknownOffsetMetadata)
// we may need to increment high watermark since ISR could be down to 1
- maybeIncrementLeaderHW(getReplica().get)
+ maybeIncrementLeaderHW(newLeaderReplica)
if (topic == OffsetManager.OffsetsTopicName)
offsetManager.loadOffsetsFromLog(partitionId)
true
@@ -233,18 +230,8 @@ class Partition(val topic: String,
}
}
- def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
+ def updateLeaderHWAndMaybeExpandIsr(replicaId: Int) {
inWriteLock(leaderIsrUpdateLock) {
- debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
- val replicaOpt = getReplica(replicaId)
- if(!replicaOpt.isDefined) {
- throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d for partition [%s,%d] since the replica %d" +
- " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId,
- offset, topic, partitionId, replicaId, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
- }
- val replica = replicaOpt.get
- replica.logEndOffset = offset
-
// check if this replica needs to be added to the ISR
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
@@ -253,8 +240,10 @@ class Partition(val topic: String,
// For a replica to get added back to ISR, it has to satisfy 3 conditions-
// 1. It is not already in the ISR
// 2. It is part of the assigned replica list. See KAFKA-1097
- // 3. It's log end offset >= leader's highwatermark
- if (!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && replica.logEndOffset >= leaderHW) {
+        // 3. Its log end offset >= leader's high watermark
+ if (!inSyncReplicas.contains(replica) &&
+ assignedReplicas.map(_.brokerId).contains(replicaId) &&
+ replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
// expand ISR
val newInSyncReplicas = inSyncReplicas + replica
info("Expanding ISR for partition [%s,%d] from %s to %s"
@@ -270,29 +259,29 @@ class Partition(val topic: String,
}
def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
- inReadLock(leaderIsrUpdateLock) {
- leaderReplicaIfLocal() match {
- case Some(_) =>
- val numAcks = inSyncReplicas.count(r => {
- if (!r.isLocal)
- r.logEndOffset >= requiredOffset
- else
- true /* also count the local (leader) replica */
- })
- trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
- if ((requiredAcks < 0 && numAcks >= inSyncReplicas.size) ||
- (requiredAcks > 0 && numAcks >= requiredAcks)) {
- /*
- * requiredAcks < 0 means acknowledge after all replicas in ISR
- * are fully caught up to the (local) leader's offset
- * corresponding to this produce request.
- */
- (true, ErrorMapping.NoError)
- } else
- (false, ErrorMapping.NoError)
- case None =>
- (false, ErrorMapping.NotLeaderForPartitionCode)
- }
+ leaderReplicaIfLocal() match {
+ case Some(leaderReplica) =>
+ // keep the current immutable replica list reference
+ val curInSyncReplicas = inSyncReplicas
+ val numAcks = curInSyncReplicas.count(r => {
+ if (!r.isLocal)
+ r.logEndOffset.messageOffset >= requiredOffset
+ else
+ true /* also count the local (leader) replica */
+ })
+ trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
+ if ((requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) ||
+ (requiredAcks > 0 && numAcks >= requiredAcks)) {
+ /*
+ * requiredAcks < 0 means acknowledge after all replicas in ISR
+ * are fully caught up to the (local) leader's offset
+ * corresponding to this produce request.
+ */
+ (true, ErrorMapping.NoError)
+ } else
+ (false, ErrorMapping.NoError)
+ case None =>
+ (false, ErrorMapping.NotLeaderForPartitionCode)
}
}
@@ -302,15 +291,19 @@ class Partition(val topic: String,
*/
private def maybeIncrementLeaderHW(leaderReplica: Replica) {
val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
- val newHighWatermark = allLogEndOffsets.min
+ val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
val oldHighWatermark = leaderReplica.highWatermark
- if(newHighWatermark > oldHighWatermark) {
+ if(oldHighWatermark.precedes(newHighWatermark)) {
leaderReplica.highWatermark = newHighWatermark
- debug("Highwatermark for partition [%s,%d] updated to %d".format(topic, partitionId, newHighWatermark))
+ debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
+ // some delayed requests may be unblocked after HW changed
+ val requestKey = new TopicPartitionRequestKey(this.topic, this.partitionId)
+ replicaManager.unblockDelayedFetchRequests(requestKey)
+ replicaManager.unblockDelayedProduceRequests(requestKey)
+ } else {
+      debug("Skipping update of high watermark since old hw %s is not older than new hw %s for partition [%s,%d]. All LEOs are %s"
+ .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
}
- else
- debug("Old hw for partition [%s,%d] is %d. New hw is %d. All leo's are %s"
- .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
}
def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
@@ -349,7 +342,9 @@ class Partition(val topic: String,
if(stuckReplicas.size > 0)
debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
// Case 2 above
- val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages)
+ val slowReplicas = candidateReplicas.filter(r =>
+ r.logEndOffset.messageOffset >= 0 &&
+ leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages)
if(slowReplicas.size > 0)
debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
stuckReplicas ++ slowReplicas
@@ -362,6 +357,8 @@ class Partition(val topic: String,
case Some(leaderReplica) =>
val log = leaderReplica.log.get
val info = log.append(messages, assignOffsets = true)
+        // possibly unblock some follower fetch requests since the log end offset has been updated
+ replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId))
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
info
@@ -399,14 +396,12 @@ class Partition(val topic: String,
}
override def toString(): String = {
- inReadLock(leaderIsrUpdateLock) {
- val partitionString = new StringBuilder
- partitionString.append("Topic: " + topic)
- partitionString.append("; Partition: " + partitionId)
- partitionString.append("; Leader: " + leaderReplicaIdOpt)
- partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
- partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
- partitionString.toString()
- }
+ val partitionString = new StringBuilder
+ partitionString.append("Topic: " + topic)
+ partitionString.append("; Partition: " + partitionId)
+ partitionString.append("; Leader: " + leaderReplicaIdOpt)
+ partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
+ partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+ partitionString.toString()
}
}
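Note: the LogOffsetMetadata class used above is introduced by this patch in a separate file that does not appear in this hunk. A minimal sketch of its apparent shape, with fields and methods inferred only from the call sites in Partition, Replica, Log, and DelayedFetch (not copied from the actual file), looks roughly like this:

    // Illustrative sketch only, not the file added by the patch; names are inferred
    // from how Partition, Replica, Log and DelayedFetch use the type in this diff.
    object LogOffsetMetadata {
      // placeholder for an offset whose log position is not (yet) known
      val UnknownOffsetMetadata = new LogOffsetMetadata(-1L, 0L, 0)

      // orders offsets by message offset, used to pick the minimum LEO as the new HW
      class OffsetOrdering extends Ordering[LogOffsetMetadata] {
        override def compare(x: LogOffsetMetadata, y: LogOffsetMetadata): Int =
          x.messageOffset compare y.messageOffset
      }
    }

    case class LogOffsetMetadata(messageOffset: Long,
                                 segmentBaseOffset: Long = -1L,
                                 relativePositionInSegment: Int = 0) {

      // true if this offset lies on an older log segment than `that`
      def offsetOnOlderSegment(that: LogOffsetMetadata): Boolean =
        this.segmentBaseOffset < that.segmentBaseOffset

      // true if this offset is strictly smaller than `that` in terms of message offset
      def precedes(that: LogOffsetMetadata): Boolean =
        this.messageOffset < that.messageOffset

      // message-offset distance between the two positions
      def offsetDiff(that: LogOffsetMetadata): Long =
        this.messageOffset - that.messageOffset

      // byte distance between the two positions within a segment,
      // used by DelayedFetch to accumulate the number of fetchable bytes
      def positionDiff(that: LogOffsetMetadata): Int =
        this.relativePositionInSegment - that.relativePositionInSegment
    }

Carrying the segment base offset and the byte position alongside the message offset is what lets the broker compare a fetcher's position against the leader's log end offset or high watermark at byte granularity, instead of re-reading the log to translate offsets on every check.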
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 5e659b4..bd13c20 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -19,8 +19,9 @@ package kafka.cluster
import kafka.log.Log
import kafka.utils.{SystemTime, Time, Logging}
+import kafka.server.LogOffsetMetadata
import kafka.common.KafkaException
-import kafka.server.ReplicaManager
+
import java.util.concurrent.atomic.AtomicLong
class Replica(val brokerId: Int,
@@ -28,33 +29,17 @@ class Replica(val brokerId: Int,
time: Time = SystemTime,
initialHighWatermarkValue: Long = 0L,
val log: Option[Log] = None) extends Logging {
- //only defined in local replica
- private[this] var highWatermarkValue: AtomicLong = new AtomicLong(initialHighWatermarkValue)
- // only used for remote replica; logEndOffsetValue for local replica is kept in log
- private[this] var logEndOffsetValue = new AtomicLong(ReplicaManager.UnknownLogEndOffset)
- private[this] var logEndOffsetUpdateTimeMsValue: AtomicLong = new AtomicLong(time.milliseconds)
+  // the high watermark offset value; for non-leader replicas only the message offset is kept
+ @volatile private[this] var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
+ // the log end offset value, kept in all replicas;
+  // for the local replica it is the log's end offset; for remote replicas it is only updated by follower fetch requests
+ @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
+ // the time when log offset is updated
+ private[this] val logEndOffsetUpdateTimeMsValue = new AtomicLong(time.milliseconds)
+
val topic = partition.topic
val partitionId = partition.partitionId
- def logEndOffset_=(newLogEndOffset: Long) {
- if (!isLocal) {
- logEndOffsetValue.set(newLogEndOffset)
- logEndOffsetUpdateTimeMsValue.set(time.milliseconds)
- trace("Setting log end offset for replica %d for partition [%s,%d] to %d"
- .format(brokerId, topic, partitionId, logEndOffsetValue.get()))
- } else
- throw new KafkaException("Shouldn't set logEndOffset for replica %d partition [%s,%d] since it's local"
- .format(brokerId, topic, partitionId))
-
- }
-
- def logEndOffset = {
- if (isLocal)
- log.get.logEndOffset
- else
- logEndOffsetValue.get()
- }
-
def isLocal: Boolean = {
log match {
case Some(l) => true
@@ -62,24 +47,43 @@ class Replica(val brokerId: Int,
}
}
- def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get()
-
- def highWatermark_=(newHighWatermark: Long) {
+ def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
if (isLocal) {
- trace("Setting hw for replica %d partition [%s,%d] on broker %d to %d"
- .format(brokerId, topic, partitionId, brokerId, newHighWatermark))
- highWatermarkValue.set(newHighWatermark)
- } else
- throw new KafkaException("Unable to set highwatermark for replica %d partition [%s,%d] since it's not local"
- .format(brokerId, topic, partitionId))
+ throw new KafkaException("Should not set log end offset on partition [%s,%d]'s local replica %d".format(topic, partitionId, brokerId))
+ } else {
+ logEndOffsetMetadata = newLogEndOffset
+ logEndOffsetUpdateTimeMsValue.set(time.milliseconds)
+ trace("Setting log end offset for replica %d for partition [%s,%d] to [%s]"
+ .format(brokerId, topic, partitionId, logEndOffsetMetadata))
+ }
}
- def highWatermark = {
+ def logEndOffset =
if (isLocal)
- highWatermarkValue.get()
+ log.get.logEndOffsetMetadata
else
- throw new KafkaException("Unable to get highwatermark for replica %d partition [%s,%d] since it's not local"
- .format(brokerId, topic, partitionId))
+ logEndOffsetMetadata
+
+ def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get()
+
+ def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
+ if (isLocal) {
+ highWatermarkMetadata = newHighWatermark
+ trace("Setting high watermark for replica %d partition [%s,%d] on broker %d to [%s]"
+ .format(brokerId, topic, partitionId, brokerId, newHighWatermark))
+ } else {
+ throw new KafkaException("Should not set high watermark on partition [%s,%d]'s non-local replica %d".format(topic, partitionId, brokerId))
+ }
+ }
+
+ def highWatermark = highWatermarkMetadata
+
+ def convertHWToLocalOffsetMetadata() = {
+ if (isLocal) {
+ highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
+ } else {
+ throw new KafkaException("Should not construct complete high watermark on partition [%s,%d]'s non-local replica %d".format(topic, partitionId, brokerId))
+ }
}
override def equals(that: Any): Boolean = {
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 0e64632..8db9203 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -71,7 +71,7 @@ class SimpleConsumer(val host: String,
response = blockingChannel.receive()
} catch {
case e : Throwable =>
- info("Reconnect due to socket error: %s".format(e.getMessage))
+ info("Reconnect due to socket error: %s".format(e.toString))
// retry once
try {
reconnect()
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b7bc5ff..0ddf97b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -21,7 +21,7 @@ import kafka.utils._
import kafka.message._
import kafka.common._
import kafka.metrics.KafkaMetricsGroup
-import kafka.server.BrokerTopicStats
+import kafka.server.{LogOffsetMetadata, FetchDataInfo, BrokerTopicStats}
import java.io.{IOException, File}
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
@@ -51,11 +51,11 @@ import com.yammer.metrics.core.Gauge
class Log(val dir: File,
@volatile var config: LogConfig,
@volatile var recoveryPoint: Long = 0L,
- val scheduler: Scheduler,
+ scheduler: Scheduler,
time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
import kafka.log.Log._
-
+
/* A lock that guards all modifications to the log */
private val lock = new Object
@@ -67,7 +67,7 @@ class Log(val dir: File,
loadSegments()
/* Calculate the offset of the next message */
- private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset())
+ @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name)
@@ -167,6 +167,10 @@ class Log(val dir: File,
for (s <- logSegments)
s.index.sanityCheck()
}
+
+ private def updateLogEndOffset(messageOffset: Long) {
+ nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size.toInt)
+ }
private def recoverLog() {
// if we have the clean shutdown marker, skip recovery
@@ -246,14 +250,14 @@ class Log(val dir: File,
try {
// they are valid, insert them in the log
lock synchronized {
- appendInfo.firstOffset = nextOffset.get
+ appendInfo.firstOffset = nextOffsetMetadata.messageOffset
// maybe roll the log if this segment is full
val segment = maybeRoll()
if(assignOffsets) {
// assign offsets to the message set
- val offset = new AtomicLong(nextOffset.get)
+ val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
try {
validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
} catch {
@@ -262,7 +266,7 @@ class Log(val dir: File,
appendInfo.lastOffset = offset.get - 1
} else {
// we are taking the offsets we are given
- if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
+ if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
throw new IllegalArgumentException("Out of order offsets found in " + messages)
}
@@ -282,10 +286,10 @@ class Log(val dir: File,
segment.append(appendInfo.firstOffset, validMessages)
// increment the log end offset
- nextOffset.set(appendInfo.lastOffset + 1)
+ updateLogEndOffset(appendInfo.lastOffset + 1)
trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
- .format(this.name, appendInfo.firstOffset, nextOffset.get(), validMessages))
+ .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))
if(unflushedMessages >= config.flushInterval)
flush()
@@ -307,7 +311,7 @@ class Log(val dir: File,
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
*/
case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
-
+
/**
* Validate the following:
*
@@ -387,20 +391,21 @@ class Log(val dir: File,
/**
* Read messages from the log
+ *
* @param startOffset The offset to begin reading at
* @param maxLength The maximum number of bytes to read
* @param maxOffset -The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).
*
* @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
- * @return The messages read
+ * @return The fetch data information including fetch starting offset metadata and messages read
*/
- def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = {
+ def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
// check if the offset is valid and in range
- val next = nextOffset.get
+ val next = nextOffsetMetadata.messageOffset
if(startOffset == next)
- return MessageSet.Empty
+ return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
var entry = segments.floorEntry(startOffset)
@@ -412,15 +417,31 @@ class Log(val dir: File,
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
while(entry != null) {
- val messages = entry.getValue.read(startOffset, maxOffset, maxLength)
- if(messages == null)
+ val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)
+ if(fetchInfo == null) {
entry = segments.higherEntry(entry.getKey)
- else
- return messages
+ } else {
+ return fetchInfo
+ }
}
- // okay we are beyond the end of the last segment but less than the log end offset
- MessageSet.Empty
+ // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
+    // this can happen when all messages with offsets larger than the start offset have been deleted.
+    // In this case, we return an empty message set together with the log end offset metadata
+ FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
+ }
+
+ /**
+ * Given a message offset, find its corresponding offset metadata in the log.
+ * If the message offset is out of range, return unknown offset metadata
+ */
+ def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
+ try {
+ val fetchDataInfo = read(offset, 1)
+ fetchDataInfo.fetchOffset
+ } catch {
+ case e: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
+ }
}
/**
@@ -433,7 +454,7 @@ class Log(val dir: File,
// find any segments that match the user-supplied predicate UNLESS it is the final segment
// and it is empty (since we would just end up re-creating it
val lastSegment = activeSegment
- var deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
+ val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
val numToDelete = deletable.size
if(numToDelete > 0) {
lock synchronized {
@@ -458,9 +479,14 @@ class Log(val dir: File,
def logStartOffset: Long = logSegments.head.baseOffset
/**
+ * The offset metadata of the next message that will be appended to the log
+ */
+ def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+
+ /**
* The offset of the next message that will be appended to the log
*/
- def logEndOffset: Long = nextOffset.get
+ def logEndOffset: Long = nextOffsetMetadata.messageOffset
/**
* Roll the log over to a new empty log segment if necessary
@@ -582,7 +608,7 @@ class Log(val dir: File,
val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
deletable.foreach(deleteSegment(_))
activeSegment.truncateTo(targetOffset)
- this.nextOffset.set(targetOffset)
+ updateLogEndOffset(targetOffset)
this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
}
}
@@ -602,7 +628,7 @@ class Log(val dir: File,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
time = time))
- this.nextOffset.set(newOffset)
+ updateLogEndOffset(newOffset)
this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
}
}
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index afbeffc..c20de4a 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -17,20 +17,22 @@
package kafka.log
+import kafka.common._
+import kafka.message._
+import kafka.utils._
+import kafka.metrics.KafkaMetricsGroup
+
import scala.collection._
import scala.math
import java.nio._
import java.util.Date
import java.io.File
-import kafka.common._
-import kafka.message._
-import kafka.utils._
-import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
import java.lang.IllegalStateException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
+import com.yammer.metrics.core.Gauge
+
/**
* The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
* A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
@@ -325,7 +327,6 @@ private[log] class Cleaner(val id: Int,
* @param log The log being cleaned
* @param segments The group of segments being cleaned
* @param map The offset map to use for cleaning segments
- * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
* @param deleteHorizonMs The time to retain delete tombstones
*/
private[log] def cleanSegments(log: Log,
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0d6926e..b7968ac 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -14,15 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
- package kafka.log
+package kafka.log
-import scala.math._
-import java.io.File
import kafka.message._
import kafka.common._
import kafka.utils._
+import kafka.server.{LogOffsetMetadata, FetchDataInfo}
-/**
+import scala.math._
+import java.io.File
+
+
+ /**
* A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
* the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
* segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
@@ -86,7 +89,7 @@ class LogSegment(val log: FileMessageSet,
* Find the physical file position for the first message with offset >= the requested offset.
*
* The lowerBound argument is an optimization that can be used if we already know a valid starting position
- * in the file higher than the greast-lower-bound from the index.
+ * in the file higher than the greatest-lower-bound from the index.
*
* @param offset The offset we want to translate
* @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
@@ -99,7 +102,7 @@ class LogSegment(val log: FileMessageSet,
val mapping = index.lookup(offset)
log.searchFor(offset, max(mapping.position, startingFilePosition))
}
-
+
/**
* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
@@ -108,22 +111,27 @@ class LogSegment(val log: FileMessageSet,
* @param maxSize The maximum number of bytes to include in the message set we read
* @param maxOffset An optional maximum offset for the message set we read
*
- * @return The message set read or null if the startOffset is larger than the largest offset in this log.
+   * @return The fetched data information, including the offset metadata of the first offset >= startOffset,
+ * or null if the startOffset is larger than the largest offset in this log
*/
@threadsafe
- def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): MessageSet = {
+ def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
if(maxSize < 0)
throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
- if(maxSize == 0)
- return MessageSet.Empty
-
+
val logSize = log.sizeInBytes // this may change, need to save a consistent copy
val startPosition = translateOffset(startOffset)
-
+
// if the start position is already off the end of the log, return null
if(startPosition == null)
return null
-
+
+ val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)
+
+    // if the fetch size is zero, still return the offset metadata but with an empty message set
+ if(maxSize == 0)
+ return FetchDataInfo(offsetMetadata, MessageSet.Empty)
+
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val length =
maxOffset match {
@@ -143,7 +151,7 @@ class LogSegment(val log: FileMessageSet,
min(endPosition - startPosition.position, maxSize)
}
}
- log.read(startPosition.position, length)
+ FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}
/**
@@ -222,7 +230,7 @@ class LogSegment(val log: FileMessageSet,
if(ms == null) {
baseOffset
} else {
- ms.lastOption match {
+ ms.messageSet.lastOption match {
case None => baseOffset
case Some(last) => last.nextOffset
}
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 3b15254..2e9532e 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,21 +18,22 @@
package kafka.server
import kafka.cluster.Broker
-import collection.mutable
-import scala.collection.Set
-import scala.collection.Map
-import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
-import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
import kafka.utils.{Pool, ShutdownableThread}
import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
import kafka.utils.Utils.inLock
+import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
+import kafka.metrics.KafkaMetricsGroup
+
+import scala.collection.mutable
+import scala.collection.Set
+import scala.collection.Map
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong
+import com.yammer.metrics.core.Gauge
/**
* Abstract class for fetching data from multiple partitions from the same broker.
@@ -92,12 +93,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
val partitionsWithError = new mutable.HashSet[TopicAndPartition]
var response: FetchResponse = null
try {
- trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+      trace("Issuing fetch request to broker %d: %s".format(sourceBroker.id, fetchRequest))
response = simpleConsumer.fetch(fetchRequest)
} catch {
case t: Throwable =>
if (isRunning.get) {
- warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.getMessage))
+ warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
partitionMapLock synchronized {
partitionsWithError ++= partitionMap.keys
}
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
new file mode 100644
index 0000000..e0f14e2
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.network.RequestChannel
+import kafka.api.{FetchResponse, FetchRequest}
+import kafka.common.{UnknownTopicOrPartitionException, NotLeaderForPartitionException, TopicAndPartition}
+
+import scala.collection.immutable.Map
+import scala.collection.Seq
+
+/**
+ * A delayed fetch request, which is satisfied (or more
+ * accurately, unblocked) -- if:
+ * Case A: This broker is no longer the leader for some partitions it tries to fetch
+ *   - should return whatever data is available for the remaining partitions.
+ * Case B: This broker does not know of some partitions it tries to fetch
+ *   - should return whatever data is available for the remaining partitions.
+ * Case C: The fetch offset is not on the last segment of the log
+ * - should return all the data on that segment.
+ * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
+ * - should return whatever data is available.
+ */
+
+class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey],
+ override val request: RequestChannel.Request,
+ override val delayMs: Long,
+ val fetch: FetchRequest,
+ private val partitionFetchOffsets: Map[TopicAndPartition, LogOffsetMetadata])
+ extends DelayedRequest(keys, request, delayMs) {
+
+ def isSatisfied(replicaManager: ReplicaManager) : Boolean = {
+ var accumulatedSize = 0
+ val fromFollower = fetch.isFromFollower
+ partitionFetchOffsets.foreach {
+ case (topicAndPartition, fetchOffset) =>
+ try {
+ if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
+ val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
+ val endOffset =
+ if (fromFollower)
+ replica.logEndOffset
+ else
+ replica.highWatermark
+
+ if (endOffset.offsetOnOlderSegment(fetchOffset)) {
+              // Case C, this can happen when a new follower replica is fetching from a truncated leader
+ debug("Satisfying fetch request %s since it is fetching later segments of partition %s.".format(fetch, topicAndPartition))
+ return true
+ } else if (fetchOffset.offsetOnOlderSegment(endOffset)) {
+              // Case C, this can happen when the follower replica is lagging too much
+ debug("Satisfying fetch request %s immediately since it is fetching older segments.".format(fetch))
+ return true
+ } else if (fetchOffset.precedes(endOffset)) {
+ accumulatedSize += endOffset.positionDiff(fetchOffset)
+ }
+ }
+ } catch {
+          case utpe: UnknownTopicOrPartitionException => // Case B
+            debug("Broker no longer knows of %s, satisfying %s immediately".format(topicAndPartition, fetch))
+            return true
+          case nle: NotLeaderForPartitionException => // Case A
+            debug("Broker is no longer the leader of %s, satisfying %s immediately".format(topicAndPartition, fetch))
+ return true
+ }
+ }
+
+ // Case D
+ accumulatedSize >= fetch.minBytes
+ }
+
+ def respond(replicaManager: ReplicaManager): FetchResponse = {
+ val topicData = replicaManager.readMessageSets(fetch)
+ FetchResponse(fetch.correlationId, topicData.mapValues(_.data))
+ }
+}
\ No newline at end of file
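To make the Case C and Case D checks concrete, here is a tiny worked example with illustrative values, built on the LogOffsetMetadata sketch given earlier (not part of the patch itself):

    // Both offsets sit on the same segment (base offset 100), so neither Case C branch fires.
    val fetchOffset = LogOffsetMetadata(messageOffset = 150L, segmentBaseOffset = 100L, relativePositionInSegment = 4096)
    val endOffset   = LogOffsetMetadata(messageOffset = 170L, segmentBaseOffset = 100L, relativePositionInSegment = 9216)

    assert(!endOffset.offsetOnOlderSegment(fetchOffset))   // leader log has not been truncated behind the fetcher
    assert(!fetchOffset.offsetOnOlderSegment(endOffset))   // fetcher is already on the active segment
    assert(fetchOffset.precedes(endOffset))                // new data is available, so bytes are accumulated

    val accumulatedSize = endOffset.positionDiff(fetchOffset)   // 9216 - 4096 = 5120 bytes readable
    // Case D: the delayed fetch is satisfied once accumulatedSize >= fetch.minBytes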
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
new file mode 100644
index 0000000..9481508
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.api._
+import kafka.common.ErrorMapping
+import kafka.common.TopicAndPartition
+import kafka.utils.Logging
+import kafka.network.RequestChannel
+
+import scala.Some
+import scala.collection.immutable.Map
+import scala.collection.Seq
+
+/** A delayed produce request, which is satisfied (or more
+ * accurately, unblocked) -- if for every partition it produces to:
+ * Case A: This broker is not the leader: unblock - should return error.
+ * Case B: This broker is the leader:
+ * B.1 - If there was a local error (when writing to the local log): unblock - should return the error
+ * B.2 - else, at least requiredAcks replicas should be caught up to this request.
+ */
+
+class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey],
+ override val request: RequestChannel.Request,
+ override val delayMs: Long,
+ val produce: ProducerRequest,
+ val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
+ val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
+ extends DelayedRequest(keys, request, delayMs) with Logging {
+
+ // first update the acks pending variable according to the error code
+ partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
+ if (delayedStatus.responseStatus.error == ErrorMapping.NoError) {
+ // Timeout error state will be cleared when required acks are received
+ delayedStatus.acksPending = true
+ delayedStatus.responseStatus.error = ErrorMapping.RequestTimedOutCode
+ } else {
+ delayedStatus.acksPending = false
+ }
+
+ trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
+ }
+
+ def respond(offsetManager: OffsetManager): RequestOrResponse = {
+ val responseStatus = partitionStatus.mapValues(status => status.responseStatus)
+
+ val errorCode = responseStatus.find { case (_, status) =>
+ status.error != ErrorMapping.NoError
+ }.map(_._2.error).getOrElse(ErrorMapping.NoError)
+
+ if (errorCode == ErrorMapping.NoError) {
+ offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
+ }
+
+ val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, offsetManager.config.maxMetadataSize))
+ .getOrElse(ProducerResponse(produce.correlationId, responseStatus))
+
+ response
+ }
+
+ def isSatisfied(replicaManager: ReplicaManager) = {
+ // check for each partition if it still has pending acks
+ partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) =>
+ trace("Checking producer request satisfaction for %s, acksPending = %b"
+ .format(topicAndPartition, fetchPartitionStatus.acksPending))
+ // skip those partitions that have already been satisfied
+ if (fetchPartitionStatus.acksPending) {
+ val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
+ val (hasEnough, errorCode) = partitionOpt match {
+ case Some(partition) =>
+ partition.checkEnoughReplicasReachOffset(
+ fetchPartitionStatus.requiredOffset,
+ produce.requiredAcks)
+ case None =>
+ (false, ErrorMapping.UnknownTopicOrPartitionCode)
+ }
+ if (errorCode != ErrorMapping.NoError) {
+ fetchPartitionStatus.acksPending = false
+ fetchPartitionStatus.responseStatus.error = errorCode
+ } else if (hasEnough) {
+ fetchPartitionStatus.acksPending = false
+ fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError
+ }
+ }
+ }
+
+ // unblocked if there are no partitions with pending acks
+ val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
+ satisfied
+ }
+}
+
+case class DelayedProduceResponseStatus(val requiredOffset: Long,
+ val responseStatus: ProducerResponseStatus) {
+ @volatile var acksPending = false
+
+ override def toString =
+ "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
+ acksPending, responseStatus.error, responseStatus.offset, requiredOffset)
+}
diff --git a/core/src/main/scala/kafka/server/DelayedRequestKey.scala b/core/src/main/scala/kafka/server/DelayedRequestKey.scala
new file mode 100644
index 0000000..628ef59
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedRequestKey.scala
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.TopicAndPartition
+
+/**
+ * Keys used for delayed request metrics recording
+ */
+trait DelayedRequestKey {
+ def keyLabel: String
+}
+
+object DelayedRequestKey {
+ val globalLabel = "All"
+}
+
+case class TopicPartitionRequestKey(topic: String, partition: Int) extends DelayedRequestKey {
+
+ def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
+
+ override def keyLabel = "%s-%d".format(topic, partition)
+}
diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala
new file mode 100644
index 0000000..26f278f
--- /dev/null
+++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.message.MessageSet
+
+case class FetchDataInfo(fetchOffset: LogOffsetMetadata, messageSet: MessageSet)
diff --git a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
new file mode 100644
index 0000000..ed13188
--- /dev/null
+++ b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.metrics.KafkaMetricsGroup
+import kafka.network.RequestChannel
+import kafka.api.FetchResponseSend
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * The purgatory holding delayed fetch requests
+ */
+class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel)
+ extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) {
+ this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
+
+ private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
+ private val metricPrefix = if (forFollower) "Follower" else "Consumer"
+
+ val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+ }
+
+ private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
+ private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
+
+ private def recordDelayedFetchExpired(forFollower: Boolean) {
+ val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+ else aggregateNonFollowerFetchRequestMetrics
+
+ metrics.expiredRequestMeter.mark()
+ }
+
+ /**
+ * Check if a specified delayed fetch request is satisfied
+ */
+ def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied(replicaManager)
+
+ /**
+ * When a delayed fetch request expires just answer it with whatever data is present
+ */
+ def expire(delayedFetch: DelayedFetch) {
+ debug("Expiring fetch request %s.".format(delayedFetch.fetch))
+ val fromFollower = delayedFetch.fetch.isFromFollower
+ recordDelayedFetchExpired(fromFollower)
+ respond(delayedFetch)
+ }
+
+ // TODO: purgatory should not be responsible for sending back the responses
+ def respond(delayedFetch: DelayedFetch) {
+ val response = delayedFetch.respond(replicaManager)
+ requestChannel.sendResponse(new RequestChannel.Response(delayedFetch.request, new FetchResponseSend(response)))
+ }
+}
\ No newline at end of file
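Both purgatories extend the pre-existing RequestPurgatory base class, which is not modified in this hunk. The following is a deliberately simplified, generic sketch of the watch/re-check pattern they rely on, assuming nothing about the real class beyond the checkAndMaybeWatch, update, and respond calls visible elsewhere in this patch:

    import scala.collection.mutable

    // Simplified illustration of the purgatory pattern, not Kafka's RequestPurgatory:
    // delayed operations are watched under one or more keys; when state tied to a key
    // changes (e.g. a partition's HW or log end offset advances), watchers for that key
    // are re-checked and the satisfied ones are responded to.
    trait Delayed { def keys: Seq[Any] }

    abstract class SimplePurgatory[T <: Delayed] {
      private val watchers = mutable.Map.empty[Any, mutable.ListBuffer[T]]

      // subclasses decide when a delayed operation can be completed and how to answer it
      def checkSatisfied(delayed: T): Boolean
      def respond(delayed: T): Unit

      // returns true if the operation is satisfied right away (the caller responds);
      // otherwise registers it under all of its keys for later re-checks
      def checkAndMaybeWatch(delayed: T): Boolean = synchronized {
        if (checkSatisfied(delayed)) {
          true
        } else {
          delayed.keys.foreach(k => watchers.getOrElseUpdate(k, mutable.ListBuffer.empty[T]) += delayed)
          false
        }
      }

      // called whenever something changes for `key`; completed watchers are removed and answered
      def update(key: Any): Unit = synchronized {
        val satisfied = watchers.getOrElse(key, mutable.ListBuffer.empty[T]).filter(checkSatisfied)
        watchers.get(key).foreach(_ --= satisfied)
        satisfied.foreach(respond)
      }
    }

A production purgatory additionally needs expiration handling (the expire() callback and purge interval seen above) and protection against completing the same request twice; both are omitted here for brevity.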
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fd5f12e..bb94673 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -23,13 +23,10 @@ import kafka.log._
import kafka.message._
import kafka.network._
import kafka.admin.AdminUtils
-import kafka.metrics.KafkaMetricsGroup
import kafka.network.RequestChannel.Response
import kafka.controller.KafkaController
-import kafka.utils.{Pool, SystemTime, Logging}
+import kafka.utils.{SystemTime, Logging}
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic._
import scala.collection._
import org.I0Itec.zkclient.ZkClient
@@ -45,11 +42,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val config: KafkaConfig,
val controller: KafkaController) extends Logging {
- private val producerRequestPurgatory =
- new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests)
- private val fetchRequestPurgatory =
- new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
- private val delayedRequestMetrics = new DelayedRequestMetrics
+ val producerRequestPurgatory = new ProducerRequestPurgatory(replicaManager, offsetManager, requestChannel)
+ val fetchRequestPurgatory = new FetchRequestPurgatory(replicaManager, requestChannel)
+ // TODO: the following line will be removed in 0.9
+ replicaManager.initWithRequestPurgatory(producerRequestPurgatory, fetchRequestPurgatory)
var metadataCache = new MetadataCache
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
@@ -127,22 +123,6 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse)))
}
- /**
- * Check if a partitionData from a produce request can unblock any
- * DelayedFetch requests.
- */
- def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messageSizeInBytes: Int) {
- val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), messageSizeInBytes)
- trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
-
- // send any newly unblocked responses
- for(fetchReq <- satisfied) {
- val topicData = readMessageSets(fetchReq.fetch)
- val response = FetchResponse(fetchReq.fetch.correlationId, topicData)
- requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
- }
- }
-
private def producerRequestFromOffsetCommit(offsetCommitRequest: OffsetCommitRequest) = {
val msgs = offsetCommitRequest.filterLargeMetadata(config.offsetMetadataMaxSize).map {
case (topicAndPartition, offset) =>
@@ -171,27 +151,21 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle a produce request or offset commit request (which is really a specialized producer request)
*/
def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
-
- val (produceRequest, offsetCommitRequestOpt) = if (request.requestId == RequestKeys.OffsetCommitKey) {
- val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
- (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
- }
- else {
- (request.requestObj.asInstanceOf[ProducerRequest], None)
- }
+ val (produceRequest, offsetCommitRequestOpt) =
+ if (request.requestId == RequestKeys.OffsetCommitKey) {
+ val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+ (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
+ } else {
+ (request.requestObj.asInstanceOf[ProducerRequest], None)
+ }
val sTime = SystemTime.milliseconds
val localProduceResults = appendToLocalLog(produceRequest)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+
val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
val numPartitionsInError = localProduceResults.count(_.error.isDefined)
- produceRequest.data.foreach(partitionAndData =>
- maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2.sizeInBytes))
-
- val allPartitionHaveReplicationFactorOne =
- !produceRequest.data.keySet.exists(
- m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
if(produceRequest.requiredAcks == 0) {
// no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since
// no response is expected by the producer the handler will send a close connection response to the socket server
@@ -214,7 +188,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
} else if (produceRequest.requiredAcks == 1 ||
produceRequest.numPartitions <= 0 ||
- allPartitionHaveReplicationFactorOne ||
numPartitionsInError == produceRequest.numPartitions) {
if (firstErrorCode == ErrorMapping.NoError) {
@@ -229,46 +202,27 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
// create a list of (topic, partition) pairs to use as keys for this delayed request
val producerRequestKeys = produceRequest.data.keys.map(
- topicAndPartition => new RequestKey(topicAndPartition)).toSeq
+ topicAndPartition => new TopicPartitionRequestKey(topicAndPartition)).toSeq
val statuses = localProduceResults.map(r =>
r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
val delayedRequest = new DelayedProduce(
producerRequestKeys,
request,
- statuses,
- produceRequest,
produceRequest.ackTimeoutMs.toLong,
+ produceRequest,
+ statuses,
offsetCommitRequestOpt)
- producerRequestPurgatory.watch(delayedRequest)
-
- /*
- * Replica fetch requests may have arrived (and potentially satisfied)
- * delayedProduce requests while they were being added to the purgatory.
- * Here, we explicitly check if any of them can be satisfied.
- */
- var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
- producerRequestKeys.foreach(key =>
- satisfiedProduceRequests ++=
- producerRequestPurgatory.update(key, key))
- debug(satisfiedProduceRequests.size +
- " producer requests unblocked during produce to local log.")
- satisfiedProduceRequests.foreach(_.respond())
-
- // we do not need the data anymore
- produceRequest.emptyData()
+      // watch the produce request if it cannot be satisfied immediately; otherwise send the response back
+ val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
+ if (satisfiedByMe)
+ producerRequestPurgatory.respond(delayedRequest)
}
- }
-
- case class DelayedProduceResponseStatus(requiredOffset: Long,
- status: ProducerResponseStatus) {
- var acksPending = false
- override def toString =
- "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
- acksPending, status.error, status.offset, requiredOffset)
+ // we do not need the data anymore
+ produceRequest.emptyData()
}
-
+
case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) {
def this(key: TopicAndPartition, throwable: Throwable) =
this(key, -1L, -1L, Some(throwable))
@@ -288,13 +242,12 @@ class KafkaApis(val requestChannel: RequestChannel,
partitionAndData.map {case (topicAndPartition, messages) =>
try {
val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
- val info =
- partitionOpt match {
- case Some(partition) => partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
- case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
- .format(topicAndPartition, brokerId))
-
- }
+ val info = partitionOpt match {
+ case Some(partition) =>
+ partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
+ case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
+ .format(topicAndPartition, brokerId))
+ }
val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1)
@@ -338,121 +291,58 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
- if(fetchRequest.isFromFollower) {
- maybeUpdatePartitionHw(fetchRequest)
- // after updating HW, some delayed produce requests may be unblocked
- var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
- fetchRequest.requestInfo.foreach {
- case (topicAndPartition, _) =>
- val key = new RequestKey(topicAndPartition)
- satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
- }
- debug("Replica %d fetch unblocked %d producer requests."
- .format(fetchRequest.replicaId, satisfiedProduceRequests.size))
- satisfiedProduceRequests.foreach(_.respond())
- }
-
- val dataRead = readMessageSets(fetchRequest)
- val bytesReadable = dataRead.values.map(_.messages.sizeInBytes).sum
+ val dataRead = replicaManager.readMessageSets(fetchRequest)
+
+ // if the fetch request comes from the follower,
+ // update its corresponding log end offset
+ if(fetchRequest.isFromFollower)
+ recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset))
+
+ // check if this fetch request can be satisfied right away
+ val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum
+ val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, dataAndOffset) =>
+ errorIncurred || (dataAndOffset.data.error != ErrorMapping.NoError))
+ // send the data immediately if 1) the fetch request does not want to wait
+ // 2) the fetch request does not require any data
+ // 3) enough data is already available to respond
+ // 4) an error occurred while reading the data
if(fetchRequest.maxWait <= 0 ||
+ fetchRequest.numPartitions <= 0 ||
bytesReadable >= fetchRequest.minBytes ||
- fetchRequest.numPartitions <= 0) {
+ errorReadingData) {
debug("Returning fetch response %s for fetch request with correlation id %d to client %s"
- .format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
- val response = new FetchResponse(fetchRequest.correlationId, dataRead)
+ .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
+ val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data))
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
} else {
debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId,
fetchRequest.clientId))
// create a list of (topic, partition) pairs to use as keys for this delayed request
- val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_))
- val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait, bytesReadable)
- fetchRequestPurgatory.watch(delayedFetch)
+ val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_))
+ val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest,
+ dataRead.mapValues(_.offset))
+
+ // add the fetch request to the purgatory for watching if it is not satisfied; otherwise send the response back
+ val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch)
+ if (satisfiedByMe)
+ fetchRequestPurgatory.respond(delayedFetch)
}
}
- private def maybeUpdatePartitionHw(fetchRequest: FetchRequest) {
- debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest))
- fetchRequest.requestInfo.foreach(info => {
- val (topic, partition, offset) = (info._1.topic, info._1.partition, info._2.offset)
- replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
- })
- }
+ private def recordFollowerLogEndOffsets(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) {
+ debug("Record follower log end offsets: %s ".format(offsets))
+ offsets.foreach {
+ case (topicAndPartition, offset) =>
+ replicaManager.updateReplicaLEOAndPartitionHW(topicAndPartition.topic,
+ topicAndPartition.partition, replicaId, offset)
- /**
- * Read from all the offset details given and return a map of
- * (topic, partition) -> PartitionData
- */
- private def readMessageSets(fetchRequest: FetchRequest) = {
- val isFetchFromFollower = fetchRequest.isFromFollower
- fetchRequest.requestInfo.map
- {
- case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
- val partitionData =
- try {
- val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
- BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
- BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes)
- if (!isFetchFromFollower) {
- new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
- } else {
- debug("Leader %d for partition [%s,%d] received fetch request from follower %d"
- .format(brokerId, topic, partition, fetchRequest.replicaId))
- new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
- }
- } catch {
- // NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
- // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request
- // for a partition it is the leader for
- case utpe: UnknownTopicOrPartitionException =>
- warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
- fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage))
- new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
- case nle: NotLeaderForPartitionException =>
- warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
- fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
- new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
- case t: Throwable =>
- BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
- BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
- error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. Possible cause: %s"
- .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage))
- new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
- }
- (TopicAndPartition(topic, partition), partitionData)
+ // for producer requests with acks > 1, we need to check
+ // whether they can be unblocked now that this follower's log end offset has advanced
+ replicaManager.unblockDelayedProduceRequests(new TopicPartitionRequestKey(topicAndPartition))
}
}
/**
- * Read from a single topic/partition at the given offset upto maxSize bytes
- */
- private def readMessageSet(topic: String,
- partition: Int,
- offset: Long,
- maxSize: Int,
- fromReplicaId: Int): (MessageSet, Long) = {
- // check if the current broker is the leader for the partitions
- val localReplica = if(fromReplicaId == Request.DebuggingConsumerId)
- replicaManager.getReplicaOrException(topic, partition)
- else
- replicaManager.getLeaderReplicaIfLocal(topic, partition)
- trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
- val maxOffsetOpt =
- if (Request.isValidBrokerId(fromReplicaId))
- None
- else
- Some(localReplica.highWatermark)
- val messages = localReplica.log match {
- case Some(log) =>
- log.read(offset, maxSize, maxOffsetOpt)
- case None =>
- error("Leader for partition [%s,%d] on broker %d does not have a local log".format(topic, partition, brokerId))
- MessageSet.Empty
- }
- (messages, localReplica.highWatermark)
- }
-
- /**
* Service the offset request API
*/
def handleOffsetRequest(request: RequestChannel.Request) {
@@ -473,7 +363,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!offsetRequest.isFromOrdinaryClient) {
allOffsets
} else {
- val hw = localReplica.highWatermark
+ val hw = localReplica.highWatermark.messageOffset
if (allOffsets.exists(_ > hw))
hw +: allOffsets.dropWhile(_ > hw)
else
@@ -643,209 +533,5 @@ class KafkaApis(val requestChannel: RequestChannel,
producerRequestPurgatory.shutdown()
debug("Shut down complete.")
}
-
- private [kafka] trait MetricKey {
- def keyLabel: String
- }
- private [kafka] object MetricKey {
- val globalLabel = "All"
- }
-
- private [kafka] case class RequestKey(topic: String, partition: Int)
- extends MetricKey {
-
- def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
-
- def topicAndPartition = TopicAndPartition(topic, partition)
-
- override def keyLabel = "%s-%d".format(topic, partition)
- }
-
- /**
- * A delayed fetch request
- */
- class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long)
- extends DelayedRequest(keys, request, delayMs) {
- val bytesAccumulated = new AtomicLong(initialSize)
- }
-
- /**
- * A holding pen for fetch requests waiting to be satisfied
- */
- class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
- extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) {
- this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
-
- /**
- * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
- */
- def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = {
- val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes)
- accumulatedSize >= delayedFetch.fetch.minBytes
- }
-
- /**
- * When a request expires just answer it with whatever data is present
- */
- def expire(delayed: DelayedFetch) {
- debug("Expiring fetch request %s.".format(delayed.fetch))
- try {
- val topicData = readMessageSets(delayed.fetch)
- val response = FetchResponse(delayed.fetch.correlationId, topicData)
- val fromFollower = delayed.fetch.isFromFollower
- delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
- requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
- }
- catch {
- case e1: LeaderNotAvailableException =>
- debug("Leader changed before fetch request %s expired.".format(delayed.fetch))
- case e2: UnknownTopicOrPartitionException =>
- debug("Replica went offline before fetch request %s expired.".format(delayed.fetch))
- }
- }
- }
-
- class DelayedProduce(keys: Seq[RequestKey],
- request: RequestChannel.Request,
- val partitionStatus: immutable.Map[TopicAndPartition, DelayedProduceResponseStatus],
- produce: ProducerRequest,
- delayMs: Long,
- offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
- extends DelayedRequest(keys, request, delayMs) with Logging {
-
- // first update the acks pending variable according to the error code
- partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
- if (delayedStatus.status.error == ErrorMapping.NoError) {
- // Timeout error state will be cleared when requiredAcks are received
- delayedStatus.acksPending = true
- delayedStatus.status.error = ErrorMapping.RequestTimedOutCode
- } else {
- delayedStatus.acksPending = false
- }
-
- trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
- }
-
- def respond() {
- val responseStatus = partitionStatus.map { case (topicAndPartition, delayedStatus) =>
- topicAndPartition -> delayedStatus.status
- }
-
- val errorCode = responseStatus.find { case (_, status) =>
- status.error != ErrorMapping.NoError
- }.map(_._2.error).getOrElse(ErrorMapping.NoError)
-
- if (errorCode == ErrorMapping.NoError) {
- offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
- }
-
- val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, config.offsetMetadataMaxSize))
- .getOrElse(ProducerResponse(produce.correlationId, responseStatus))
-
- requestChannel.sendResponse(new RequestChannel.Response(
- request, new BoundedByteBufferSend(response)))
- }
-
- /**
- * Returns true if this delayed produce request is satisfied (or more
- * accurately, unblocked) -- this is the case if for every partition:
- * Case A: This broker is not the leader: unblock - should return error.
- * Case B: This broker is the leader:
- * B.1 - If there was a localError (when writing to the local log): unblock - should return error
- * B.2 - else, at least requiredAcks replicas should be caught up to this request.
- *
- * As partitions become acknowledged, we may be able to unblock
- * DelayedFetchRequests that are pending on those partitions.
- */
- def isSatisfied(followerFetchRequestKey: RequestKey) = {
- val topic = followerFetchRequestKey.topic
- val partitionId = followerFetchRequestKey.partition
- val fetchPartitionStatus = partitionStatus(TopicAndPartition(topic, partitionId))
- trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
- .format(topic, partitionId, fetchPartitionStatus.acksPending))
- if (fetchPartitionStatus.acksPending) {
- val partitionOpt = replicaManager.getPartition(topic, partitionId)
- val (hasEnough, errorCode) = partitionOpt match {
- case Some(partition) =>
- partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks)
- case None =>
- (false, ErrorMapping.UnknownTopicOrPartitionCode)
- }
- if (errorCode != ErrorMapping.NoError) {
- fetchPartitionStatus. acksPending = false
- fetchPartitionStatus.status.error = errorCode
- } else if (hasEnough) {
- fetchPartitionStatus.acksPending = false
- fetchPartitionStatus.status.error = ErrorMapping.NoError
- }
- if (!fetchPartitionStatus.acksPending) {
- val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition)
- maybeUnblockDelayedFetchRequests(topic, partitionId, messageSizeInBytes)
- }
- }
-
- // unblocked if there are no partitions with pending acks
- val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
- trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied))
- satisfied
- }
- }
-
- /**
- * A holding pen for produce requests waiting to be satisfied.
- */
- private [kafka] class ProducerRequestPurgatory(purgeInterval: Int)
- extends RequestPurgatory[DelayedProduce, RequestKey](brokerId, purgeInterval) {
- this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId)
-
- protected def checkSatisfied(followerFetchRequestKey: RequestKey,
- delayedProduce: DelayedProduce) =
- delayedProduce.isSatisfied(followerFetchRequestKey)
-
- /**
- * Handle an expired delayed request
- */
- protected def expire(delayedProduce: DelayedProduce) {
- for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
- delayedRequestMetrics.recordDelayedProducerKeyExpired(RequestKey(topicPartition.topic, topicPartition.partition))
-
- delayedProduce.respond()
- }
- }
-
- private class DelayedRequestMetrics {
- private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
- val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
- }
-
-
- private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
- private val metricPrefix = if (forFollower) "Follower" else "Consumer"
-
- val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
- }
-
- private val producerRequestMetricsForKey = {
- val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
- new Pool[MetricKey, DelayedProducerRequestMetrics](Some(valueFactory))
- }
-
- private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
-
- private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
- private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
-
- def recordDelayedProducerKeyExpired(key: MetricKey) {
- val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
- List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
- }
-
- def recordDelayedFetchExpired(forFollower: Boolean) {
- val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
- else aggregateNonFollowerFetchRequestMetrics
-
- metrics.expiredRequestMeter.mark()
- }
- }
}
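
The rewritten handleFetchRequest above answers a fetch immediately when any of the four conditions in the comment holds, and only otherwise parks a DelayedFetch in the purgatory via checkAndMaybeWatch. The following standalone Scala sketch is not broker code; it uses illustrative names only and simply isolates that fast-path decision so the four conditions can be read in one place.

object FetchFastPathSketch {
  // mirrors the condition used in the patched handleFetchRequest
  def respondImmediately(maxWaitMs: Int,
                         numPartitions: Int,
                         bytesReadable: Long,
                         minBytes: Int,
                         errorReadingData: Boolean): Boolean =
    maxWaitMs <= 0 ||             // 1) the request does not want to wait
    numPartitions <= 0 ||         // 2) the request does not ask for any data
    bytesReadable >= minBytes ||  // 3) enough data has already been read
    errorReadingData              // 4) an error occurred while reading data

  def main(args: Array[String]): Unit = {
    // plenty of data available: answered right away
    println(respondImmediately(maxWaitMs = 500, numPartitions = 1, bytesReadable = 4096, minBytes = 1, errorReadingData = false)) // true
    // long poll with no data yet: goes to the fetch purgatory
    println(respondImmediately(maxWaitMs = 500, numPartitions = 1, bytesReadable = 0, minBytes = 1, errorReadingData = false))    // false
  }
}
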
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
new file mode 100644
index 0000000..a868334
--- /dev/null
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.common.KafkaException
+
+object LogOffsetMetadata {
+ val UnknownOffsetMetadata = new LogOffsetMetadata(-1, 0, 0)
+ val UnknownSegBaseOffset = -1L
+ val UnknownFilePosition = -1
+
+ class OffsetOrdering extends Ordering[LogOffsetMetadata] {
+ override def compare(x: LogOffsetMetadata, y: LogOffsetMetadata): Int = {
+ x.offsetDiff(y).toInt
+ }
+ }
+
+}
+
+/*
+ * A log offset structure, including:
+ * 1. the message offset
+ * 2. the base message offset of the located segment
+ * 3. the physical position on the located segment
+ */
+case class LogOffsetMetadata(messageOffset: Long,
+ segmentBaseOffset: Long = LogOffsetMetadata.UnknownSegBaseOffset,
+ relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {
+
+ // check if this offset is already on an older segment compared with the given offset
+ def offsetOnOlderSegment(that: LogOffsetMetadata): Boolean = {
+ if (messageOffsetOnly())
+ throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that))
+
+ this.segmentBaseOffset < that.segmentBaseOffset
+ }
+
+ // check if this offset is on the same segment with the given offset
+ def offsetOnSameSegment(that: LogOffsetMetadata): Boolean = {
+ if (messageOffsetOnly())
+ throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that))
+
+ this.segmentBaseOffset == that.segmentBaseOffset
+ }
+
+ // check if this offset is before the given offset
+ def precedes(that: LogOffsetMetadata): Boolean = this.messageOffset < that.messageOffset
+
+ // compute the number of messages between this offset and the given offset
+ def offsetDiff(that: LogOffsetMetadata): Long = {
+ this.messageOffset - that.messageOffset
+ }
+
+ // compute the number of bytes between this offset and the given offset
+ // if they are on the same segment and this offset precedes the given offset
+ def positionDiff(that: LogOffsetMetadata): Int = {
+ if(!offsetOnSameSegment(that))
+ throw new KafkaException("%s cannot compare its segment position with %s since they are not on the same segment".format(this, that))
+ if(messageOffsetOnly())
+ throw new KafkaException("%s cannot compare its segment position with %s since it only has message offset info".format(this, that))
+
+ this.relativePositionInSegment - that.relativePositionInSegment
+ }
+
+ // decide if the offset metadata only contains message offset info
+ def messageOffsetOnly(): Boolean = {
+ segmentBaseOffset == LogOffsetMetadata.UnknownSegBaseOffset && relativePositionInSegment == LogOffsetMetadata.UnknownFilePosition
+ }
+
+ override def toString = messageOffset.toString + " [" + segmentBaseOffset + " : " + relativePositionInSegment + "]"
+
+}
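
Since LogOffsetMetadata is added in full above, a short usage example may help show how the three fields interact. This is only a sketch; it assumes nothing beyond the class as defined in this patch (and a classpath that contains the patched core module).

import kafka.server.LogOffsetMetadata

object LogOffsetMetadataExample extends App {
  // two offsets on the same segment (base offset 0) at different physical positions
  val start = LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 4096)
  val end   = LogOffsetMetadata(messageOffset = 150L, segmentBaseOffset = 0L, relativePositionInSegment = 8192)

  println(start.precedes(end))     // true: 100 < 150
  println(end.offsetDiff(start))   // 50 messages between the two offsets
  println(end.positionDiff(start)) // 4096 bytes apart within the same segment

  // an offset carrying only the message offset cannot answer segment-level questions
  val messageOnly = LogOffsetMetadata(messageOffset = 200L)
  println(messageOnly.messageOffsetOnly()) // true
  // messageOnly.positionDiff(start) would throw a KafkaException
}
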
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 0e22897..43eb2a3 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -17,26 +17,29 @@
package kafka.server
+import org.apache.kafka.common.protocol.types.{Struct, Schema, Field}
+import org.apache.kafka.common.protocol.types.Type.STRING
+import org.apache.kafka.common.protocol.types.Type.INT32
+import org.apache.kafka.common.protocol.types.Type.INT64
+
import kafka.utils._
import kafka.common._
-import java.nio.ByteBuffer
-import java.util.Properties
import kafka.log.{FileMessageSet, LogConfig}
-import org.I0Itec.zkclient.ZkClient
-import scala.collection._
import kafka.message._
-import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
-import scala.Some
import kafka.common.TopicAndPartition
import kafka.tools.MessageFormatter
+
+import scala.Some
+import scala.collection._
import java.io.PrintStream
-import org.apache.kafka.common.protocol.types.{Struct, Schema, Field}
-import org.apache.kafka.common.protocol.types.Type.STRING
-import org.apache.kafka.common.protocol.types.Type.INT32
-import org.apache.kafka.common.protocol.types.Type.INT64
import java.util.concurrent.atomic.AtomicBoolean
+import java.nio.ByteBuffer
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import com.yammer.metrics.core.Gauge
+import org.I0Itec.zkclient.ZkClient
/**
@@ -271,7 +274,7 @@ class OffsetManager(val config: OffsetManagerConfig,
// loop breaks if leader changes at any time during the load, since getHighWatermark is -1
while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
buffer.clear()
- val messages = log.read(currOffset, config.loadBufferSize).asInstanceOf[FileMessageSet]
+ val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
messages.readInto(buffer, 0)
val messageSet = new ByteBufferMessageSet(buffer)
messageSet.foreach { msgAndOffset =>
@@ -312,7 +315,7 @@ class OffsetManager(val config: OffsetManagerConfig,
val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, partitionId)
val hw = partitionOpt.map { partition =>
- partition.leaderReplicaIfLocal().map(_.highWatermark).getOrElse(-1L)
+ partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
}.getOrElse(-1L)
hw
diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
new file mode 100644
index 0000000..d4a7d4a
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.Pool
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * The purgatory holding delayed producer requests
+ */
+class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel)
+ extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) {
+ this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
+
+ private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup {
+ val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+ }
+
+ private val producerRequestMetricsForKey = {
+ val valueFactory = (k: DelayedRequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
+ new Pool[DelayedRequestKey, DelayedProducerRequestMetrics](Some(valueFactory))
+ }
+
+ private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
+
+ private def recordDelayedProducerKeyExpired(key: DelayedRequestKey) {
+ val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
+ List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
+ }
+
+ /**
+ * Check if a specified delayed produce request is satisfied
+ */
+ def checkSatisfied(delayedProduce: DelayedProduce) = delayedProduce.isSatisfied(replicaManager)
+
+ /**
+ * When a delayed produce request expires, answer it with possible timeout error codes
+ */
+ def expire(delayedProduce: DelayedProduce) {
+ debug("Expiring produce request %s.".format(delayedProduce.produce))
+ for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
+ recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition))
+ respond(delayedProduce)
+ }
+
+ // TODO: purgatory should not be responsible for sending back the responses
+ def respond(delayedProduce: DelayedProduce) {
+ val response = delayedProduce.respond(offsetManager)
+ requestChannel.sendResponse(new RequestChannel.Response(delayedProduce.request, new BoundedByteBufferSend(response)))
+ }
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 75ae1e1..6879e73 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -47,16 +47,19 @@ class ReplicaFetcherThread(name:String,
val replica = replicaMgr.getReplica(topic, partitionId).get
val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
- if (fetchOffset != replica.logEndOffset)
- throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset))
+ if (fetchOffset != replica.logEndOffset.messageOffset)
+ throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset))
trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
- .format(replica.brokerId, replica.logEndOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.hw))
+ .format(replica.brokerId, replica.logEndOffset.messageOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.hw))
replica.log.get.append(messageSet, assignOffsets = false)
trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
- .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes, topicAndPartition))
- val followerHighWatermark = replica.logEndOffset.min(partitionData.hw)
- replica.highWatermark = followerHighWatermark
- trace("Follower %d set replica highwatermark for partition [%s,%d] to %d"
+ .format(replica.brokerId, replica.logEndOffset.messageOffset, messageSet.sizeInBytes, topicAndPartition))
+ val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.hw)
+ // for the follower replica, we do not need to keep
+ // its segment base offset or physical position; these values
+ // will be recomputed if and when this replica becomes the leader
+ replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
+ trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
.format(replica.brokerId, topic, partitionId, followerHighWatermark))
} catch {
case e: KafkaStorageException =>
@@ -82,7 +85,7 @@ class ReplicaFetcherThread(name:String,
* There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
*/
val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
- if (leaderEndOffset < replica.logEndOffset) {
+ if (leaderEndOffset < replica.logEndOffset.messageOffset) {
// Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
// This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
// we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
@@ -91,13 +94,13 @@ class ReplicaFetcherThread(name:String,
// Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
" Current leader %d's latest offset %d is less than replica %d's latest offset %d"
- .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset))
+ .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
Runtime.getRuntime.halt(1)
}
replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
- .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderEndOffset))
+ .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
leaderEndOffset
} else {
/**
@@ -109,7 +112,7 @@ class ReplicaFetcherThread(name:String,
val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
- .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderStartOffset))
+ .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
leaderStartOffset
}
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 897783c..68758e3 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -16,29 +16,39 @@
*/
package kafka.server
-import collection._
-import mutable.HashMap
-import kafka.cluster.{Broker, Partition, Replica}
+import kafka.api._
+import kafka.common._
import kafka.utils._
+import kafka.cluster.{Broker, Partition, Replica}
import kafka.log.LogManager
import kafka.metrics.KafkaMetricsGroup
-import kafka.common._
-import kafka.api.{UpdateMetadataRequest, StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
import kafka.controller.KafkaController
-import org.I0Itec.zkclient.ZkClient
-import com.yammer.metrics.core.Gauge
+import kafka.common.TopicAndPartition
+import kafka.message.MessageSet
+
import java.util.concurrent.atomic.AtomicBoolean
import java.io.{IOException, File}
import java.util.concurrent.TimeUnit
+import scala.Predef._
+import scala.collection._
+import scala.collection.mutable.HashMap
+import scala.collection.Map
+import scala.collection.Set
+import scala.Some
+
+import org.I0Itec.zkclient.ZkClient
+import com.yammer.metrics.core.Gauge
object ReplicaManager {
- val UnknownLogEndOffset = -1L
val HighWatermarkFilename = "replication-offset-checkpoint"
}
-class ReplicaManager(val config: KafkaConfig,
- time: Time,
- val zkClient: ZkClient,
+case class PartitionDataAndOffset(data: FetchResponsePartitionData, offset: LogOffsetMetadata)
+
+
+class ReplicaManager(val config: KafkaConfig,
+ time: Time,
+ val zkClient: ZkClient,
scheduler: Scheduler,
val logManager: LogManager,
val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup {
@@ -54,6 +64,9 @@ class ReplicaManager(val config: KafkaConfig,
this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
val stateChangeLogger = KafkaController.stateChangeLogger
+ var producerRequestPurgatory: ProducerRequestPurgatory = null
+ var fetchRequestPurgatory: FetchRequestPurgatory = null
+
newGauge(
"LeaderCount",
new Gauge[Int] {
@@ -87,17 +100,37 @@ class ReplicaManager(val config: KafkaConfig,
}
/**
- * This function is only used in two places: in Partition.updateISR() and KafkaApis.handleProducerRequest().
- * In the former case, the partition should have been created, in the latter case, return -1 will put the request into purgatory
+ * Initialize the replica manager with the request purgatories
+ *
+ * TODO: will be removed in 0.9 where we refactor server structure
*/
- def getReplicationFactorForPartition(topic: String, partitionId: Int) = {
- val partitionOpt = getPartition(topic, partitionId)
- partitionOpt match {
- case Some(partition) =>
- partition.replicationFactor
- case None =>
- -1
- }
+
+ def initWithRequestPurgatory(producerRequestPurgatory: ProducerRequestPurgatory, fetchRequestPurgatory: FetchRequestPurgatory) {
+ this.producerRequestPurgatory = producerRequestPurgatory
+ this.fetchRequestPurgatory = fetchRequestPurgatory
+ }
+
+ /**
+ * Unblock some delayed produce requests with the request key
+ */
+ def unblockDelayedProduceRequests(key: DelayedRequestKey) {
+ val satisfied = producerRequestPurgatory.update(key)
+ debug("Request key %s unblocked %d producer requests."
+ .format(key.keyLabel, satisfied.size))
+
+ // send any newly unblocked responses
+ satisfied.foreach(producerRequestPurgatory.respond(_))
+ }
+
+ /**
+ * Unblock some delayed fetch requests with the request key
+ */
+ def unblockDelayedFetchRequests(key: DelayedRequestKey) {
+ val satisfied = fetchRequestPurgatory.update(key)
+ debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, satisfied.size))
+
+ // send any newly unblocked responses
+ satisfied.foreach(fetchRequestPurgatory.respond(_))
}
def startup() {
@@ -155,10 +188,10 @@ class ReplicaManager(val config: KafkaConfig,
}
}
- def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = {
+ def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
var partition = allPartitions.get((topic, partitionId))
if (partition == null) {
- allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, replicationFactor, time, this))
+ allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
partition = allPartitions.get((topic, partitionId))
}
partition
@@ -203,6 +236,77 @@ class ReplicaManager(val config: KafkaConfig,
}
}
+ /**
+ * Read from all the offset details given and return a map of
+ * (topic, partition) -> PartitionDataAndOffset
+ */
+ def readMessageSets(fetchRequest: FetchRequest) = {
+ val isFetchFromFollower = fetchRequest.isFromFollower
+ fetchRequest.requestInfo.map
+ {
+ case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
+ val partitionDataAndOffsetInfo =
+ try {
+ val (fetchInfo, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
+ BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes)
+ BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes)
+ if (isFetchFromFollower) {
+ debug("Partition [%s,%d] received fetch request from follower %d"
+ .format(topic, partition, fetchRequest.replicaId))
+ }
+ new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, fetchInfo.messageSet), fetchInfo.fetchOffset)
+ } catch {
+ // NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
+ // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request
+ // for a partition it is the leader for
+ case utpe: UnknownTopicOrPartitionException =>
+ warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
+ fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage))
+ new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
+ case nle: NotLeaderForPartitionException =>
+ warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
+ fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
+ new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
+ case t: Throwable =>
+ BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
+ BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
+ error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. Possible cause: %s"
+ .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage))
+ new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
+ }
+ (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo)
+ }
+ }
+
+ /**
+ * Read from a single topic/partition at the given offset, up to maxSize bytes
+ */
+ private def readMessageSet(topic: String,
+ partition: Int,
+ offset: Long,
+ maxSize: Int,
+ fromReplicaId: Int): (FetchDataInfo, Long) = {
+ // check if the current broker is the leader for the partitions
+ val localReplica = if(fromReplicaId == Request.DebuggingConsumerId)
+ getReplicaOrException(topic, partition)
+ else
+ getLeaderReplicaIfLocal(topic, partition)
+ trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
+ val maxOffsetOpt =
+ if (Request.isValidBrokerId(fromReplicaId))
+ None
+ else
+ Some(localReplica.highWatermark.messageOffset)
+ val fetchInfo = localReplica.log match {
+ case Some(log) =>
+ log.read(offset, maxSize, maxOffsetOpt)
+ case None =>
+ error("Leader for partition [%s,%d] does not have a local log".format(topic, partition))
+ FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
+ }
+ (fetchInfo, localReplica.highWatermark.messageOffset)
+ }
+
def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
replicaStateChangeLock synchronized {
if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
@@ -243,7 +347,7 @@ class ReplicaManager(val config: KafkaConfig,
// First check partition's leader epoch
val partitionState = new HashMap[Partition, PartitionStateInfo]()
leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
- val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
+ val partition = getOrCreatePartition(topic, partitionId)
val partitionLeaderEpoch = partition.getLeaderEpoch()
// If the leader epoch is valid record the epoch of the controller that made the leadership decision.
// This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
@@ -403,7 +507,7 @@ class ReplicaManager(val config: KafkaConfig,
.format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
}
- logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark)).toMap)
+ logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " +
@@ -421,7 +525,9 @@ class ReplicaManager(val config: KafkaConfig,
else {
// we do not need to check if the leader exists again since this has been done at the beginning of this process
val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
- new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, partition.getReplica().get.logEndOffset)).toMap
+ new TopicAndPartition(partition) -> BrokerAndInitialOffset(
+ leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
+ partition.getReplica().get.logEndOffset.messageOffset)).toMap
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
partitionsToMakeFollower.foreach { partition =>
@@ -451,12 +557,23 @@ class ReplicaManager(val config: KafkaConfig,
allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
}
- def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
- val partitionOpt = getPartition(topic, partitionId)
- if(partitionOpt.isDefined) {
- partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset)
- } else {
- warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId))
+ def updateReplicaLEOAndPartitionHW(topic: String, partitionId: Int, replicaId: Int, offset: LogOffsetMetadata) = {
+ getPartition(topic, partitionId) match {
+ case Some(partition) =>
+ partition.getReplica(replicaId) match {
+ case Some(replica) =>
+ replica.logEndOffset = offset
+ // check if we need to update HW and expand Isr
+ partition.updateLeaderHWAndMaybeExpandIsr(replicaId)
+ debug("Recorded follower %d position %d for partition [%s,%d].".format(replicaId, offset.messageOffset, topic, partitionId))
+ case None =>
+ throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" +
+ " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId,
+ offset.messageOffset, partition.assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
+
+ }
+ case None =>
+ warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId))
}
}
@@ -470,7 +587,7 @@ class ReplicaManager(val config: KafkaConfig,
val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
for((dir, reps) <- replicasByDir) {
- val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
+ val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark.messageOffset)).toMap
try {
highWatermarkCheckpoints(dir).write(hwms)
} catch {
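
The replica manager now owns the produce/fetch unblocking logic but receives the two purgatories from the API layer via initWithRequestPurgatory. Below is a hedged wiring sketch: the ProducerRequestPurgatory constructor matches the new file earlier in this patch, while the FetchRequestPurgatory constructor shown here is an assumption, since that class is not included in this excerpt.

import kafka.network.RequestChannel
import kafka.server.{FetchRequestPurgatory, OffsetManager, ProducerRequestPurgatory, ReplicaManager}

// illustrative wiring only; in the broker this happens when the API layer is constructed
def wirePurgatories(replicaManager: ReplicaManager,
                    offsetManager: OffsetManager,
                    requestChannel: RequestChannel): Unit = {
  val producerPurgatory = new ProducerRequestPurgatory(replicaManager, offsetManager, requestChannel)
  // assumed constructor: FetchRequestPurgatory is not part of this excerpt
  val fetchPurgatory = new FetchRequestPurgatory(replicaManager, requestChannel)
  replicaManager.initWithRequestPurgatory(producerPurgatory, fetchPurgatory)
}
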
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 3d0ff1e..ce06d2c 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -17,13 +17,15 @@
package kafka.server
-import scala.collection._
-import java.util.concurrent._
-import java.util.concurrent.atomic._
import kafka.network._
import kafka.utils._
import kafka.metrics.KafkaMetricsGroup
+
import java.util
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import scala.collection._
+
import com.yammer.metrics.core.Gauge
@@ -45,8 +47,10 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
*
* For us the key is generally a (topic, partition) pair.
* By calling
- * watch(delayedRequest)
- * we will add triggers for each of the given keys. It is up to the user to then call
+ * val isSatisfiedByMe = checkAndMaybeWatch(delayedRequest)
+ * we check whether the request is already satisfied, and if not, add it to the watch lists of all its keys.
+ *
+ * It is up to the user to then call
* val satisfied = update(key, request)
* when a request relevant to the given key occurs. This triggers bookeeping logic and returns back any requests satisfied by this
* new request.
@@ -61,18 +65,23 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
* this function handles delayed requests that have hit their time limit without being satisfied.
*
*/
-abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 1000)
+abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000)
extends Logging with KafkaMetricsGroup {
/* a list of requests watching each key */
private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
- private val requestCounter = new AtomicInteger(0)
+ /* the number of requests being watched; a request watched under multiple keys is counted once per watcher list */
+ private val watched = new AtomicInteger(0)
+
+ /* background thread expiring requests that have been waiting too long */
+ private val expiredRequestReaper = new ExpiredRequestReaper
+ private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false)
newGauge(
"PurgatorySize",
new Gauge[Int] {
- def value = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
+ def value = watched.get() + expiredRequestReaper.numRequests
}
)
@@ -83,41 +92,50 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
}
)
- /* background thread expiring requests that have been waiting too long */
- private val expiredRequestReaper = new ExpiredRequestReaper
- private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false)
expirationThread.start()
/**
- * Add a new delayed request watching the contained keys
+ * Try to add the request to the watch lists of all its keys. Return true iff the request
+ * is satisfied and the satisfaction was done by the caller.
+ *
+ * A request may end up being watched on only some of its keys if it is found to be satisfied
+ * while being added to the individual key lists. In that case the request is still treated as
+ * satisfied and no longer watched; any entries already added are purged later by the expiration reaper.
*/
- def watch(delayedRequest: T) {
- requestCounter.getAndIncrement()
-
+ def checkAndMaybeWatch(delayedRequest: T): Boolean = {
for(key <- delayedRequest.keys) {
- var lst = watchersFor(key)
- lst.add(delayedRequest)
+ val lst = watchersFor(key)
+ if(!lst.checkAndMaybeAdd(delayedRequest)) {
+ if(delayedRequest.satisfied.compareAndSet(false, true))
+ return true
+ else
+ return false
+ }
}
+
+ // if it is indeed watched, add to the expire queue also
expiredRequestReaper.enqueue(delayedRequest)
+
+ false
}
/**
* Update any watchers and return a list of newly satisfied requests.
*/
- def update(key: Any, request: R): Seq[T] = {
+ def update(key: Any): Seq[T] = {
val w = watchersForKey.get(key)
if(w == null)
Seq.empty
else
- w.collectSatisfiedRequests(request)
+ w.collectSatisfiedRequests()
}
private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)
/**
- * Check if this request satisfied this delayed request
+ * Check if this delayed request is already satisfied
*/
- protected def checkSatisfied(request: R, delayed: T): Boolean
+ protected def checkSatisfied(request: T): Boolean
/**
* Handle an expired delayed request
@@ -125,7 +143,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
protected def expire(delayed: T)
/**
- * Shutdown the expirey thread
+ * Shutdown the expire reaper thread
*/
def shutdown() {
expiredRequestReaper.shutdown()
@@ -136,17 +154,26 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
* bookkeeping logic.
*/
private class Watchers {
+ private val requests = new util.ArrayList[T]
- private val requests = new util.LinkedList[T]
-
- def numRequests = requests.size
-
- def add(t: T) {
+ // potentially add the element to watch if it is not satisfied yet
+ def checkAndMaybeAdd(t: T): Boolean = {
synchronized {
+ // if it is already satisfied, do not add to the watch list
+ if (t.satisfied.get)
+ return false
+ // synchronize on the delayed request to avoid any race condition
+ // with expire and update threads on client-side.
+ if(t synchronized checkSatisfied(t)) {
+ return false
+ }
requests.add(t)
+ watched.getAndIncrement()
+ return true
}
}
+ // traverse the list and purge satisfied elements
def purgeSatisfied(): Int = {
synchronized {
val iter = requests.iterator()
@@ -155,6 +182,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
val curr = iter.next
if(curr.satisfied.get()) {
iter.remove()
+ watched.getAndDecrement()
purged += 1
}
}
@@ -162,7 +190,8 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
}
}
- def collectSatisfiedRequests(request: R): Seq[T] = {
+ // traverse the list and try to satisfy watched elements
+ def collectSatisfiedRequests(): Seq[T] = {
val response = new mutable.ArrayBuffer[T]
synchronized {
val iter = requests.iterator()
@@ -174,9 +203,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
} else {
// synchronize on curr to avoid any race condition with expire
// on client-side.
- val satisfied = curr synchronized checkSatisfied(request, curr)
+ val satisfied = curr synchronized checkSatisfied(curr)
if(satisfied) {
iter.remove()
+ watched.getAndDecrement()
val updated = curr.satisfied.compareAndSet(false, true)
if(updated == true) {
response += curr
@@ -215,13 +245,12 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
expire(curr)
}
}
- if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge
+ if (watched.get + numRequests >= purgeInterval) { // see if we need to force a full purge
debug("Beginning purgatory purge")
- requestCounter.set(0)
val purged = purgeSatisfied()
debug("Purged %d requests from delay queue.".format(purged))
val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
- debug("Purged %d (watcher) requests.".format(numPurgedFromWatchers))
+ debug("Purged %d requests from watch lists.".format(numPurgedFromWatchers))
}
} catch {
case e: Exception =>
@@ -266,10 +295,12 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
}
/**
- * Delete all expired events from the delay queue
+ * Delete all satisfied events from the delay queue and the watcher lists
*/
private def purgeSatisfied(): Int = {
var purged = 0
+
+ // purge the delayed queue
val iter = delayed.iterator()
while(iter.hasNext) {
val curr = iter.next()
@@ -278,6 +309,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
purged += 1
}
}
+
purged
}
}
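
The reworked purgatory contract — checkAndMaybeWatch returns true only for the caller that wins the satisfied flag, so exactly one path (the caller or a later update) sends the response — can be modelled in a few lines. The toy below is a deliberately simplified, non-thread-safe sketch with illustrative names and no expiration reaper; it is not the broker implementation.

import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

class ToyDelayed(val keys: Seq[Any]) { val satisfied = new AtomicBoolean(false) }

// checkSatisfied is supplied by the concrete purgatory, as in the real class
class ToyPurgatory(checkSatisfied: ToyDelayed => Boolean) {
  private val watchersForKey = mutable.Map.empty[Any, mutable.Buffer[ToyDelayed]]

  // returns true iff the request is satisfied and this caller won the satisfied flag,
  // i.e. the caller is responsible for sending the response
  def checkAndMaybeWatch(d: ToyDelayed): Boolean = {
    if (checkSatisfied(d))
      return d.satisfied.compareAndSet(false, true)
    d.keys.foreach(k => watchersForKey.getOrElseUpdate(k, mutable.Buffer.empty) += d)
    false
  }

  // called when something relevant to the key happens; returns the newly satisfied requests
  def update(key: Any): Seq[ToyDelayed] =
    watchersForKey.getOrElse(key, mutable.Buffer.empty)
      .filter(d => checkSatisfied(d) && d.satisfied.compareAndSet(false, true))
      .toSeq
}

A caller that gets true back from checkAndMaybeWatch responds immediately, which is exactly what the patched handleProducerRequest and handleFetchRequest do via producerRequestPurgatory.respond and fetchRequestPurgatory.respond.
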
diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
index 5f8f6bc..67196f3 100644
--- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
@@ -17,15 +17,17 @@
package kafka.tools
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
+
+import kafka.consumer._
+
import java.util.Properties
import java.util.Arrays
-import kafka.consumer._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
object TestEndToEndLatency {
def main(args: Array[String]) {
- if (args.length != 4) {
- System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages")
+ if (args.length != 6) {
+ System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks")
System.exit(1)
}
@@ -33,31 +35,38 @@ object TestEndToEndLatency {
val zkConnect = args(1)
val topic = args(2)
val numMessages = args(3).toInt
+ val consumerFetchMaxWait = args(4).toInt
+ val producerAcks = args(5).toInt
val consumerProps = new Properties()
consumerProps.put("group.id", topic)
consumerProps.put("auto.commit.enable", "false")
consumerProps.put("auto.offset.reset", "largest")
consumerProps.put("zookeeper.connect", zkConnect)
- consumerProps.put("fetch.wait.max.ms", "1")
+ consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString)
consumerProps.put("socket.timeout.ms", 1201000.toString)
val config = new ConsumerConfig(consumerProps)
val connector = Consumer.create(config)
- var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
+ val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
val iter = stream.iterator
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
+ producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
val producer = new KafkaProducer(producerProps)
+ // make sure the consumer fetcher has started before sending data since otherwise
+ // the consumption from the tail will skip the first message and hence be blocked
+ Thread.sleep(5000)
+
val message = "hello there beautiful".getBytes
var totalTime = 0.0
val latencies = new Array[Long](numMessages)
for (i <- 0 until numMessages) {
- var begin = System.nanoTime
+ val begin = System.nanoTime
producer.send(new ProducerRecord(topic, message))
val received = iter.next
val elapsed = System.nanoTime - begin
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 8fcd068..e19b8b2 100644
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -91,7 +91,7 @@ object StressTestLog {
@volatile var offset = 0
override def work() {
try {
- log.read(offset, 1024, Some(offset+1)) match {
+ log.read(offset, 1024, Some(offset+1)).messageSet match {
case read: FileMessageSet if read.sizeInBytes > 0 => {
val first = read.head
require(first.offset == offset, "We should either read nothing or the message we asked for.")
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 9f04bd3..a5386a0 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -74,7 +74,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val replica = servers.head.replicaManager.getReplica(topic, 0).get
assertTrue("HighWatermark should equal logEndOffset with just 1 replica",
- replica.logEndOffset > 0 && replica.logEndOffset == replica.highWatermark)
+ replica.logEndOffset.messageOffset > 0 && replica.logEndOffset.equals(replica.highWatermark))
val request = new FetchRequestBuilder()
.clientId("test-client")
@@ -248,13 +248,13 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
"Published messages should be in the log")
val replicaId = servers.head.config.brokerId
- TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark == 2 },
+ TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark.messageOffset == 2 },
"High watermark should equal to log end offset")
- TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark == 2 },
+ TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark.messageOffset == 2 },
"High watermark should equal to log end offset")
- TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark == 2 },
+ TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark.messageOffset == 2 },
"High watermark should equal to log end offset")
- TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark == 2 },
+ TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark.messageOffset == 2 },
"High watermark should equal to log end offset")
// test if the consumer received the messages in the correct order when producer has enabled request pipelining
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 7d4c70c..59bd8a9 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -94,7 +94,7 @@ class LogManagerTest extends JUnit3Suite {
assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
time.sleep(log.config.fileDeleteDelayMs + 1)
assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
- assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes)
+ assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).messageSet.sizeInBytes)
try {
log.read(0, 1024)
@@ -137,7 +137,7 @@ class LogManagerTest extends JUnit3Suite {
assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
time.sleep(log.config.fileDeleteDelayMs + 1)
assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
- assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes)
+ assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).messageSet.sizeInBytes)
try {
log.read(0, 1024)
fail("Should get exception from fetching earlier.")
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 6b76037..7b97e6a 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -78,7 +78,7 @@ class LogSegmentTest extends JUnit3Suite {
val seg = createSegment(40)
val ms = messages(50, "hello", "there", "little", "bee")
seg.append(50, ms)
- val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None)
+ val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).messageSet
assertEquals(ms.toList, read.toList)
}
@@ -94,7 +94,7 @@ class LogSegmentTest extends JUnit3Suite {
seg.append(baseOffset, ms)
def validate(offset: Long) =
assertEquals(ms.filter(_.offset == offset).toList,
- seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).toList)
+ seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).messageSet.toList)
validate(50)
validate(51)
validate(52)
@@ -109,7 +109,7 @@ class LogSegmentTest extends JUnit3Suite {
val ms = messages(50, "hello", "there")
seg.append(50, ms)
val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
- assertNull("Read beyond the last offset in the segment should give null", null)
+ assertNull("Read beyond the last offset in the segment should give null", read)
}
/**
@@ -124,7 +124,7 @@ class LogSegmentTest extends JUnit3Suite {
val ms2 = messages(60, "alpha", "beta")
seg.append(60, ms2)
val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
- assertEquals(ms2.toList, read.toList)
+ assertEquals(ms2.toList, read.messageSet.toList)
}
/**
@@ -142,12 +142,12 @@ class LogSegmentTest extends JUnit3Suite {
seg.append(offset+1, ms2)
// check that we can read back both messages
val read = seg.read(offset, None, 10000)
- assertEquals(List(ms1.head, ms2.head), read.toList)
+ assertEquals(List(ms1.head, ms2.head), read.messageSet.toList)
// now truncate off the last message
seg.truncateTo(offset + 1)
val read2 = seg.read(offset, None, 10000)
- assertEquals(1, read2.size)
- assertEquals(ms1.head, read2.head)
+ assertEquals(1, read2.messageSet.size)
+ assertEquals(ms1.head, read2.messageSet.head)
offset += 1
}
}
@@ -204,7 +204,7 @@ class LogSegmentTest extends JUnit3Suite {
TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
seg.recover(64*1024)
for(i <- 0 until 100)
- assertEquals(i, seg.read(i, Some(i+1), 1024).head.offset)
+ assertEquals(i, seg.read(i, Some(i+1), 1024).messageSet.head.offset)
}
/**
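
Two behaviours pinned down by the LogSegmentTest changes are worth restating: the corrected assertNull now actually checks the read result rather than comparing null with null, so a read past the segment's last offset is verified to yield null; and truncating to offset + 1 must leave only the earlier message readable. A toy model of the truncation invariant, purely as a stand-in for kafka.log.LogSegment:

object MiniSegmentDemo extends App {
  // Hedged sketch: an offset-indexed vector of messages standing in for a log segment.
  final case class MiniSegment(messages: Vector[(Long, String)]) {
    def append(offset: Long, msg: String): MiniSegment = copy(messages :+ (offset -> msg))
    def read(from: Long): Vector[(Long, String)] = messages.filter(_._1 >= from)
    def truncateTo(offset: Long): MiniSegment = copy(messages.filter(_._1 < offset))
  }

  // append offsets 10 and 11, truncate to 11: only offset 10 remains readable
  val seg = MiniSegment(Vector.empty).append(10L, "a").append(11L, "b").truncateTo(11L)
  assert(seg.read(10L).map(_._1) == Vector(10L))
  assert(seg.read(11L).isEmpty)
}
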
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1da1393..577d102 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -131,11 +131,11 @@ class LogTest extends JUnitSuite {
for(i <- 0 until messages.length)
log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
for(i <- 0 until messages.length) {
- val read = log.read(i, 100, Some(i+1)).head
+ val read = log.read(i, 100, Some(i+1)).messageSet.head
assertEquals("Offset read should match order appended.", i, read.offset)
assertEquals("Message should match appended.", messages(i), read.message)
}
- assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).size)
+ assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).messageSet.size)
}
/**
@@ -153,7 +153,7 @@ class LogTest extends JUnitSuite {
log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false)
for(i <- 50 until messageIds.max) {
val idx = messageIds.indexWhere(_ >= i)
- val read = log.read(i, 100, None).head
+ val read = log.read(i, 100, None).messageSet.head
assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
assertEquals("Message should match appended.", messages(idx), read.message)
}
@@ -176,7 +176,7 @@ class LogTest extends JUnitSuite {
// now manually truncate off all but one message from the first segment to create a gap in the messages
log.logSegments.head.truncateTo(1)
- assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).head.offset)
+ assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).messageSet.head.offset)
}
/**
@@ -188,7 +188,7 @@ class LogTest extends JUnitSuite {
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
val log = new Log(logDir, logConfig.copy(segmentSize = 1024), recoveryPoint = 0L, time.scheduler, time = time)
- assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes)
+ assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).messageSet.sizeInBytes)
try {
log.read(0, 1024)
fail("Expected exception on invalid read.")
@@ -219,12 +219,12 @@ class LogTest extends JUnitSuite {
/* do successive reads to ensure all our messages are there */
var offset = 0L
for(i <- 0 until numMessages) {
- val messages = log.read(offset, 1024*1024)
+ val messages = log.read(offset, 1024*1024).messageSet
assertEquals("Offsets not equal", offset, messages.head.offset)
assertEquals("Messages not equal at offset " + offset, messageSets(i).head.message, messages.head.message)
offset = messages.head.offset + 1
}
- val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1))
+ val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).messageSet
assertEquals("Should be no more messages", 0, lastRead.size)
// check that rolling the log forced a flush of the log; the flush is async so retry in case of failure
@@ -245,7 +245,7 @@ class LogTest extends JUnitSuite {
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
- def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message)
+ def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).messageSet.head.message)
/* we should always get the first message in the compressed set when reading any offset in the set */
assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
@@ -363,7 +363,7 @@ class LogTest extends JUnitSuite {
log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
for(i <- 0 until numMessages)
- assertEquals(i, log.read(i, 100, None).head.offset)
+ assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
log.close()
}
@@ -575,15 +575,15 @@ class LogTest extends JUnitSuite {
@Test
def testAppendMessageWithNullPayload() {
- var log = new Log(logDir,
+ val log = new Log(logDir,
LogConfig(),
recoveryPoint = 0L,
time.scheduler,
time)
log.append(new ByteBufferMessageSet(new Message(bytes = null)))
- val ms = log.read(0, 4096, None)
- assertEquals(0, ms.head.offset)
- assertTrue("Message payload should be null.", ms.head.message.isNull)
+ val messageSet = log.read(0, 4096, None).messageSet
+ assertEquals(0, messageSet.head.offset)
+ assertTrue("Message payload should be null.", messageSet.head.message.isNull)
}
@Test
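
The compressed-read assertion above ("Read at offset 0 should produce 0") captures the rule that any offset inside a compressed batch returns the whole batch, whose first inner offset is the batch's base offset. A self-contained stand-in for that rule (not the ByteBufferMessageSet implementation):

object CompressedReadDemo extends App {
  // A "batch" is addressed by its offset range; any read inside the range returns the batch.
  final case class Batch(baseOffset: Long, payloads: Vector[String]) {
    def contains(offset: Long): Boolean =
      offset >= baseOffset && offset < baseOffset + payloads.size
  }

  val log = Vector(Batch(0L, Vector("hello", "there")), Batch(2L, Vector("alpha", "beta")))
  def read(offset: Long): Batch = log.find(_.contains(offset)).get

  // reading offsets 0 or 1 both yield the batch whose first offset is 0
  assert(read(0L).baseOffset == 0L && read(1L).baseOffset == 0L)
  assert(read(2L).baseOffset == 2L && read(3L).baseOffset == 2L)
}
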
diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
index 6db245c..dd8847f 100644
--- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
@@ -31,7 +31,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
def createMessageSet(messages: Seq[Message]): MessageSet
@Test
- def testWrittenEqualsRead {
+ def testWrittenEqualsRead() {
val messageSet = createMessageSet(messages)
checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
}
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index e532c28..03a424d 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -58,7 +58,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
replicaManager.checkpointHighWatermarks()
var fooPartition0Hw = hwmFor(replicaManager, topic, 0)
assertEquals(0L, fooPartition0Hw)
- val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1)
+ val partition0 = replicaManager.getOrCreatePartition(topic, 0)
// create leader and follower replicas
val log0 = logManagers(0).createLog(TopicAndPartition(topic, 0), LogConfig())
val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0))
@@ -67,18 +67,12 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
partition0.addReplicaIfNotExists(followerReplicaPartition0)
replicaManager.checkpointHighWatermarks()
fooPartition0Hw = hwmFor(replicaManager, topic, 0)
- assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw)
- try {
- followerReplicaPartition0.highWatermark
- fail("Should fail with KafkaException")
- }catch {
- case e: KafkaException => // this is ok
- }
- // set the highwatermark for local replica
- partition0.getReplica().get.highWatermark = 5L
+ assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw)
+ // set the high watermark for local replica
+ partition0.getReplica().get.highWatermark = new LogOffsetMetadata(5L)
replicaManager.checkpointHighWatermarks()
fooPartition0Hw = hwmFor(replicaManager, topic, 0)
- assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw)
+ assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw)
EasyMock.verify(zkClient)
}
@@ -97,7 +91,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
replicaManager.checkpointHighWatermarks()
var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
assertEquals(0L, topic1Partition0Hw)
- val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1)
+ val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0)
// create leader log
val topic1Log0 = logManagers(0).createLog(TopicAndPartition(topic1, 0), LogConfig())
// create a local replica for topic1
@@ -105,15 +99,15 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
replicaManager.checkpointHighWatermarks()
topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
- assertEquals(leaderReplicaTopic1Partition0.highWatermark, topic1Partition0Hw)
- // set the highwatermark for local replica
- topic1Partition0.getReplica().get.highWatermark = 5L
+ assertEquals(leaderReplicaTopic1Partition0.highWatermark.messageOffset, topic1Partition0Hw)
+ // set the high watermark for local replica
+ topic1Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(5L)
replicaManager.checkpointHighWatermarks()
topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
- assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark)
+ assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark.messageOffset)
assertEquals(5L, topic1Partition0Hw)
// add another partition and set highwatermark
- val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1)
+ val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0)
// create leader log
val topic2Log0 = logManagers(0).createLog(TopicAndPartition(topic2, 0), LogConfig())
// create a local replica for topic2
@@ -121,13 +115,13 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
replicaManager.checkpointHighWatermarks()
var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
- assertEquals(leaderReplicaTopic2Partition0.highWatermark, topic2Partition0Hw)
+ assertEquals(leaderReplicaTopic2Partition0.highWatermark.messageOffset, topic2Partition0Hw)
// set the highwatermark for local replica
- topic2Partition0.getReplica().get.highWatermark = 15L
- assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark)
+ topic2Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(15L)
+ assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark.messageOffset)
// change the highwatermark for topic1
- topic1Partition0.getReplica().get.highWatermark = 10L
- assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark)
+ topic1Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(10L)
+ assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark.messageOffset)
replicaManager.checkpointHighWatermarks()
// verify checkpointed hw for topic 2
topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
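
Throughout this test the high watermark is no longer a bare Long but a LogOffsetMetadata, and the checkpointing path persists only its messageOffset. A small sketch of that idea with stand-in types (OffsetMeta and ReplicaSketch are assumptions, not Kafka classes):

object HwCheckpointDemo extends App {
  final case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long = 0L)
  final case class ReplicaSketch(topic: String, partition: Int, highWatermark: OffsetMeta)

  // Checkpointing keeps only the message offset of each replica's high watermark.
  def checkpoint(replicas: Seq[ReplicaSketch]): Map[(String, Int), Long] =
    replicas.map(r => (r.topic, r.partition) -> r.highWatermark.messageOffset).toMap

  val hwFile = checkpoint(Seq(
    ReplicaSketch("topic1", 0, OffsetMeta(10L)),
    ReplicaSketch("topic2", 0, OffsetMeta(15L))))
  assert(hwFile(("topic1", 0)) == 10L && hwFile(("topic2", 0)) == 15L)
}
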
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 2cd3a3f..cd302aa 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -46,7 +46,7 @@ class IsrExpirationTest extends JUnit3Suite {
val leaderReplica = partition0.getReplica(configs.head.brokerId).get
// let the follower catch up to 10
- (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 10)
+ (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new LogOffsetMetadata(10L))
var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages)
assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
@@ -69,7 +69,7 @@ class IsrExpirationTest extends JUnit3Suite {
assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
val leaderReplica = partition0.getReplica(configs.head.brokerId).get
// set remote replicas leo to something low, like 4
- (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 4L)
+ (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new LogOffsetMetadata(4L))
// now follower (broker id 1) has caught up to only 4, while the leader is at 15. Since the gap is larger than
// replicaMaxLagBytes, the follower is out of sync.
@@ -83,7 +83,7 @@ class IsrExpirationTest extends JUnit3Suite {
localLog: Log): Partition = {
val leaderId=config.brokerId
val replicaManager = new ReplicaManager(config, time, null, null, null, new AtomicBoolean(false))
- val partition = replicaManager.getOrCreatePartition(topic, partitionId, 1)
+ val partition = replicaManager.getOrCreatePartition(topic, partitionId)
val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
@@ -97,7 +97,7 @@ class IsrExpirationTest extends JUnit3Suite {
private def getLogWithLogEndOffset(logEndOffset: Long, expectedCalls: Int): Log = {
val log1 = EasyMock.createMock(classOf[kafka.log.Log])
- EasyMock.expect(log1.logEndOffset).andReturn(logEndOffset).times(expectedCalls)
+ EasyMock.expect(log1.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(logEndOffset)).times(expectedCalls)
EasyMock.replay(log1)
log1
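
The ISR expiration checks above boil down to a lag rule: a follower whose log end offset trails the leader's by more than the configured maximum number of messages is treated as out of sync. A minimal restatement of that rule (the exact comparison and config name in the broker may differ slightly):

object IsrLagDemo extends App {
  def outOfSync(leaderLeo: Long, followerLeo: Long, lagMaxMessages: Long): Boolean =
    leaderLeo - followerLeo > lagMaxMessages

  // follower at 10 vs leader at 15 is still in sync for a lag limit of 10
  assert(!outOfSync(leaderLeo = 15L, followerLeo = 10L, lagMaxMessages = 10L))
  // follower stuck at 4 vs leader at 15 has fallen out of sync
  assert(outOfSync(leaderLeo = 15L, followerLeo = 4L, lagMaxMessages = 10L))
}
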
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 0ec120a..d5d351c 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -85,7 +85,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// give some time for follower 1 to record the leader HW
TestUtils.waitUntilTrue(() =>
- server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages,
+ server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == numMessages,
"Failed to update high watermark for follower after timeout")
servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
@@ -134,7 +134,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// give some time for follower 1 to record leader HW of 60
TestUtils.waitUntilTrue(() =>
- server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+ server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
"Failed to update high watermark for follower after timeout")
// shutdown the servers to allow the hw to be checkpointed
servers.foreach(server => server.shutdown())
@@ -147,7 +147,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
val hw = 20L
// give some time for follower 1 to record leader HW of 20
TestUtils.waitUntilTrue(() =>
- server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+ server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
"Failed to update high watermark for follower after timeout")
// shutdown the servers to allow the hw to be checkpointed
servers.foreach(server => server.shutdown())
@@ -165,7 +165,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// allow some time for the follower to get the leader HW
TestUtils.waitUntilTrue(() =>
- server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+ server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
"Failed to update high watermark for follower after timeout")
// kill the server hosting the preferred replica
server1.shutdown()
@@ -191,7 +191,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// allow some time for the follower to get the leader HW
TestUtils.waitUntilTrue(() =>
- server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+ server1.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
"Failed to update high watermark for follower after timeout")
// shutdown the servers to allow the hw to be checkpointed
servers.foreach(server => server.shutdown())
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 9abf219..a9c4ddc 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -39,7 +39,7 @@ class ReplicaManagerTest extends JUnit3Suite {
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val time: MockTime = new MockTime()
val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
- val partition = rm.getOrCreatePartition(topic, 1, 1)
+ val partition = rm.getOrCreatePartition(topic, 1)
partition.getOrCreateReplica(1)
rm.checkpointHighWatermarks()
}
@@ -53,7 +53,7 @@ class ReplicaManagerTest extends JUnit3Suite {
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val time: MockTime = new MockTime()
val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
- val partition = rm.getOrCreatePartition(topic, 1, 1)
+ val partition = rm.getOrCreatePartition(topic, 1)
partition.getOrCreateReplica(1)
rm.checkpointHighWatermarks()
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
index 4f61f84..168712d 100644
--- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
@@ -46,17 +46,17 @@ class RequestPurgatoryTest extends JUnit3Suite {
def testRequestSatisfaction() {
val r1 = new DelayedRequest(Array("test1"), null, 100000L)
val r2 = new DelayedRequest(Array("test2"), null, 100000L)
- assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1", producerRequest1).size)
- purgatory.watch(r1)
- assertEquals("Still nothing satisfied", 0, purgatory.update("test1", producerRequest1).size)
- purgatory.watch(r2)
- assertEquals("Still nothing satisfied", 0, purgatory.update("test2", producerRequest2).size)
+ assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1").size)
+ assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1))
+ assertEquals("Still nothing satisfied", 0, purgatory.update("test1").size)
+ assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2))
+ assertEquals("Still nothing satisfied", 0, purgatory.update("test2").size)
purgatory.satisfied += r1
- assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1", producerRequest1))
- assertEquals("Nothing satisfied", 0, purgatory.update("test1", producerRequest2).size)
+ assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1"))
+ assertEquals("Nothing satisfied", 0, purgatory.update("test1").size)
purgatory.satisfied += r2
- assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2", producerRequest2))
- assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size)
+ assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2"))
+ assertEquals("Nothing satisfied", 0, purgatory.update("test2").size)
}
@Test
@@ -65,8 +65,8 @@ class RequestPurgatoryTest extends JUnit3Suite {
val r1 = new DelayedRequest(Array("test1"), null, expiration)
val r2 = new DelayedRequest(Array("test1"), null, 200000L)
val start = System.currentTimeMillis
- purgatory.watch(r1)
- purgatory.watch(r2)
+ assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1))
+ assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2))
purgatory.awaitExpiration(r1)
val elapsed = System.currentTimeMillis - start
assertTrue("r1 expired", purgatory.expired.contains(r1))
@@ -74,7 +74,7 @@ class RequestPurgatoryTest extends JUnit3Suite {
assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration)
}
- class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {
+ class MockRequestPurgatory extends RequestPurgatory[DelayedRequest] {
val satisfied = mutable.Set[DelayedRequest]()
val expired = mutable.Set[DelayedRequest]()
def awaitExpiration(delayed: DelayedRequest) = {
@@ -82,7 +82,7 @@ class RequestPurgatoryTest extends JUnit3Suite {
delayed.wait()
}
}
- def checkSatisfied(request: ProducerRequest, delayed: DelayedRequest): Boolean = satisfied.contains(delayed)
+ def checkSatisfied(delayed: DelayedRequest): Boolean = satisfied.contains(delayed)
def expire(delayed: DelayedRequest) {
expired += delayed
delayed synchronized {
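
The purgatory API change visible in this test is that watch(r) has been replaced by checkAndMaybeWatch(r), which returns false when the request is not yet satisfiable and has therefore been enqueued, and checkSatisfied no longer receives the request that triggered the update. A simplified stand-in that mirrors the mock above (not kafka.server.RequestPurgatory itself):

object PurgatorySketch extends App {
  import scala.collection.mutable

  abstract class MiniPurgatory[D] {
    private val watchers = mutable.Map.empty[String, mutable.ArrayBuffer[D]]
    def checkSatisfied(delayed: D): Boolean

    // true: already satisfiable, caller can respond immediately; false: now being watched
    def checkAndMaybeWatch(delayed: D, keys: Seq[String]): Boolean =
      if (checkSatisfied(delayed)) true
      else { keys.foreach(k => watchers.getOrElseUpdate(k, mutable.ArrayBuffer.empty) += delayed); false }

    // re-check everything watching this key and hand back the newly satisfied requests
    def update(key: String): Seq[D] = {
      val (done, pending) = watchers.getOrElse(key, mutable.ArrayBuffer.empty[D]).partition(checkSatisfied)
      watchers(key) = pending
      done.toSeq
    }
  }

  val satisfied = mutable.Set.empty[String]
  val purgatory = new MiniPurgatory[String] { def checkSatisfied(d: String) = satisfied(d) }
  assert(!purgatory.checkAndMaybeWatch("r1", Seq("test1"))) // not yet satisfiable, so watched
  satisfied += "r1"
  assert(purgatory.update("test1") == Seq("r1"))            // satisfied on the next update
}
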
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index b1c4ce9..09ed8f5 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -16,17 +16,19 @@
*/
package kafka.server
+import kafka.api._
import kafka.cluster.{Partition, Replica}
+import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.log.Log
import kafka.message.{ByteBufferMessageSet, Message}
import kafka.network.RequestChannel
import kafka.utils.{ZkUtils, Time, TestUtils, MockTime}
+
+import scala.Some
+
import org.easymock.EasyMock
import org.I0Itec.zkclient.ZkClient
import org.scalatest.junit.JUnit3Suite
-import kafka.api._
-import scala.Some
-import kafka.common.TopicAndPartition
class SimpleFetchTest extends JUnit3Suite {
@@ -45,13 +47,13 @@ class SimpleFetchTest extends JUnit3Suite {
* with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
* but is still in ISR (hasn't yet expired from ISR).
*
- * When a normal consumer fetches data, it only should only see data upto the HW of the leader,
+ * When a normal consumer fetches data, it should only see data up to the HW of the leader,
* in this case up to an offset of "5".
*/
def testNonReplicaSeesHwWhenFetching() {
/* setup */
val time = new MockTime
- val leo = 20
+ val leo = 20L
val hw = 5
val fetchSize = 100
val messages = new Message("test-message".getBytes())
@@ -64,7 +66,11 @@ class SimpleFetchTest extends JUnit3Suite {
val log = EasyMock.createMock(classOf[kafka.log.Log])
EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
EasyMock.expect(log)
- EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(new ByteBufferMessageSet(messages))
+ EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(
+ new FetchDataInfo(
+ new LogOffsetMetadata(0L, 0L, leo.toInt),
+ new ByteBufferMessageSet(messages)
+ )).anyTimes()
EasyMock.replay(log)
val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
@@ -76,14 +82,26 @@ class SimpleFetchTest extends JUnit3Suite {
EasyMock.expect(replicaManager.logManager).andReturn(logManager)
EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+ EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
+ val fetchInfo = log.read(0, fetchSize, Some(hw))
+ val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
+ Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset))
+ }).anyTimes()
EasyMock.replay(replicaManager)
val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager)
- partition.getReplica(configs(1).brokerId).get.logEndOffset = leo - 5L
+ partition.getReplica(configs(1).brokerId).get.logEndOffset = new LogOffsetMetadata(leo - 5L, 0L, leo.toInt - 5)
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
+ EasyMock.expect(replicaManager.initWithRequestPurgatory(EasyMock.anyObject(), EasyMock.anyObject()))
+ EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
+ val fetchInfo = log.read(0, fetchSize, Some(hw))
+ val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
+ Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset))
+ }).anyTimes()
+
EasyMock.replay(replicaManager)
val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager])
@@ -138,7 +156,11 @@ class SimpleFetchTest extends JUnit3Suite {
val log = EasyMock.createMock(classOf[kafka.log.Log])
EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
- EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn(new ByteBufferMessageSet(messages))
+ EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn(
+ new FetchDataInfo(
+ new LogOffsetMetadata(followerLEO, 0L, followerLEO),
+ new ByteBufferMessageSet(messages)
+ )).anyTimes()
EasyMock.replay(log)
val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
@@ -150,16 +172,28 @@ class SimpleFetchTest extends JUnit3Suite {
EasyMock.expect(replicaManager.logManager).andReturn(logManager)
EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+ EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
+ val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None)
+ val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
+ Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset))
+ }).anyTimes()
EasyMock.replay(replicaManager)
val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager)
- partition.getReplica(followerReplicaId).get.logEndOffset = followerLEO.asInstanceOf[Long]
+ partition.getReplica(followerReplicaId).get.logEndOffset = new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO)
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
- EasyMock.expect(replicaManager.recordFollowerPosition(topic, partitionId, followerReplicaId, followerLEO))
+ EasyMock.expect(replicaManager.updateReplicaLEOAndPartitionHW(topic, partitionId, followerReplicaId, new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO)))
EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId))
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
+ EasyMock.expect(replicaManager.initWithRequestPurgatory(EasyMock.anyObject(), EasyMock.anyObject()))
+ EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
+ val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None)
+ val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
+ Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset))
+ }).anyTimes()
+ EasyMock.expect(replicaManager.unblockDelayedProduceRequests(EasyMock.anyObject())).anyTimes()
EasyMock.replay(replicaManager)
val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager])
@@ -195,7 +229,7 @@ class SimpleFetchTest extends JUnit3Suite {
private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
localLog: Log, leaderHW: Long, replicaManager: ReplicaManager): Partition = {
- val partition = new Partition(topic, partitionId, 2, time, replicaManager)
+ val partition = new Partition(topic, partitionId, time, replicaManager)
val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
@@ -204,7 +238,7 @@ class SimpleFetchTest extends JUnit3Suite {
partition.inSyncReplicas = allReplicas.toSet
// set the leader and its hw and the hw update time
partition.leaderReplicaIdOpt = Some(leaderId)
- leaderReplica.highWatermark = leaderHW
+ leaderReplica.highWatermark = new LogOffsetMetadata(leaderHW)
partition
}
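
Both SimpleFetchTest cases reduce to the same visibility rule: an ordinary consumer's fetch is clamped at the leader's high watermark, while a fetch coming from a follower replica may read up to the leader's log end offset. A tiny stand-in using the same numbers as the test (LEO 20, HW 5):

object FetchVisibilityDemo extends App {
  final case class LeaderState(logEndOffset: Long, highWatermark: Long)

  def maxReadableOffset(leader: LeaderState, fromFollower: Boolean): Long =
    if (fromFollower) leader.logEndOffset else leader.highWatermark

  val leader = LeaderState(logEndOffset = 20L, highWatermark = 5L)
  assert(maxReadableOffset(leader, fromFollower = false) == 5L)  // consumer sees only up to the HW
  assert(maxReadableOffset(leader, fromFollower = true) == 20L)  // follower may read to the LEO
}
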
diff --git a/kafka-patch-review.py b/kafka-patch-review.py
index 89afc84..dc45549 100644
--- a/kafka-patch-review.py
+++ b/kafka-patch-review.py
@@ -105,13 +105,11 @@ def main():
print 'ERROR: Your reviewboard was not created/updated. Please run the script with the --debug option to troubleshoot the problem'
p.close()
sys.exit(1)
- if p.close() != None:
- print 'ERROR: reviewboard update failed. Exiting.'
- sys.exit(1)
+ p.close()
if opt.debug:
print 'rb url=',rb_url
- git_command="git format-patch " + opt.branch + " --stdout > " + patch_file
+ git_command="git diff " + opt.branch + " > " + patch_file
if opt.debug:
print git_command
p=os.popen(git_command)