diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index ae2ab8a79..17f24561d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1733,6 +1733,20 @@ public class ConsumerCoordinatorTest { } } + @Test + public void testAUTO_COMMIT_INTERVAL_MS_CONFIG_IS_ZERO() { + try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, 0) + ) { + subscriptions.subscribe(singleton(topic1), rebalanceListener); + joinAsFollowerAndReceiveAssignment(coordinator, singletonList(t1p)); + subscriptions.seek(t1p, 100); + prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE); + time.sleep(autoCommitIntervalMs); + coordinator.poll(time.timer(Long.MAX_VALUE)); + assertFalse(client.hasPendingResponses()); + } + } + @Test public void testAutoCommitRetryBackoff() { try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) { @@ -2993,7 +3007,8 @@ public class ConsumerCoordinatorTest { private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanceConfig, final Metrics metrics, final List assignors, - final boolean autoCommitEnabled) { + final boolean autoCommitEnabled, + final int commitIntervalMs) { return new ConsumerCoordinator( rebalanceConfig, new LogContext(), @@ -3005,11 +3020,18 @@ public class ConsumerCoordinatorTest { consumerId + groupId, time, autoCommitEnabled, - autoCommitIntervalMs, + commitIntervalMs, null, false); } + private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanceConfig, + final Metrics metrics, + final List assignors, + final boolean autoCommitEnabled) { + return buildCoordinator(rebalanceConfig, metrics, assignors, autoCommitEnabled, autoCommitIntervalMs); + } + private Collection getRevoked(final List owned, final List assigned) { switch (protocol) {