diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9de4af..08d199f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -14,7 +14,10 @@ package org.apache.kafka.clients.producer;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
@@ -77,7 +80,8 @@ public class ProducerConfig extends AbstractConfig {
/** acks */
public static final String ACKS_CONFIG = "acks";
- private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the " + " durability of records that are sent. The following settings are common: "
+ private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
+ + " durability of records that are sent. The following settings are common: "
+ "
"
+ " acks=0 If set to zero then the producer will not wait for any acknowledgment from the"
+ " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
@@ -87,11 +91,9 @@ public class ProducerConfig extends AbstractConfig {
+ " acks=1 This will mean the leader will write the record to its local log but will respond"
+ " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"
+ " acknowledging the record but before the followers have replicated it then the record will be lost."
- + " acks=all This means the leader will wait for the full set of in-sync replicas to"
+ + " acks=-1 This means the leader will wait for the full set of in-sync replicas to"
+ " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
- + " remains alive. This is the strongest available guarantee."
- + " - Other settings such as
acks=2 are also possible, and will require the given number of"
- + " acknowledgements but this is generally less useful.";
+ + " remains alive. This is the strongest available guarantee.";
/** timeout.ms */
public static final String TIMEOUT_CONFIG = "timeout.ms";
@@ -175,7 +177,12 @@ public class ProducerConfig extends AbstractConfig {
config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
- .define(ACKS_CONFIG, Type.STRING, "1", Importance.HIGH, ACKS_DOC)
+ .define(ACKS_CONFIG,
+ Type.STRING,
+ "1",
+ in(Arrays.asList("-1", "0", "1")),
+ Importance.HIGH,
+ ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
@@ -199,7 +206,9 @@ public class ProducerConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
METADATA_FETCH_TIMEOUT_DOC)
- .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
+ .define(METADATA_MAX_AGE_CONFIG, Type.LONG,
+ 5 * 60 * 1000, atLeast(0),
+ Importance.LOW, METADATA_MAX_AGE_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
Type.LONG,
30000,
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index addc906..227309e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -268,6 +268,48 @@ public class ConfigDef {
}
}
+ public static class ValidString implements Validator {
+ List validStrings;
+
+ private ValidString(List validStrings) {
+ this.validStrings = validStrings;
+ }
+
+ public static ValidString in(List validStrings) {
+ return new ValidString(validStrings);
+ }
+
+ @Override
+ public void ensureValid(String name, Object o) {
+
+ String s = (String) o;
+
+ if (!validStrings.contains(s)) {
+ throw new ConfigException(name,o,"String must be one of:" +join(validStrings));
+ }
+
+ }
+
+ public String toString() {
+ return "[" + join(validStrings) + "]";
+ }
+
+ private String join(List list)
+ {
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for (String item : list)
+ {
+ if (first)
+ first = false;
+ else
+ sb.append(",");
+ sb.append(item);
+ }
+ return sb.toString();
+ }
+ }
+
private static class ConfigKey {
public final String name;
public final Type type;
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
new file mode 100644
index 0000000..75c80a9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Number of insync replicas for the partition is lower than min.insync.replicas
+ * This exception is raised when the low ISR size is discovered *after* the message
+ * was already appended to the log. Producer retries will cause duplicates.
+ */
+public class NotEnoughReplicasAfterAppendException extends RetriableException {
+ private static final long serialVersionUID = 1L;
+
+ public NotEnoughReplicasAfterAppendException() {
+ super();
+ }
+
+ public NotEnoughReplicasAfterAppendException(String message, Throwable cause) {
+ super(message,cause);
+ }
+
+ public NotEnoughReplicasAfterAppendException(String message) {
+ super(message);
+ }
+
+ public NotEnoughReplicasAfterAppendException(Throwable cause) {
+ super(cause);
+ }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
new file mode 100644
index 0000000..486d515
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Number of insync replicas for the partition is lower than min.insync.replicas
+ */
+public class NotEnoughReplicasException extends RetriableException {
+ private static final long serialVersionUID = 1L;
+
+ public NotEnoughReplicasException() {
+ super();
+ }
+
+ public NotEnoughReplicasException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public NotEnoughReplicasException(String message) {
+ super(message);
+ }
+
+ public NotEnoughReplicasException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index d434f42..2992353 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.errors.*;
/**
* This class contains all the client-server errors--those errors that must be sent from the server to the client. These
* are thus part of the protocol. The names can be changed but the error code cannot.
- *
+ *
* Do not add exceptions that occur only on the client or only on the server here.
*/
public enum Errors {
@@ -43,7 +43,8 @@ public enum Errors {
OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
// TODO: errorCode 14, 15, 16
- INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic."));
+ INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
+ NOT_ENOUGH_REPLICAS(18, new NotEnoughReplicasException("The request requested more in-sync replicas than are currently available."));
private static Map, Errors> classToError = new HashMap, Errors>();
private static Map codeToError = new HashMap();
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ff106b4..8704318 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -269,14 +269,23 @@ class Partition(val topic: String,
else
true /* also count the local (leader) replica */
})
+ val minIsr = leaderReplica.log.get.config.minInSyncReplicas
+
trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
- if ((requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) ||
- (requiredAcks > 0 && numAcks >= requiredAcks)) {
+ if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) {
/*
* requiredAcks < 0 means acknowledge after all replicas in ISR
* are fully caught up to the (local) leader's offset
* corresponding to this produce request.
+ * minIsr means that the topic is configured not to accept messages
+ * if there are not enough replicas in ISR
*/
+ if (minIsr <= curInSyncReplicas.size) {
+ (true, ErrorMapping.NoError)
+ } else {
+ (true, ErrorMapping.NotEnoughReplicasCode)
+ }
+ } else if (requiredAcks > 0 && numAcks >= requiredAcks) {
(true, ErrorMapping.NoError)
} else
(false, ErrorMapping.NoError)
@@ -350,12 +359,21 @@ class Partition(val topic: String,
stuckReplicas ++ slowReplicas
}
- def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
+ def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0) = {
inReadLock(leaderIsrUpdateLock) {
val leaderReplicaOpt = leaderReplicaIfLocal()
leaderReplicaOpt match {
case Some(leaderReplica) =>
val log = leaderReplica.log.get
+ val minIsr = log.config.minInSyncReplicas
+ val inSyncSize = inSyncReplicas.size
+
+ // Avoid writing to leader if there are not enough insync replicas to make it safe
+ if (inSyncSize < minIsr && requiredAcks == -1) {
+ throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
+ .format(topic,partitionId,minIsr,inSyncSize))
+ }
+
val info = log.append(messages, assignOffsets = true)
// probably unblock some follower fetch requests since log end offset has been updated
replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId))
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 3fae791..3ce17f0 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -23,7 +23,7 @@ import java.lang.Throwable
import scala.Predef._
/**
- * A bi-directional mapping between error codes and exceptions x
+ * A bi-directional mapping between error codes and exceptions
*/
object ErrorMapping {
val EmptyByteBuffer = ByteBuffer.allocate(0)
@@ -47,8 +47,10 @@ object ErrorMapping {
val ConsumerCoordinatorNotAvailableCode: Short = 15
val NotCoordinatorForConsumerCode: Short = 16
val InvalidTopicCode : Short = 17
+ val NotEnoughReplicasCode : Short = 18
+ val NotEnoughReplicasAfterAppendCode: Short = 19
- private val exceptionToCode =
+ private val exceptionToCode =
Map[Class[Throwable], Short](
classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode,
classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
@@ -65,15 +67,17 @@ object ErrorMapping {
classOf[OffsetsLoadInProgressException].asInstanceOf[Class[Throwable]] -> OffsetsLoadInProgressCode,
classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode,
classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode,
- classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode
+ classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
+ classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode,
+ classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode
).withDefaultValue(UnknownCode)
-
+
/* invert the mapping */
- private val codeToException =
+ private val codeToException =
(Map[Short, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException])
-
+
def codeFor(exception: Class[Throwable]): Short = exceptionToCode(exception)
-
+
def maybeThrowException(code: Short) =
if(code != 0)
throw codeToException(code).newInstance()
diff --git a/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala b/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala
new file mode 100644
index 0000000..c4f9def
--- /dev/null
+++ b/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Number of insync replicas for the partition is lower than min.insync.replicas
+ * This exception is raised when the low ISR size is discovered *after* the message
+ * was already appended to the log. Producer retries will cause duplicates.
+ */
+class NotEnoughReplicasAfterAppendException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
diff --git a/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala b/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala
new file mode 100644
index 0000000..3da2351
--- /dev/null
+++ b/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Number of insync replicas for the partition is lower than min.insync.replicas
+ */
+class NotEnoughReplicasException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 5746ad4..b7578a6 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -36,6 +36,7 @@ object Defaults {
val MinCleanableDirtyRatio = 0.5
val Compact = false
val UncleanLeaderElectionEnable = true
+ val MinInSyncReplicas = 1
}
/**
@@ -53,7 +54,9 @@ object Defaults {
* @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
* @param compact Should old segments in this log be deleted or deduplicated?
* @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property
- * but included here for topic-specific configuration validation purposes
+ * but included here for topic-specific configuration validation purposes
+ * @param minInSyncReplicas If number of insync replicas drops below this number, we stop accepting writes with -1 required acks
+ *
*/
case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
val segmentMs: Long = Defaults.SegmentMs,
@@ -68,8 +71,9 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
val deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
val compact: Boolean = Defaults.Compact,
- val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable) {
-
+ val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
+ val minInSyncReplicas: Int = Defaults.MinInSyncReplicas) {
+
def toProps: Properties = {
val props = new Properties()
import LogConfig._
@@ -87,9 +91,9 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
+ props.put(MinInSyncReplicasProp, minInSyncReplicas.toString)
props
}
-
}
object LogConfig {
@@ -107,13 +111,14 @@ object LogConfig {
val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
val CleanupPolicyProp = "cleanup.policy"
val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
-
- val ConfigNames = Set(SegmentBytesProp,
- SegmentMsProp,
- SegmentIndexBytesProp,
- FlushMessagesProp,
- FlushMsProp,
- RetentionBytesProp,
+ val MinInSyncReplicasProp = "min.insync.replicas"
+
+ val ConfigNames = Set(SegmentBytesProp,
+ SegmentMsProp,
+ SegmentIndexBytesProp,
+ FlushMessagesProp,
+ FlushMsProp,
+ RetentionBytesProp,
RententionMsProp,
MaxMessageBytesProp,
IndexIntervalBytesProp,
@@ -121,9 +126,9 @@ object LogConfig {
DeleteRetentionMsProp,
MinCleanableDirtyRatioProp,
CleanupPolicyProp,
- UncleanLeaderElectionEnableProp)
-
-
+ UncleanLeaderElectionEnableProp,
+ MinInSyncReplicasProp)
+
/**
* Parse the given properties instance into a LogConfig object
*/
@@ -144,9 +149,10 @@ object LogConfig {
compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete")
.trim.toLowerCase != "delete",
uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp,
- Defaults.UncleanLeaderElectionEnable.toString).toBoolean)
+ Defaults.UncleanLeaderElectionEnable.toString).toBoolean,
+ minInSyncReplicas = props.getProperty(MinInSyncReplicasProp,Defaults.MinInSyncReplicas.toString).toInt)
}
-
+
/**
* Create a log config instance using the given properties and defaults
*/
@@ -155,7 +161,7 @@ object LogConfig {
props.putAll(overrides)
fromProps(props)
}
-
+
/**
* Check that property names are valid
*/
@@ -164,15 +170,27 @@ object LogConfig {
for(name <- props.keys)
require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name))
}
-
+
/**
* Check that the given properties contain only valid log config names, and that all values can be parsed.
*/
def validate(props: Properties) {
validateNames(props)
+ validateMinInSyncReplicas(props)
LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values
}
-
-}
-
-
\ No newline at end of file
+
+ /**
+ * Check that MinInSyncReplicas is reasonable
+ * Unfortunately, we can't validate its smaller than number of replicas
+ * since we don't have this information here
+ */
+ private def validateMinInSyncReplicas(props: Properties) {
+ val minIsr = props.getProperty(MinInSyncReplicasProp)
+ if (minIsr != null && minIsr.toInt < 1) {
+ throw new InvalidConfigException("Wrong value " + minIsr + " of min.insync.replicas in topic configuration; " +
+ " Valid values are at least 1")
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
index 69b2d0c..70e8ed9 100644
--- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
@@ -42,11 +42,15 @@ trait SyncProducerConfigShared {
val clientId = props.getString("client.id", SyncProducerConfig.DefaultClientId)
/*
- * The required acks of the producer requests - negative value means ack
- * after the replicas in ISR have caught up to the leader's offset
- * corresponding to this produce request.
+ * The number of acknowledgments the producer requires the leader to have received before considering a request complete.
+ * This controls the durability of the messages sent by the producer.
+ *
+ * request.required.acks = 0 - means the producer will not wait for any acknowledgement from the leader.
+ * request.required.acks = 1 - means the leader will write the message to its local log and immediately acknowledge
+ * request.required.acks = -1 - means the leader will wait for acknowledgement from all in-sync replicas before acknowledging the write
*/
- val requestRequiredAcks = props.getShort("request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
+
+ val requestRequiredAcks = props.getShortInRange("request.required.acks", SyncProducerConfig.DefaultRequiredAcks,(-1,1))
/*
* The ack timeout of the producer requests. Value must be non-negative and non-zero
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c584b55..ee13862 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -248,7 +248,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
val info = partitionOpt match {
case Some(partition) =>
- partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
+ partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet],producerRequest.requiredAcks)
case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
.format(topicAndPartition, brokerId))
}
@@ -284,6 +284,10 @@ class KafkaApis(val requestChannel: RequestChannel,
warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage))
new ProduceResult(topicAndPartition, nle)
+ case nere: NotEnoughReplicasException =>
+ warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
+ producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nere.getMessage))
+ new ProduceResult(topicAndPartition,nere)
case e: Throwable =>
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 39f777b..209a409 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -18,12 +18,12 @@
package kafka.api
import kafka.common.Topic
-import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.errors.{InvalidTopicException,NotEnoughReplicasException}
import org.scalatest.junit.JUnit3Suite
import org.junit.Test
import org.junit.Assert._
-import java.util.Random
+import java.util.{Properties, Random}
import java.lang.Integer
import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
@@ -302,6 +302,59 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
producer1.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
}
+ @Test
+ def testNotEnoughReplicas() {
+ val topicName = "minisrtest"
+ val topicProps = new Properties();
+ topicProps.put("min.insync.replicas","3");
+
+
+ TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
+
+
+ val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
+ try {
+ producer3.send(record).get
+ fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas")
+ } catch {
+ case e: ExecutionException =>
+ if (!e.getCause.isInstanceOf[NotEnoughReplicasException]) {
+ fail("Expected NotEnoughReplicasException when producing to topic with fewer brokers than min.insync.replicas")
+ }
+ }
+ }
+
+ @Test
+ def testNotEnoughReplicasAfterBrokerShutdown() {
+ val topicName = "minisrtest2"
+ val topicProps = new Properties();
+ topicProps.put("min.insync.replicas","2");
+
+
+ TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
+
+
+ val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
+ // This should work
+ producer3.send(record).get
+
+ //shut down one broker
+ servers.head.shutdown()
+ servers.head.awaitShutdown()
+ try {
+ producer3.send(record).get
+ fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas")
+ } catch {
+ case e: ExecutionException =>
+ if (!e.getCause.isInstanceOf[NotEnoughReplicasException]) {
+ fail("Expected NotEnoughReplicasException when producing to topic with fewer brokers than min.insync.replicas")
+ }
+ }
+
+ servers.head.startup()
+
+ }
+
private class ProducerScheduler extends ShutdownableThread("daemon-producer", false)
{
val numRecords = 1000
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index dd71d81..ce65dab 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -17,6 +17,7 @@
package kafka.producer
+import org.apache.kafka.common.config.ConfigException
import org.scalatest.TestFailedException
import org.scalatest.junit.JUnit3Suite
import kafka.consumer.SimpleConsumer
@@ -143,7 +144,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
@Test
def testSendToNewTopic() {
val props1 = new util.Properties()
- props1.put("request.required.acks", "2")
+ props1.put("request.required.acks", "-1")
val topic = "new-topic"
// create topic with 1 partition and await leadership
@@ -181,24 +182,20 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
// no need to retry since the send will always fail
props2.put("message.send.max.retries", "0")
- val producer2 = TestUtils.createProducer[String, String](
- brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[StringEncoder].getName,
- partitioner = classOf[StaticPartitioner].getName,
- producerProps = props2)
-
try {
- producer2.send(new KeyedMessage[String, String](topic, "test", "test2"))
- fail("Should have timed out for 3 acks.")
+ val producer2 = TestUtils.createProducer[String, String](
+ brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+ encoder = classOf[StringEncoder].getName,
+ keyEncoder = classOf[StringEncoder].getName,
+ partitioner = classOf[StaticPartitioner].getName,
+ producerProps = props2)
+ producer2.close
+ fail("we don't support request.required.acks greater than 1")
}
catch {
- case se: FailedToSendMessageException =>
- // this is expected
+ case iae: IllegalArgumentException => // this is expected
case e: Throwable => fail("Not expected", e)
- }
- finally {
- producer2.close()
+
}
}
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 24deea0..fb61d55 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -18,6 +18,7 @@
package kafka.producer
import java.net.SocketTimeoutException
+import java.util.Properties
import junit.framework.Assert
import kafka.admin.AdminUtils
import kafka.integration.KafkaServerTestHarness
@@ -113,6 +114,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
Assert.assertEquals(0, response2.status(TopicAndPartition("test", 0)).offset)
}
+
@Test
def testMessageSizeTooLargeWithAckZero() {
val server = servers.head
@@ -225,4 +227,24 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val response = producer.send(emptyRequest)
Assert.assertTrue(response == null)
}
+
+ @Test
+ def testNotEnoughReplicas() {
+ val topicName = "minisrtest"
+ val server = servers.head
+
+ val props = TestUtils.getSyncProducerConfig(server.socketServer.port)
+ props.put("request.required.acks", "-1")
+
+ val producer = new SyncProducer(new SyncProducerConfig(props))
+ val topicProps = new Properties();
+ topicProps.put("min.insync.replicas","2");
+ AdminUtils.createTopic(zkClient, topicName, 1, 1,topicProps)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicName, 0)
+
+ val response = producer.send(TestUtils.produceRequest(topicName, 0,
+ new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),-1))
+
+ Assert.assertEquals(ErrorMapping.NotEnoughReplicasCode, response.status(TopicAndPartition(topicName, 0)).error)
+ }
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2dbdd3c..c26691b 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -168,10 +168,14 @@ object TestUtils extends Logging {
* Wait until the leader is elected and the metadata is propagated to all brokers.
* Return the leader for each partition.
*/
- def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1,
- servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = {
+ def createTopic(zkClient: ZkClient,
+ topic: String,
+ numPartitions: Int = 1,
+ replicationFactor: Int = 1,
+ servers: Seq[KafkaServer],
+ topicConfig: Properties = new Properties) : scala.collection.immutable.Map[Int, Option[Int]] = {
// create topic
- AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor)
+ AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor,topicConfig)
// wait until the update metadata request for new topic reaches all servers
(0 until numPartitions).map { case i =>
TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)