diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 880ab4a004f078e5d84446ea6e4454ecc06c95f2..2fd2e8ee15c03b0cf984ebc1551a8af6a1789408 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -49,6 +49,7 @@ object ErrorMapping { val MessageSetSizeTooLargeCode: Short = 18 val NotEnoughReplicasCode : Short = 19 val NotEnoughReplicasAfterAppendCode: Short = 20 + val InvalidBrokerCode : Short = 21 private val exceptionToCode = Map[Class[Throwable], Short]( diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1afb0cbc403ad6f0bbaba5a35940d031e71fd3cf..252c66dde5166a76ac33f45fd2cce3ee9ae69548 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -359,6 +359,7 @@ class ReplicaManager(val config: KafkaConfig, "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s") .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(","))) + responseMap.put((topic, partitionId), ErrorMapping.InvalidBrokerCode) } } else { // Otherwise record the error code in response