From 2931a4400a49ae7152dff788acc401853cbf7edb Mon Sep 17 00:00:00 2001
From: Gwen Shapira
Date: Sat, 28 Feb 2015 23:58:18 -0800
Subject: [PATCH 1/4] remove unnecessary requiredAcks parameter and clean up few comments

---
 core/src/main/scala/kafka/cluster/Partition.scala    | 20 +++++++++++---------
 .../src/main/scala/kafka/server/DelayedProduce.scala |  4 +---
 2 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c4bf48a..a4c4c52 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -288,7 +288,13 @@ class Partition(val topic: String,
     }
   }

-  def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
+  /*
+   * Note that this method will only be called if requiredAcks = -1
+   * and we are waiting for all replicas in ISR to be fully caught up to
+   * the (local) leader's offset corresponding to this produce request
+   * before we acknowledge the produce request.
+   */
+  def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short) = {
     leaderReplicaIfLocal() match {
       case Some(leaderReplica) =>
         // keep the current immutable replica list reference
@@ -301,15 +307,11 @@ class Partition(val topic: String,
         })

         val minIsr = leaderReplica.log.get.config.minInSyncReplicas
-        trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
+        trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks , topic, partitionId))

-        if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
+        if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
           /*
-          * requiredAcks < 0 means acknowledge after all replicas in ISR
-          * are fully caught up to the (local) leader's offset
-          * corresponding to this produce request.
-          *
-          * minIsr means that the topic is configured not to accept messages
+          * The topic may be configured not to accept messages
           * if there are not enough replicas in ISR
           * in this scenario the request was already appended locally and
           * then added to the purgatory before the ISR was shrunk
@@ -409,7 +411,7 @@ class Partition(val topic: String,
           // Avoid writing to leader if there are not enough insync replicas to make it safe
           if (inSyncSize < minIsr && requiredAcks == -1) {
             throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
-              .format(topic,partitionId,minIsr,inSyncSize))
+              .format(topic ,partitionId ,inSyncSize ,minIsr))
           }

           val info = log.append(messages, assignOffsets = true)
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 4d763bf..05078b2 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -89,9 +89,7 @@ class DelayedProduce(delayMs: Long,
         val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
         val (hasEnough, errorCode) = partitionOpt match {
           case Some(partition) =>
-            partition.checkEnoughReplicasReachOffset(
-              status.requiredOffset,
-              produceMetadata.produceRequiredAcks)
+            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
           case None =>
             // Case A
             (false, ErrorMapping.UnknownTopicOrPartitionCode)
-- 
1.9.3 (Apple Git-50)


From bb9ba3f54df476da4f8ce3302ff1bf4b0e6f4aa4 Mon Sep 17 00:00:00 2001
From: Gwen Shapira
Date: Tue, 3 Mar 2015 14:16:08 -0800
Subject: [PATCH 2/4] add logging per Jiangjie Qin comment

---
 core/src/main/scala/kafka/cluster/Partition.scala    | 19 ++++++++++++-------
 core/src/test/resources/log4j.properties             |  6 +++---
 .../kafka/api/ProducerFailureHandlingTest.scala      |  7 ++++---
 3 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index a4c4c52..3940ab4 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -301,28 +301,33 @@ class Partition(val topic: String,
         val curInSyncReplicas = inSyncReplicas
         val numAcks = curInSyncReplicas.count(r => {
           if (!r.isLocal)
-            r.logEndOffset.messageOffset >= requiredOffset
+            if (r.logEndOffset.messageOffset >= requiredOffset) {
+              trace("Replica %d of %s-%d received offset %s".format(r.brokerId, topic, partitionId, requiredOffset))
+              true
+            } else {
+              false
+            }
           else
             true /* also count the local (leader) replica */
         })

-        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
         trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks , topic, partitionId))

+        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
+
         if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
           /*
-          * The topic may be configured not to accept messages
-          * if there are not enough replicas in ISR
-          * in this scenario the request was already appended locally and
-          * then added to the purgatory before the ISR was shrunk
+          * The topic may be configured not to accept messages if there are not enough replicas in ISR
+          * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
           */
           if (minIsr <= curInSyncReplicas.size) {
             (true, ErrorMapping.NoError)
           } else {
             (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
           }
-        } else
+        } else {
           (false, ErrorMapping.NoError)
+        }
       case None =>
         (false, ErrorMapping.NotLeaderForPartitionCode)
     }
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 1b7d5d8..9973dad 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -12,14 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=OFF, stdout
+log4j.rootLogger=TRACE, stdout

 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

-log4j.logger.kafka=ERROR
-log4j.logger.org.apache.kafka=ERROR
+log4j.logger.kafka=TRACE
+log4j.logger.org.apache.kafka=TRACE

 # zkclient can be verbose, during debugging it is common to adjust is separately
 log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index ba48a63..e61ff07 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -32,7 +32,7 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils}

 import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException}
+import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException, NotEnoughReplicasAfterAppendException}
 import org.apache.kafka.clients.producer._

 class ProducerFailureHandlingTest extends KafkaServerTestHarness {
@@ -348,8 +348,9 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
       fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas")
     } catch {
       case e: ExecutionException =>
-        if (!e.getCause.isInstanceOf[NotEnoughReplicasException]) {
-          fail("Expected NotEnoughReplicasException when producing to topic with fewer brokers than min.insync.replicas")
+        if (!e.getCause.isInstanceOf[NotEnoughReplicasException] &&
+            !e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException]) {
+          fail("Expected NotEnoughReplicasException or NotEnoughReplicasAfterAppendException when producing to topic with fewer brokers than min.insync.replicas")
         }
     }

-- 
1.9.3 (Apple Git-50)


From b4077ec5b756348c34e41adc50ec06a720e7fef7 Mon Sep 17 00:00:00 2001
From: Gwen Shapira
Date: Tue, 3 Mar 2015 17:17:39 -0800
Subject: [PATCH 3/4] revert unintentional changes to log4j

---
 core/src/test/resources/log4j.properties | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 9973dad..1b7d5d8 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -12,14 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=TRACE, stdout
+log4j.rootLogger=OFF, stdout

 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

-log4j.logger.kafka=TRACE
-log4j.logger.org.apache.kafka=TRACE
+log4j.logger.kafka=ERROR
+log4j.logger.org.apache.kafka=ERROR

 # zkclient can be verbose, during debugging it is common to adjust is separately
 log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
-- 
1.9.3 (Apple Git-50)


From b483384cd5f63399f8f17cf99246815c676c80d7 Mon Sep 17 00:00:00 2001
From: Gwen Shapira
Date: Fri, 6 Mar 2015 13:33:45 -0800
Subject: [PATCH 4/4] few small fixes suggested by Jun

---
 core/src/main/scala/kafka/cluster/Partition.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 3940ab4..0bf55f3 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -302,7 +302,7 @@ class Partition(val topic: String,
         val numAcks = curInSyncReplicas.count(r => {
           if (!r.isLocal)
             if (r.logEndOffset.messageOffset >= requiredOffset) {
-              trace("Replica %d of %s-%d received offset %s".format(r.brokerId, topic, partitionId, requiredOffset))
+              trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
               true
             } else {
               false
@@ -325,9 +325,8 @@ class Partition(val topic: String,
           } else {
             (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
           }
-        } else {
+        } else
           (false, ErrorMapping.NoError)
-        }
       case None =>
         (false, ErrorMapping.NotLeaderForPartitionCode)
     }
@@ -416,7 +415,7 @@ class Partition(val topic: String,
           // Avoid writing to leader if there are not enough insync replicas to make it safe
           if (inSyncSize < minIsr && requiredAcks == -1) {
             throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
-              .format(topic ,partitionId ,inSyncSize ,minIsr))
+              .format(topic, partitionId, inSyncSize, minIsr))
           }

           val info = log.append(messages, assignOffsets = true)
-- 
1.9.3 (Apple Git-50)
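
From a client's point of view, the behavior these patches touch is: with acks = -1 (acks=all) and min.insync.replicas set on the topic, a produce request can fail with NotEnoughReplicasException (the ISR was already too small when the leader tried to append) or with NotEnoughReplicasAfterAppendException (the message was appended locally, but the ISR shrank below min.insync.replicas before enough replicas caught up), which is why the test change above accepts either. The following sketch is illustrative only and is not part of the patch series; the broker address, topic name, and serializer settings are assumptions.

import java.util.Properties
import java.util.concurrent.ExecutionException

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.errors.{NotEnoughReplicasAfterAppendException, NotEnoughReplicasException}

object MinIsrProduceSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // hypothetical broker
    props.put("acks", "all")                         // same as acks = -1: wait for all ISR replicas
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    try {
      // "min-isr-topic" is assumed to have min.insync.replicas larger than the
      // number of currently in-sync replicas, so the broker rejects the write.
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("min-isr-topic", "hello".getBytes)).get()
    } catch {
      case e: ExecutionException => e.getCause match {
        case _: NotEnoughReplicasException =>
          // rejected before the append: ISR was already below min.insync.replicas
          println("not enough in-sync replicas before append")
        case _: NotEnoughReplicasAfterAppendException =>
          // appended on the leader, but the ISR shrank before enough replicas acked
          println("not enough in-sync replicas after append")
        case other => throw other
      }
    } finally {
      producer.close()
    }
  }
}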