diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ff106b4..f514398 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -269,14 +269,27 @@ class Partition(val topic: String,
         else
           true /* also count the local (leader) replica */
       })
+      val minIsr = leaderReplica.log match {
+        case Some(log) => log.config.minInsyncReplicas
+        // This shouldn't happen; we already confirmed that we have a local replica
+        case None => new LogConfig().minInsyncReplicas
+      }
+
       trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
-      if ((requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) ||
-        (requiredAcks > 0 && numAcks >= requiredAcks)) {
+      if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) {
         /*
          * requiredAcks < 0 means acknowledge after all replicas in ISR
          * are fully caught up to the (local) leader's offset
          * corresponding to this produce request.
+         * minIsr means that the topic is configured not to accept messages
+         * if there are not enough replicas in the ISR.
          */
+        if (minIsr <= curInSyncReplicas.size) {
+          (true, ErrorMapping.NoError)
+        } else {
+          (true, ErrorMapping.NotEnoughReplicasCode)
+        }
+      } else if (requiredAcks > 0 && numAcks >= requiredAcks) {
         (true, ErrorMapping.NoError)
       } else
         (false, ErrorMapping.NoError)
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 3fae791..11cbeb4 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -47,6 +47,7 @@ object ErrorMapping {
   val ConsumerCoordinatorNotAvailableCode: Short = 15
   val NotCoordinatorForConsumerCode: Short = 16
   val InvalidTopicCode : Short = 17
+  val NotEnoughReplicasCode : Short = 18
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](
@@ -65,7 +66,8 @@ object ErrorMapping {
       classOf[OffsetsLoadInProgressException].asInstanceOf[Class[Throwable]] -> OffsetsLoadInProgressCode,
       classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode,
       classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode,
-      classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode
+      classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
+      classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode
     ).withDefaultValue(UnknownCode)
 
   /* invert the mapping */
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 5746ad4..206e912 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -36,6 +36,7 @@ object Defaults {
   val MinCleanableDirtyRatio = 0.5
   val Compact = false
   val UncleanLeaderElectionEnable = true
+  val MinInsyncReplicas = 1
 }
 
 /**
@@ -53,7 +54,9 @@ object Defaults {
 * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
 * @param compact Should old segments in this log be deleted or deduplicated?
 * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property
- *                                    but included here for topic-specific configuration validation purposes
+ *                                    but included here for topic-specific configuration validation purposes
+ * @param minInsyncReplicas If the number of in-sync replicas drops below this number, we stop accepting writes with -1 required acks
+ *
 */
 case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
                      val segmentMs: Long = Defaults.SegmentMs,
@@ -68,7 +71,8 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
                      val deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
                      val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
                      val compact: Boolean = Defaults.Compact,
-                     val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable) {
+                     val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
+                     val minInsyncReplicas: Int = Defaults.MinInsyncReplicas) {
 
   def toProps: Properties = {
     val props = new Properties()
@@ -87,6 +91,7 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
     props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
     props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
     props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
+    props.put(MinInsyncReplicasProp, minInsyncReplicas.toString)
     props
   }
 
@@ -107,6 +112,7 @@ object LogConfig {
   val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
   val CleanupPolicyProp = "cleanup.policy"
   val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
+  val MinInsyncReplicasProp = "min.insync.replicas"
 
   val ConfigNames = Set(SegmentBytesProp,
                         SegmentMsProp,
@@ -121,7 +127,8 @@ object LogConfig {
                         DeleteRetentionMsProp,
                         MinCleanableDirtyRatioProp,
                         CleanupPolicyProp,
-                        UncleanLeaderElectionEnableProp)
+                        UncleanLeaderElectionEnableProp,
+                        MinInsyncReplicasProp)
 
 
   /**
@@ -144,7 +151,8 @@ object LogConfig {
                   compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete")
                     .trim.toLowerCase != "delete",
                   uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp,
-                    Defaults.UncleanLeaderElectionEnable.toString).toBoolean)
+                    Defaults.UncleanLeaderElectionEnable.toString).toBoolean,
+                  minInsyncReplicas = props.getProperty(MinInsyncReplicasProp, Defaults.MinInsyncReplicas.toString).toInt)
   }
 
   /**
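
Reviewer note (illustration, not part of the patch): the Partition.scala hunk boils down to a small decision table for acks handling. The sketch below restates it as standalone Scala. The object, method, and parameter names are invented for this note (the enclosing method's signature is outside the hunk context); only the error codes (0, 18) and the minIsr comparison mirror the patch.

object MinIsrCheckSketch {
  // Error codes as defined in ErrorMapping by this patch.
  val NoError: Short = 0
  val NotEnoughReplicasCode: Short = 18

  /**
   * Restates the patched ack check:
   * @param requiredAcks    acks requested by the producer (-1 means "all replicas in ISR")
   * @param numAcks         replicas (leader included) that have reached the required offset
   * @param hwReachedOffset whether the leader's high watermark has reached the required offset
   * @param isrSize         current ISR size
   * @param minIsr          the topic's min.insync.replicas
   * @return (requestSatisfied, errorCode)
   */
  def check(requiredAcks: Int, numAcks: Int, hwReachedOffset: Boolean,
            isrSize: Int, minIsr: Int): (Boolean, Short) =
    if (requiredAcks < 0 && hwReachedOffset) {
      // acks = -1: the request completes either way, but it carries
      // NotEnoughReplicasCode when the ISR has shrunk below min.insync.replicas.
      if (minIsr <= isrSize) (true, NoError) else (true, NotEnoughReplicasCode)
    } else if (requiredAcks > 0 && numAcks >= requiredAcks) {
      (true, NoError)
    } else {
      (false, NoError)
    }

  def main(args: Array[String]): Unit = {
    // ISR shrank to 1 with min.insync.replicas=2: satisfied, but flagged with code 18.
    println(check(requiredAcks = -1, numAcks = 1, hwReachedOffset = true, isrSize = 1, minIsr = 2))
    // Healthy ISR of 2: satisfied with no error.
    println(check(requiredAcks = -1, numAcks = 2, hwReachedOffset = true, isrSize = 2, minIsr = 2))
  }
}

Note the design choice visible here: with acks = -1 the request is still marked satisfied (true) so the producer is unblocked, but NotEnoughReplicasCode tells the client that the configured durability bar was not met.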
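
A second sketch, for the config plumbing: assuming the final LogConfig.scala hunk sits inside LogConfig's Properties-parsing factory (the enclosing signature is outside the diff context; fromProps is assumed here), the new topic-level key would round-trip roughly as follows. Again not part of the patch.

import kafka.log.{Defaults, LogConfig}

object MinIsrConfigRoundTrip {
  def main(args: Array[String]): Unit = {
    // Unset, the new field falls back to Defaults.MinInsyncReplicas (1).
    assert(new LogConfig().minInsyncReplicas == Defaults.MinInsyncReplicas)

    // Start from the full default Properties emitted by toProps, then apply a
    // per-topic override under the new "min.insync.replicas" key.
    val props = new LogConfig().toProps
    props.put(LogConfig.MinInsyncReplicasProp, "2")

    val config = LogConfig.fromProps(props)
    assert(config.minInsyncReplicas == 2)

    // toProps writes the value back out under the same key.
    assert(config.toProps.getProperty(LogConfig.MinInsyncReplicasProp) == "2")
  }
}

Operationally, "min.insync.replicas" is the per-topic configuration override an admin would set so that acks = -1 producers see NotEnoughReplicasCode once the ISR shrinks below the configured floor.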