diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index b1db415d4..5e1149a05 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -254,7 +254,7 @@ public abstract class AbstractCoordinator implements Closeable { client.awaitMetadataUpdate(timer); } else throw future.exception(); - } else if (coordinator != null && client.isUnavailable(coordinator)) { + } else if (coordinator != null && client.getClient().connectionFailed(coordinator)) { // we found the coordinator, but the connection has failed, so mark // it dead and backoff before retrying discovery markCoordinatorUnknown("coordinator unavailable"); @@ -883,7 +883,7 @@ public abstract class AbstractCoordinator implements Closeable { * @return the current coordinator or null if it is unknown */ protected synchronized Node checkAndGetCoordinator() { - if (coordinator != null && client.isUnavailable(coordinator)) { + if (coordinator != null && client.getClient().connectionFailed(coordinator)) { markCoordinatorUnknown(true, "coordinator unavailable"); return null; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index c591074ba..86d17f21b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -726,4 +726,7 @@ public class ConsumerNetworkClient implements Closeable { } } + public KafkaClient getClient() { + return client; + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index a18fa29fc..5ea7f28ec 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -608,6 +608,38 @@ public class MockClient implements KafkaClient { default void close() {} } + public static class ManualMockMetadataUpdater implements MockMetadataUpdater { + + private List<Node> nodes; + + public ManualMockMetadataUpdater(List<Node> nodes) { + this.nodes = nodes; + } + + public ManualMockMetadataUpdater() { + this.nodes = new ArrayList<>(); + } + + @Override + public List<Node> fetchNodes() { + return nodes; + } + + public void setNodes(List<Node> nodes) { + this.nodes = nodes; + } + + @Override + public boolean isUpdateNeeded() { + return false; + } + + @Override + public void update(Time time, MetadataUpdate update) { + throw new UnsupportedOperationException(); + } + } + private static class DefaultMockMetadataUpdater implements MockMetadataUpdater { private final Metadata metadata; private MetadataUpdate lastUpdate; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index a6d06fa6d..c160a7941 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -64,23 +64,24 @@ import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.common.requests.AbstractRequest; -import org.apache.kafka.common.requests.AbstractResponse; -import org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.FindCoordinatorResponse; -import 
org.apache.kafka.common.requests.HeartbeatResponse; -import org.apache.kafka.common.requests.JoinGroupRequest; +import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.JoinGroupResponse; -import org.apache.kafka.common.requests.LeaveGroupResponse; -import org.apache.kafka.common.requests.ListOffsetsRequest; -import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.MetadataResponse; -import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.OffsetCommitResponse; +import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.kafka.common.requests.OffsetFetchResponse; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.requests.LeaveGroupResponse; +import org.apache.kafka.common.requests.HeartbeatResponse; +import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.RequestTestUtils; -import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -512,6 +513,67 @@ public class KafkaConsumerTest { consumer.close(Duration.ofMillis(0)); } + private ConsumerMetadata newConsumerMetadata(SubscriptionState subscription) { + long refreshBackoffMs = 50; + long expireMs = 50000; + return new ConsumerMetadata(refreshBackoffMs, expireMs, false, false, + subscription, new LogContext(), new ClusterResourceListeners()); + } + + 
@Test + public void testCoordinatorChange() throws InterruptedException { + Time time = new MockTime(); + SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + Node old = new Node(-1, "localhost", 1970); + List<Node> originList = new ArrayList<>(); + originList.add(old); + MockClient.ManualMockMetadataUpdater metadataUpdater = new MockClient.ManualMockMetadataUpdater(originList); + ConsumerMetadata metadata = newConsumerMetadata(subscription); + MockClient client = new MockClient(time, metadataUpdater); + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + + KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); + consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); + Node coordinator = prepareRebalance(client, old, assignor, singletonList(tp0), null); + assertEquals("localhost", coordinator.host()); + assertEquals(1970, coordinator.port()); + + consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)); + consumer.poll(Duration.ZERO); + + client.prepareResponseFrom(fetchResponse(tp0, 5, 0), old); + AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator, Errors.NONE); + + time.sleep(heartbeatIntervalMs); + Thread.sleep(heartbeatIntervalMs); + + consumer.poll(Duration.ZERO); + assertTrue(heartbeatReceived.get()); + + List<Node> newNodes = new ArrayList<>(); + Node newNode = new Node(1, "localhost", 1971); + newNodes.add(newNode); + metadataUpdater.setNodes(newNodes); + time.sleep(heartbeatIntervalMs); + Thread.sleep(heartbeatIntervalMs); + + coordinator = prepareRebalance(client, old, assignor, singletonList(tp0), newNode); + assertEquals("localhost", coordinator.host()); + assertEquals(1971, coordinator.port()); + + consumer.poll(Duration.ZERO); + assertTrue(heartbeatReceived.get()); + + MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.singletonList("test"), true); + newNode = 
new Node(-2147483648, "localhost", 1971); + client.setUnreachable(newNode, 1000); + long now = time.milliseconds(); + ClientRequest request = client.newClientRequest("-2147483648", builder, now, false); + assertThrows(IllegalStateException.class, () -> client.send(request, now)); + + consumer.close(Duration.ofMillis(0)); + } + @Test public void verifyPollTimesOutDuringMetadataUpdate() { final Time time = new MockTime();