diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 86e4f2b..3500003 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -119,7 +119,9 @@ public abstract class AbstractCoordinator implements Closeable { private Generation generation = Generation.NO_GENERATION; private RequestFuture findCoordinatorFuture = null; - + + //For testing purposes + private long lastHeartbeat = 0; /** * Initialize the coordination manager. */ @@ -295,6 +297,7 @@ public abstract class AbstractCoordinator implements Closeable { notify(); } heartbeat.poll(now); + lastHeartbeat = now; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index bc6af55..fbfe9c9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1329,6 +1329,98 @@ public class KafkaConsumerTest { consumer.close(0, TimeUnit.MILLISECONDS); } + @Test + public void consumerHeartbeatTest() { + int rebalanceTimeoutMs = 300; + final int sessionTimeoutMs = 200; + int heartbeatIntervalMs = 10; + + Time time = new MockTime(); + Cluster cluster = TestUtils.singletonCluster(topic, 1); + Node node = cluster.nodes().get(0); + + Metadata metadata = new Metadata(0, Long.MAX_VALUE, false); + metadata.update(cluster, Collections.emptySet(), time.milliseconds()); + + final MockClient client = new MockClient(time, metadata); + client.setNode(node); + PartitionAssignor assignor = new RoundRobinAssignor(); + + final KafkaConsumer consumer = newConsumer(time, client, metadata, assignor, + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, 1000); + + consumer.subscribe(Collections.singleton(topic), getConsumerRebalanceListener(consumer)); + client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); + Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); + + + client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator); + client.prepareResponseFrom(syncGroupResponse(Collections.singletonList(tp0), Errors.NONE), coordinator); + + client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node); + client.prepareResponseFrom(fetchResponse(tp0, 1, 0), node); + + consumer.poll(0); + + // heartbeat fails due to rebalance in progress + client.prepareResponseFrom(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + return true; + } + }, new HeartbeatResponse(Errors.REBALANCE_IN_PROGRESS), coordinator); + + // join group + final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(Collections.singletonList(topic))); + + // This member becomes the leader + final JoinGroupResponse leaderResponse = new JoinGroupResponse(Errors.NONE, 1, assignor.name(), "memberId", "memberId", + Collections.singletonMap("memberId", byteBuffer)); + client.prepareResponseFrom(leaderResponse, coordinator); + + // sync group fails due to disconnect + client.prepareResponseFrom(syncGroupResponse(Collections.singletonList(tp0), Errors.NONE), coordinator, true); + + // should try and find the new coordinator + client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); + + // rejoin group + client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator); + client.prepareResponseFrom(syncGroupResponse(Collections.singletonList(tp0), Errors.NONE), coordinator); + + client.prepareResponseFrom(new MockClient.RequestMatcher() { + @Override + public boolean matches(final AbstractRequest body) { + return body instanceof FetchRequest && ((FetchRequest) body).fetchData().containsKey(tp0); + } + }, fetchResponse(tp0, 1, 1), node); + ExecutorService executor = Executors.newSingleThreadExecutor(); + + System.out.println("Executor started"); + final int expectedHeartbeats = (sessionTimeoutMs - sessionTimeoutMs % heartbeatIntervalMs) / heartbeatIntervalMs; + final long now = time.milliseconds(); + executor.execute(new Runnable() { + @Override + public void run() { + client.poll(sessionTimeoutMs, now); + } + }); + final int skippedHeartbeats = skippedHeartbeats(client, coordinator, time, sessionTimeoutMs + now, expectedHeartbeats); + + assertTrue(skippedHeartbeats <= 1); + consumer.close(0, TimeUnit.MILLISECONDS); + executor.shutdown(); + } + + private int skippedHeartbeats(MockClient client, Node coordinator, Time time, long endTime, int expectedHeartbeats) { + int currentHeartbeats = 0; + while (time.milliseconds() <= endTime) { + AtomicBoolean b = prepareHeartbeatResponse(client, coordinator); + if (b.get()) currentHeartbeats++; + } + return expectedHeartbeats - currentHeartbeats; + } + private void consumerCloseTest(final long closeTimeoutMs, List responses, long waitMs, @@ -1622,7 +1714,7 @@ public class KafkaConsumerTest { SubscriptionState subscriptions = new SubscriptionState(autoResetStrategy); LogContext loggerFactory = new LogContext(); ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time, - retryBackoffMs, requestTimeoutMs, Integer.MAX_VALUE); + retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs); ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator( loggerFactory, consumerClient,