From 2931a4400a49ae7152dff788acc401853cbf7edb Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Sat, 28 Feb 2015 23:58:18 -0800 Subject: [PATCH] remove unnecessary requiredAcks parameter and clean up few comments --- core/src/main/scala/kafka/cluster/Partition.scala | 20 +++++++++++--------- .../src/main/scala/kafka/server/DelayedProduce.scala | 4 +--- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index c4bf48a..a4c4c52 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -288,7 +288,13 @@ class Partition(val topic: String, } } - def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = { + /* + * Note that this method will only be called if requiredAcks = -1 + * and we are waiting for all replicas in ISR to be fully caught up to + * the (local) leader's offset corresponding to this produce request + * before we acknowledge the produce request. + */ + def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short) = { leaderReplicaIfLocal() match { case Some(leaderReplica) => // keep the current immutable replica list reference @@ -301,15 +307,11 @@ class Partition(val topic: String, }) val minIsr = leaderReplica.log.get.config.minInSyncReplicas - trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId)) + trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks , topic, partitionId)) - if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset ) { + if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) { /* - * requiredAcks < 0 means acknowledge after all replicas in ISR - * are fully caught up to the (local) leader's offset - * corresponding to this produce request. - * - * minIsr means that the topic is configured not to accept messages + * The topic may be configured not to accept messages * if there are not enough replicas in ISR * in this scenario the request was already appended locally and * then added to the purgatory before the ISR was shrunk @@ -409,7 +411,7 @@ class Partition(val topic: String, // Avoid writing to leader if there are not enough insync replicas to make it safe if (inSyncSize < minIsr && requiredAcks == -1) { throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]" - .format(topic,partitionId,minIsr,inSyncSize)) + .format(topic ,partitionId ,inSyncSize ,minIsr)) } val info = log.append(messages, assignOffsets = true) diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 4d763bf..05078b2 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -89,9 +89,7 @@ class DelayedProduce(delayMs: Long, val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) val (hasEnough, errorCode) = partitionOpt match { case Some(partition) => - partition.checkEnoughReplicasReachOffset( - status.requiredOffset, - produceMetadata.produceRequiredAcks) + partition.checkEnoughReplicasReachOffset(status.requiredOffset) case None => // Case A (false, ErrorMapping.UnknownTopicOrPartitionCode) -- 1.9.3 (Apple Git-50)