diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
new file mode 100644
index 0000000..9e00187
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class NotEnoughReplicasException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public NotEnoughReplicasException() {
+        super();
+    }
+
+    public NotEnoughReplicasException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public NotEnoughReplicasException(String message) {
+        super(message);
+    }
+
+    public NotEnoughReplicasException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index d434f42..a37aa3d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -43,7 +43,8 @@ public enum Errors {
     OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
     NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
     // TODO: errorCode 14, 15, 16
-    INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic."));
+    INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
+    NOT_ENOUGH_REPLICAS(18, new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required."));
 
     private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
     private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
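Reviewer note (not part of the patch): a minimal sketch of how the new code 18 round-trips through the client-side enum, assuming the existing Errors.forCode/forException/exception accessors:

    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.errors.NotEnoughReplicasException

    object NotEnoughReplicasRoundTrip extends App {
      // A broker rejecting a produce request for lack of ISR members returns
      // error code 18; the client maps it back to the new exception type.
      assert(Errors.forCode(18.toShort) == Errors.NOT_ENOUGH_REPLICAS)
      assert(Errors.forCode(18.toShort).exception().isInstanceOf[NotEnoughReplicasException])
      // The reverse mapping is used when a caught exception is serialized into a response.
      assert(Errors.forException(new NotEnoughReplicasException("ISR too small")).code() == 18.toShort)
    }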
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ff106b4..cbcecbf 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -269,14 +269,27 @@ class Partition(val topic: String,
           else
             true /* also count the local (leader) replica */
       })
+      val minIsr = leaderReplica.log match {
+        case Some(log) => log.config.minInSyncReplicas
+        // This shouldn't happen - we already confirmed we have a local replica
+        case None => new LogConfig().minInSyncReplicas
+      }
+
       trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
-      if ((requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) ||
-        (requiredAcks > 0 && numAcks >= requiredAcks)) {
+      if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) {
         /*
         * requiredAcks < 0 means acknowledge after all replicas in ISR
         * are fully caught up to the (local) leader's offset
         * corresponding to this produce request.
+        * minIsr means that the topic is configured to reject messages
+        * if fewer than minIsr replicas are currently in the ISR
         */
+        if (minIsr <= curInSyncReplicas.size) {
+          (true, ErrorMapping.NoError)
+        } else {
+          (true, ErrorMapping.NotEnoughReplicasCode)
+        }
+      } else if (requiredAcks > 0 && numAcks >= requiredAcks) {
         (true, ErrorMapping.NoError)
       } else
         (false, ErrorMapping.NoError)
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 3fae791..11cbeb4 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -47,6 +47,7 @@ object ErrorMapping {
   val ConsumerCoordinatorNotAvailableCode: Short = 15
   val NotCoordinatorForConsumerCode: Short = 16
   val InvalidTopicCode : Short = 17
+  val NotEnoughReplicasCode : Short = 18
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](
@@ -65,7 +66,8 @@
     classOf[OffsetsLoadInProgressException].asInstanceOf[Class[Throwable]] -> OffsetsLoadInProgressCode,
     classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode,
     classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode,
-    classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode
+    classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
+    classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode
   ).withDefaultValue(UnknownCode)
 
   /* invert the mapping */
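Reviewer note: the new acks = -1 branch is easier to check as a truth table. Below is a self-contained restatement as a pure function (illustrative names, not part of the patch). The key subtlety is that when the ISR has shrunk below minIsr the request is still reported as satisfied, only with the new error code, because the message was already appended on the leader by the time this check runs:

    object MinIsrCheck {
      val NoError: Short = 0
      val NotEnoughReplicasCode: Short = 18

      // Mirrors the requiredAcks = -1 path of checkEnoughReplicasReachOffset:
      // returns (satisfied, errorCode).
      def check(highWatermark: Long, requiredOffset: Long, isrSize: Int, minIsr: Int): (Boolean, Short) =
        if (highWatermark >= requiredOffset) {
          if (minIsr <= isrSize) (true, NoError)
          else (true, NotEnoughReplicasCode) // appended, but ISR below the configured minimum
        } else
          (false, NoError) // replicas not caught up yet; keep waiting in the purgatory

      def main(args: Array[String]): Unit = {
        // Three replicas, min.insync.replicas = 2, ISR shrunk to the leader alone:
        println(check(highWatermark = 100L, requiredOffset = 100L, isrSize = 1, minIsr = 2)) // (true,18)
        println(check(highWatermark = 100L, requiredOffset = 100L, isrSize = 2, minIsr = 2)) // (true,0)
      }
    }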
diff --git a/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala b/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala
new file mode 100644
index 0000000..68222b5
--- /dev/null
+++ b/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class NotEnoughReplicasException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 5746ad4..f799d01 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -36,6 +36,7 @@ object Defaults {
   val MinCleanableDirtyRatio = 0.5
   val Compact = false
   val UncleanLeaderElectionEnable = true
+  val MinInSyncReplicas = 1
 }
 
 /**
@@ -53,7 +54,9 @@ object Defaults {
  * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
  * @param compact Should old segments in this log be deleted or deduplicated?
  * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property
  *                                    but included here for topic-specific configuration validation purposes
+ * @param minInSyncReplicas If the number of in-sync replicas drops below this number, we stop accepting writes with -1 (or all) required acks
+ *
  */
 case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
                      val segmentMs: Long = Defaults.SegmentMs,
@@ -68,8 +71,9 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
                      val deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
                      val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
                      val compact: Boolean = Defaults.Compact,
-                     val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable) {
-
+                     val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
+                     val minInSyncReplicas: Int = Defaults.MinInSyncReplicas) {
+
   def toProps: Properties = {
     val props = new Properties()
     import LogConfig._
@@ -87,9 +91,9 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
     props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
     props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
     props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
+    props.put(MinInSyncReplicasProp, minInSyncReplicas.toString)
     props
   }
-
 }
 
 object LogConfig {
@@ -107,13 +111,14 @@
   val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
   val CleanupPolicyProp = "cleanup.policy"
   val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
+  val MinInSyncReplicasProp = "min.insync.replicas"
 
   val ConfigNames = Set(SegmentBytesProp,
                         SegmentMsProp,
                         SegmentIndexBytesProp,
                         FlushMessagesProp,
                         FlushMsProp,
                         RetentionBytesProp,
                         RententionMsProp,
                         MaxMessageBytesProp,
                         IndexIntervalBytesProp,
@@ -121,9 +126,9 @@
                         DeleteRetentionMsProp,
                         MinCleanableDirtyRatioProp,
                         CleanupPolicyProp,
-                        UncleanLeaderElectionEnableProp)
-
-
+                        UncleanLeaderElectionEnableProp,
+                        MinInSyncReplicasProp)
+
   /**
    * Parse the given properties instance into a LogConfig object
    */
@@ -144,9 +149,10 @@
                       compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete")
                                      .trim.toLowerCase != "delete",
                       uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp,
-                        Defaults.UncleanLeaderElectionEnable.toString).toBoolean)
+                        Defaults.UncleanLeaderElectionEnable.toString).toBoolean,
+                      minInSyncReplicas = props.getProperty(MinInSyncReplicasProp, Defaults.MinInSyncReplicas.toString).toInt)
   }
-
+
   /**
    * Create a log config instance using the given properties and defaults
    */
@@ -170,9 +176,21 @@
    */
   def validate(props: Properties) {
     validateNames(props)
+    validateMinInSyncReplicas(props)
     LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values
   }
-
-}
-
-
\ No newline at end of file
+
+  /**
+   * Check that MinInSyncReplicas is reasonable.
+   * Unfortunately, we can't validate that it is smaller than the number of replicas,
+   * since we don't have that information here.
+   */
+  private def validateMinInSyncReplicas(props: Properties) {
+    val minIsr = props.getProperty(MinInSyncReplicasProp)
+    if (minIsr != null && minIsr.toInt < 1) {
+      throw new InvalidConfigException("Wrong value " + minIsr + " of min.insync.replicas in topic configuration; " +
+        "valid values are at least 1")
+    }
+  }
+
+}
\ No newline at end of file
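Usage note (a sketch, not part of the patch): min.insync.replicas is a per-topic key, so it flows through the same Properties-based path as the other LogConfig settings. Assuming the AdminUtils.createTopic overload used by the tests below:

    import java.util.Properties
    import kafka.admin.AdminUtils
    import kafka.log.LogConfig
    import org.I0Itec.zkclient.ZkClient

    object MinIsrTopicSetup {
      // Creates a three-replica topic that rejects acks = -1 writes once the
      // ISR shrinks below two members; LogConfig.validate rejects values < 1.
      def createDurableTopic(zkClient: ZkClient, topic: String): Unit = {
        val topicProps = new Properties()
        topicProps.put(LogConfig.MinInSyncReplicasProp, "2")
        AdminUtils.createTopic(zkClient, topic, 1, 3, topicProps)
      }
    }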
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 39f777b..3327afd 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -17,13 +17,13 @@
 
 package kafka.api
 
-import kafka.common.Topic
+import kafka.common.{NotEnoughReplicasException, Topic}
 import org.apache.kafka.common.errors.InvalidTopicException
 import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 import org.junit.Assert._
 
-import java.util.Random
+import java.util.{Properties, Random}
 import java.lang.Integer
 import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
 
@@ -302,5 +302,42 @@
     producer1.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
   }
 
+  @Test
+  def testNotEnoughReplicas() {
+    val topicName = "minisrtest"
+    val topicProps = new Properties()
+    topicProps.put("min.insync.replicas", "3")
+
+    TestUtils.createTopic(zkClient, topicName, 1, 2, servers, topicProps)
+
+    // With only two replicas but min.insync.replicas = 3, an acks = -1 write must fail.
+    val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
+    intercept[ExecutionException] {
+      producer3.send(record).get
+    }
+  }
+
+  @Test
+  def testNotEnoughReplicasAfterBrokerShutdown() {
+    val topicName = "minisrtest2"
+    val topicProps = new Properties()
+    topicProps.put("min.insync.replicas", "2")
+
+    TestUtils.createTopic(zkClient, topicName, 1, 2, servers, topicProps)
+
+    val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
+    // With two replicas and min.insync.replicas = 2, the first write should succeed.
+    producer3.send(record).get
+
+    // Shut down one broker so the ISR shrinks below the configured minimum.
+    servers.head.shutdown()
+    servers.head.awaitShutdown()
+    intercept[ExecutionException] {
+      producer3.send(record).get
+    }
+
+    servers.head.startup()
+  }
+
   private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) {
     val numRecords = 1000
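Reviewer note: the tests above only assert that an ExecutionException escapes send(...).get; application code would unwrap the cause to recognize the new error. A hedged sketch against the new producer API as used in this test (non-generic KafkaProducer/ProducerRecord, as on this branch):

    import java.util.concurrent.ExecutionException
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.errors.NotEnoughReplicasException

    object SendWithMinIsrHandling {
      // send(...).get wraps broker-side errors; code 18 surfaces as a
      // NotEnoughReplicasException in the cause.
      def sendOrReport(producer: KafkaProducer, record: ProducerRecord): Unit =
        try {
          producer.send(record).get()
        } catch {
          case e: ExecutionException => e.getCause match {
            case _: NotEnoughReplicasException =>
              println("rejected: in-sync replica count below min.insync.replicas")
            case _ => throw e
          }
        }
    }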
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 24deea0..fb61d55 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -18,6 +18,7 @@
 package kafka.producer
 
 import java.net.SocketTimeoutException
+import java.util.Properties
 import junit.framework.Assert
 import kafka.admin.AdminUtils
 import kafka.integration.KafkaServerTestHarness
@@ -113,6 +114,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     Assert.assertEquals(0, response2.status(TopicAndPartition("test", 0)).offset)
   }
 
+  @Test
   def testMessageSizeTooLargeWithAckZero() {
     val server = servers.head
 
@@ -225,4 +227,25 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val response = producer.send(emptyRequest)
     Assert.assertTrue(response == null)
   }
+
+  @Test
+  def testNotEnoughReplicas() {
+    val topicName = "minisrtest"
+    val server = servers.head
+
+    val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+    props.put("request.required.acks", "-1")
+
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+    val topicProps = new Properties()
+    topicProps.put("min.insync.replicas", "2")
+    AdminUtils.createTopic(zkClient, topicName, 1, 1, topicProps)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicName, 0)
+
+    // One replica but min.insync.replicas = 2: the produce request must fail with error 18.
+    val response = producer.send(TestUtils.produceRequest(topicName, 0,
+      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), -1))
+
+    Assert.assertEquals(ErrorMapping.NotEnoughReplicasCode, response.status(TopicAndPartition(topicName, 0)).error)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2dbdd3c..c26691b 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -168,10 +168,14 @@ object TestUtils extends Logging {
    * Wait until the leader is elected and the metadata is propagated to all brokers.
    * Return the leader for each partition.
    */
-  def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1,
-                  servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = {
+  def createTopic(zkClient: ZkClient,
+                  topic: String,
+                  numPartitions: Int = 1,
+                  replicationFactor: Int = 1,
+                  servers: Seq[KafkaServer],
+                  topicConfig: Properties = new Properties) : scala.collection.immutable.Map[Int, Option[Int]] = {
     // create topic
-    AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor)
+    AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, topicConfig)
     // wait until the update metadata request for new topic reaches all servers
     (0 until numPartitions).map { case i =>
       TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
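Usage note for the extended helper: topicConfig defaults to an empty Properties, so existing call sites compile unchanged, while new call sites pass per-topic overrides. A small illustration (hypothetical test body; zkClient and servers would come from KafkaServerTestHarness):

    import java.util.Properties
    import kafka.server.KafkaServer
    import kafka.utils.TestUtils
    import org.I0Itec.zkclient.ZkClient

    def exampleUsage(zkClient: ZkClient, servers: Seq[KafkaServer]): Unit = {
      // Old signature still works:
      TestUtils.createTopic(zkClient, "plain-topic", 1, 1, servers)
      // New signature with a per-topic override:
      val topicProps = new Properties()
      topicProps.put("min.insync.replicas", "2")
      TestUtils.createTopic(zkClient, "durable-topic", 1, 2, servers, topicProps)
    }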