diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 09a6f11..67ceb75 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1281,7 +1281,7 @@ public class KafkaConsumer implements Consumer { if (!resp.wasDisconnected()) { ConsumerMetadataResponse response = new ConsumerMetadataResponse(resp.responseBody()); if (response.errorCode() == Errors.NONE.code()) - return new Node(Integer.MIN_VALUE, response.node().host(), response.node().port()); + return new Node(Integer.MAX_VALUE - response.node().id(), response.node().host(), response.node().port()); } } return null; diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java index 0e47ff3..88c3b24 100644 --- a/clients/src/main/java/org/apache/kafka/common/Node.java +++ b/clients/src/main/java/org/apache/kafka/common/Node.java @@ -82,7 +82,7 @@ public class Node { @Override public String toString() { - return "Node(" + (id < 0 ? "" : id + ", ") + host + ", " + port + ")"; + return "Node(" + id + ", " + host + ", " + port + ")"; } } diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index ef4c9ae..dc0512b 100644 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -64,6 +64,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness { val index = TestUtils.random.nextInt(servers.length) if(alive(index)) { servers(index).shutdown() + servers(index).awaitShutdown() alive(index) = false } index