Details
- Type: Bug
- Status: Open
- Priority: Minor
- Resolution: Unresolved
- Affects Version/s: 2.8.0
- Fix Version/s: None
- Component/s: None
- Labels: None
Description
We recently started using static group membership and noticed that when a new topic is added to the subscription, it is never consumed from, regardless of how long the consumer is left running. Our workaround is to shut down all consumers in the group for longer than session.timeout.ms, then start them back up. Is this expected behaviour or a bug?
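The workaround above can be sketched as a shell sequence, assuming the consumers run as plain JVM processes; the process-stop commands and the exact wait are illustrative, not taken from the report:

```shell
# Workaround sketch: stop every consumer in the group, wait longer than
# session.timeout.ms (300000 ms in this setup), then restart them.
kill <consumer-pid>    # repeat for each consumer instance in the group
sleep 301              # must exceed session.timeout.ms (5 min here)
java Test &            # restart with the updated subscription
```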
Sample application:
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class Test {
  static volatile boolean shutdown = false;
  static final Object shutdownLock = new Object();

  public static void main(String[] args) {
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  shutdown = true;
                  synchronized (shutdownLock) {
                    try {
                      shutdownLock.wait();
                    } catch (InterruptedException e) {
                      e.printStackTrace();
                    }
                  }
                }));

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        ByteArrayDeserializer.class.getCanonicalName());
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        ByteArrayDeserializer.class.getCanonicalName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupID");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000"); // 5 min
    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance1");

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("topic1"));
    // consumer.subscribe(Arrays.asList("topic1", "topic2"));

    while (!shutdown) {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
      System.out.println("poll() returned " + records.count() + " records");
    }

    System.out.println("Closing consumer");
    consumer.close();
    synchronized (shutdownLock) {
      shutdownLock.notifyAll();
      System.out.println("Done closing consumer");
    }
  }
}
Steps to reproduce:
0. Update the bootstrap server config in the example code.
1. Run the above application, which consumes from topic1.
2. Send SIGTERM to the process, cleanly closing the consumer.
3. Modify the code to consume from topic1 AND topic2.
4. Run the application again: both topics appear in the logs as part of the subscription, but the new topic's partitions are never assigned, regardless of how long the consumer runs.
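The steps above assume topic1 and topic2 already exist. On a local single-broker cluster they can be created with the stock Kafka CLI; the script path, partition count, and replication factor below are assumptions, not taken from the report:

```shell
# Create the two test topics on the local broker
# (adjust --bootstrap-server to match the example code's config).
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic topic1 --partitions 1 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic topic2 --partitions 1 --replication-factor 1
```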
Logs from first run (1 topic subscription):
ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-myGroupID-instance1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = myGroupID group.instance.id = instance1 heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 300000 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Kafka version: 2.8.0
Kafka commitId: ebb1d6e21cc92130
Kafka startTimeMs: 1620342287841
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Subscribed to topic(s): topic1
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Cluster ID: AHajHZ_3QqyBJlGdr9Gwwg
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Discovered group coordinator 10.86.24.3:9092 (id: 2147483646 rack: null)
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] (Re-)joining group
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully joined group with generation Generation{generationId=1, memberId='instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Finished assignment for group at generation 1: {instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03=Assignment(partitions=[topic1-0])}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully synced group in generation Generation{generationId=1, memberId='instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Notifying assignor about the new Assignment(partitions=[topic1-0])
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Adding newly assigned partitions: topic1-0
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Found no committed offset for partition topic1-0
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Resetting offset for partition topic1-0 to position FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.86.31.131:9092 (id: 2 rack: null)], epoch=0}}.
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
Closing consumer
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Revoke previously assigned partitions topic1-0
Metrics scheduler closed
Closing reporter org.apache.kafka.common.metrics.JmxReporter
Metrics reporters closed
App info kafka.consumer for consumer-myGroupID-instance1 unregistered
Done closing consumer
Process finished with exit code 130 (interrupted by signal 2: SIGINT)
Logs from second run (2 topic subscription):
ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-myGroupID-instance1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = myGroupID group.instance.id = instance1 heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 300000 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Kafka version: 2.8.0
Kafka commitId: ebb1d6e21cc92130
Kafka startTimeMs: 1620342351702
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Subscribed to topic(s): topic1, topic2
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Cluster ID: AHajHZ_3QqyBJlGdr9Gwwg
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Discovered group coordinator 10.86.24.3:9092 (id: 2147483646 rack: null)
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] (Re-)joining group
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully joined group with generation Generation{generationId=1, memberId='instance1-9fdf45a9-04bc-4056-91ec-07d5a64a2a86', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully synced group in generation Generation{generationId=1, memberId='instance1-9fdf45a9-04bc-4056-91ec-07d5a64a2a86', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Notifying assignor about the new Assignment(partitions=[topic1-0])
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Adding newly assigned partitions: topic1-0
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Setting offset for partition topic1-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.86.31.131:9092 (id: 2 rack: null)], epoch=0}}
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
Closing consumer
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Revoke previously assigned partitions topic1-0
Metrics scheduler closed
Closing reporter org.apache.kafka.common.metrics.JmxReporter
Metrics reporters closed
App info kafka.consumer for consumer-myGroupID-instance1 unregistered
Done closing consumer
Process finished with exit code 130 (interrupted by signal 2: SIGINT)
Attachments
Issue Links
- is related to: KAFKA-13435 Static membership protocol should let the leader skip assignment (KIP-814) (Resolved)