diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java index 0e47ff3..5520f57 100644 --- a/clients/src/main/java/org/apache/kafka/common/Node.java +++ b/clients/src/main/java/org/apache/kafka/common/Node.java @@ -85,4 +85,11 @@ public class Node { return "Node(" + (id < 0 ? "" : id + ", ") + host + ", " + port + ")"; } + /** + * Broker node ids are non-negative ints. + */ + public static boolean isIdValid(int nodeId) { + return nodeId >= 0; + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 2652c32..4f7037a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -58,7 +58,7 @@ public class MetadataResponse { if (partError == Errors.NONE.code()) { int partition = partitionInfo.getInt("partition_id"); int leader = partitionInfo.getInt("leader"); - Node leaderNode = leader == -1 ? null : brokers.get(leader); + Node leaderNode = Node.isIdValid(leader) ? 
brokers.get(leader) : null; Object[] replicas = (Object[]) partitionInfo.get("replicas"); Node[] replicaNodes = new Node[replicas.length]; for (int k = 0; k < replicas.length; k++) diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index a8b73ac..b423836 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -26,6 +26,7 @@ import kafka.consumer.ConsumerConfig import java.util.concurrent.atomic.AtomicInteger import kafka.network.RequestChannel import kafka.message.MessageSet +import org.apache.kafka.common.Node case class PartitionFetchInfo(offset: Long, fetchSize: Int) @@ -132,7 +133,7 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV }) } - def isFromFollower = Request.isValidBrokerId(replicaId) + def isFromFollower = Node.isIdValid(replicaId) def isFromOrdinaryConsumer = replicaId == Request.OrdinaryConsumerId diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala index 57f87a4..fc5dc11 100644 --- a/core/src/main/scala/kafka/api/RequestOrResponse.scala +++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala @@ -24,9 +24,6 @@ import kafka.utils.Logging object Request { val OrdinaryConsumerId: Int = -1 val DebuggingConsumerId: Int = -2 - - // Broker ids are non-negative int. 
- def isValidBrokerId(brokerId: Int): Boolean = (brokerId >= 0) } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0b668f2..1114c11 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -33,6 +33,7 @@ import java.util.concurrent.atomic._ import scala.collection._ import org.I0Itec.zkclient.ZkClient +import org.apache.kafka.common.Node /** * Logic to handle the various Kafka requests @@ -438,7 +439,7 @@ class KafkaApis(val requestChannel: RequestChannel, replicaManager.getLeaderReplicaIfLocal(topic, partition) trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) val maxOffsetOpt = - if (Request.isValidBrokerId(fromReplicaId)) + if (Node.isIdValid(fromReplicaId)) None else Some(localReplica.highWatermark) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 4da0f2c..e37dee6 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -43,6 +43,7 @@ import kafka.producer.ProducerConfig import junit.framework.AssertionFailedError import junit.framework.Assert._ import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.common.Node /** * Utility functions to help with testing @@ -623,7 +624,7 @@ object TestUtils extends Logging { case None => false case Some(partitionState) => leader = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader - result && Request.isValidBrokerId(leader) + result && Node.isIdValid(leader) } }, "Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout),