Kafka / KAFKA-12759

Kafka consumers with static group membership won't consume from newly subscribed topics


Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 2.8.0

    Description

      We've recently started using static group membership and noticed that when we add a new topic to the subscription, the consumer never consumes from it, regardless of how long it is left running. Our workaround is to shut down every consumer in the group for longer than session.timeout.ms, then start them back up. Is this expected behaviour or a bug?

      Sample application:

      import java.time.Duration;
      import java.util.Arrays;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.ByteArrayDeserializer;
      
      public class Test {
        static volatile boolean shutdown = false;
        static boolean closed = false; // guarded by shutdownLock
        static final Object shutdownLock = new Object();
      
        public static void main(String[] args) {
          Runtime.getRuntime()
              .addShutdownHook(
                  new Thread(
                      () -> {
                        shutdown = true;
                        synchronized (shutdownLock) {
                          // Loop on the flag so a notify that fires before this
                          // thread starts waiting is not lost.
                          while (!closed) {
                            try {
                              shutdownLock.wait();
                            } catch (InterruptedException e) {
                              e.printStackTrace();
                            }
                          }
                        }
                      }));
      
          Properties props = new Properties();
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(
              ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
              ByteArrayDeserializer.class.getCanonicalName());
          props.put(
              ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
              ByteArrayDeserializer.class.getCanonicalName());
      
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupID");
          props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000"); // 5 min
          props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance1");
      
          KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
      
          consumer.subscribe(Arrays.asList("topic1"));
          // consumer.subscribe(Arrays.asList("topic1", "topic2"));
      
          while (!shutdown) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            System.out.println("poll() returned " + records.count() + " records");
          }
      
          System.out.println("Closing consumer");
          consumer.close();
          synchronized (shutdownLock) {
            closed = true;
            shutdownLock.notifyAll();
            System.out.println("Done closing consumer");
          }
        }
      }
      

      Steps to reproduce:
      0. update the bootstrap server config in the example code
      1. run the above application, which consumes from topic1
      2. send SIGTERM to the process, cleanly closing the consumer
      3. modify the code to consume from topic1 AND topic2
      4. run the application again; both topics appear in the logs as part of the subscription, but topic2 is never assigned, regardless of how long the consumer runs
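      A possibly faster alternative to shutting down all consumers for longer than session.timeout.ms is to remove the static member from the group explicitly with the AdminClient (removeMembersFromConsumerGroup, available since Kafka 2.4), so that the next rejoin with the changed subscription triggers a full rebalance. This is only a sketch; the group ID and instance ID below match the sample application, and the class name is illustrative:

      ```java
      import java.util.Collections;
      import java.util.Properties;
      import org.apache.kafka.clients.admin.Admin;
      import org.apache.kafka.clients.admin.AdminClientConfig;
      import org.apache.kafka.clients.admin.MemberToRemove;
      import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

      public class RemoveStaticMember {
        public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          try (Admin admin = Admin.create(props)) {
            // Force the static member "instance1" out of the group so the
            // coordinator no longer holds its cached subscription/assignment.
            admin
                .removeMembersFromConsumerGroup(
                    "myGroupID",
                    new RemoveMembersFromConsumerGroupOptions(
                        Collections.singleton(new MemberToRemove("instance1"))))
                .all()
                .get();
          }
        }
      }
      ```

      Whether this avoids the reported behaviour has not been verified here; it is offered only as a candidate workaround for discussion.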

      Logs from first run (1 topic subscription):

      ConsumerConfig values: 
      	allow.auto.create.topics = true
      	auto.commit.interval.ms = 5000
      	auto.offset.reset = latest
      	bootstrap.servers = [localhost:9092]
      	check.crcs = true
      	client.dns.lookup = use_all_dns_ips
      	client.id = consumer-myGroupID-instance1
      	client.rack = 
      	connections.max.idle.ms = 540000
      	default.api.timeout.ms = 60000
      	enable.auto.commit = true
      	exclude.internal.topics = true
      	fetch.max.bytes = 52428800
      	fetch.max.wait.ms = 500
      	fetch.min.bytes = 1
      	group.id = myGroupID
      	group.instance.id = instance1
      	heartbeat.interval.ms = 3000
      	interceptor.classes = []
      	internal.leave.group.on.close = true
      	internal.throw.on.fetch.stable.offset.unsupported = false
      	isolation.level = read_uncommitted
      	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
      	max.partition.fetch.bytes = 1048576
      	max.poll.interval.ms = 300000
      	max.poll.records = 500
      	metadata.max.age.ms = 300000
      	metric.reporters = []
      	metrics.num.samples = 2
      	metrics.recording.level = INFO
      	metrics.sample.window.ms = 30000
      	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
      	receive.buffer.bytes = 65536
      	reconnect.backoff.max.ms = 1000
      	reconnect.backoff.ms = 50
      	request.timeout.ms = 30000
      	retry.backoff.ms = 100
      	sasl.client.callback.handler.class = null
      	sasl.jaas.config = null
      	sasl.kerberos.kinit.cmd = /usr/bin/kinit
      	sasl.kerberos.min.time.before.relogin = 60000
      	sasl.kerberos.service.name = null
      	sasl.kerberos.ticket.renew.jitter = 0.05
      	sasl.kerberos.ticket.renew.window.factor = 0.8
      	sasl.login.callback.handler.class = null
      	sasl.login.class = null
      	sasl.login.refresh.buffer.seconds = 300
      	sasl.login.refresh.min.period.seconds = 60
      	sasl.login.refresh.window.factor = 0.8
      	sasl.login.refresh.window.jitter = 0.05
      	sasl.mechanism = GSSAPI
      	security.protocol = PLAINTEXT
      	security.providers = null
      	send.buffer.bytes = 131072
      	session.timeout.ms = 300000
      	socket.connection.setup.timeout.max.ms = 30000
      	socket.connection.setup.timeout.ms = 10000
      	ssl.cipher.suites = null
      	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
      	ssl.endpoint.identification.algorithm = https
      	ssl.engine.factory.class = null
      	ssl.key.password = null
      	ssl.keymanager.algorithm = SunX509
      	ssl.keystore.certificate.chain = null
      	ssl.keystore.key = null
      	ssl.keystore.location = null
      	ssl.keystore.password = null
      	ssl.keystore.type = JKS
      	ssl.protocol = TLSv1.3
      	ssl.provider = null
      	ssl.secure.random.implementation = null
      	ssl.trustmanager.algorithm = PKIX
      	ssl.truststore.certificates = null
      	ssl.truststore.location = null
      	ssl.truststore.password = null
      	ssl.truststore.type = JKS
      	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
      
      Kafka version: 2.8.0
      Kafka commitId: ebb1d6e21cc92130
      Kafka startTimeMs: 1620342287841
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Subscribed to topic(s): topic1
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Cluster ID: AHajHZ_3QqyBJlGdr9Gwwg
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Discovered group coordinator 10.86.24.3:9092 (id: 2147483646 rack: null)
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] (Re-)joining group
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully joined group with generation Generation{generationId=1, memberId='instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03', protocol='range'}
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Finished assignment for group at generation 1: {instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03=Assignment(partitions=[topic1-0])}
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully synced group in generation Generation{generationId=1, memberId='instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03', protocol='range'}
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Notifying assignor about the new Assignment(partitions=[topic1-0])
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Adding newly assigned partitions: topic1-0
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Found no committed offset for partition topic1-0
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Resetting offset for partition topic1-0 to position FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.86.31.131:9092 (id: 2 rack: null)], epoch=0}}.
      poll() returned 0 records
      poll() returned 0 records
      poll() returned 0 records
      poll() returned 0 records
      poll() returned 0 records
      Closing consumer
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Revoke previously assigned partitions topic1-0
      Metrics scheduler closed
      Closing reporter org.apache.kafka.common.metrics.JmxReporter
      Metrics reporters closed
      App info kafka.consumer for consumer-myGroupID-instance1 unregistered
      Done closing consumer
      
      Process finished with exit code 130 (interrupted by signal 2: SIGINT)
      

      Logs from second run (2 topic subscription):

      ConsumerConfig values: 
      	allow.auto.create.topics = true
      	auto.commit.interval.ms = 5000
      	auto.offset.reset = latest
      	bootstrap.servers = [localhost:9092]
      	check.crcs = true
      	client.dns.lookup = use_all_dns_ips
      	client.id = consumer-myGroupID-instance1
      	client.rack = 
      	connections.max.idle.ms = 540000
      	default.api.timeout.ms = 60000
      	enable.auto.commit = true
      	exclude.internal.topics = true
      	fetch.max.bytes = 52428800
      	fetch.max.wait.ms = 500
      	fetch.min.bytes = 1
      	group.id = myGroupID
      	group.instance.id = instance1
      	heartbeat.interval.ms = 3000
      	interceptor.classes = []
      	internal.leave.group.on.close = true
      	internal.throw.on.fetch.stable.offset.unsupported = false
      	isolation.level = read_uncommitted
      	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
      	max.partition.fetch.bytes = 1048576
      	max.poll.interval.ms = 300000
      	max.poll.records = 500
      	metadata.max.age.ms = 300000
      	metric.reporters = []
      	metrics.num.samples = 2
      	metrics.recording.level = INFO
      	metrics.sample.window.ms = 30000
      	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
      	receive.buffer.bytes = 65536
      	reconnect.backoff.max.ms = 1000
      	reconnect.backoff.ms = 50
      	request.timeout.ms = 30000
      	retry.backoff.ms = 100
      	sasl.client.callback.handler.class = null
      	sasl.jaas.config = null
      	sasl.kerberos.kinit.cmd = /usr/bin/kinit
      	sasl.kerberos.min.time.before.relogin = 60000
      	sasl.kerberos.service.name = null
      	sasl.kerberos.ticket.renew.jitter = 0.05
      	sasl.kerberos.ticket.renew.window.factor = 0.8
      	sasl.login.callback.handler.class = null
      	sasl.login.class = null
      	sasl.login.refresh.buffer.seconds = 300
      	sasl.login.refresh.min.period.seconds = 60
      	sasl.login.refresh.window.factor = 0.8
      	sasl.login.refresh.window.jitter = 0.05
      	sasl.mechanism = GSSAPI
      	security.protocol = PLAINTEXT
      	security.providers = null
      	send.buffer.bytes = 131072
      	session.timeout.ms = 300000
      	socket.connection.setup.timeout.max.ms = 30000
      	socket.connection.setup.timeout.ms = 10000
      	ssl.cipher.suites = null
      	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
      	ssl.endpoint.identification.algorithm = https
      	ssl.engine.factory.class = null
      	ssl.key.password = null
      	ssl.keymanager.algorithm = SunX509
      	ssl.keystore.certificate.chain = null
      	ssl.keystore.key = null
      	ssl.keystore.location = null
      	ssl.keystore.password = null
      	ssl.keystore.type = JKS
      	ssl.protocol = TLSv1.3
      	ssl.provider = null
      	ssl.secure.random.implementation = null
      	ssl.trustmanager.algorithm = PKIX
      	ssl.truststore.certificates = null
      	ssl.truststore.location = null
      	ssl.truststore.password = null
      	ssl.truststore.type = JKS
      	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
      
      Kafka version: 2.8.0
      Kafka commitId: ebb1d6e21cc92130
      Kafka startTimeMs: 1620342351702
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Subscribed to topic(s): topic1, topic2
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Cluster ID: AHajHZ_3QqyBJlGdr9Gwwg
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Discovered group coordinator 10.86.24.3:9092 (id: 2147483646 rack: null)
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] (Re-)joining group
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully joined group with generation Generation{generationId=1, memberId='instance1-9fdf45a9-04bc-4056-91ec-07d5a64a2a86', protocol='range'}
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully synced group in generation Generation{generationId=1, memberId='instance1-9fdf45a9-04bc-4056-91ec-07d5a64a2a86', protocol='range'}
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Notifying assignor about the new Assignment(partitions=[topic1-0])
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Adding newly assigned partitions: topic1-0
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Setting offset for partition topic1-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.86.31.131:9092 (id: 2 rack: null)], epoch=0}}
      poll() returned 0 records
      poll() returned 0 records
      poll() returned 0 records
      poll() returned 0 records
      poll() returned 0 records
      Closing consumer
      [Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Revoke previously assigned partitions topic1-0
      Metrics scheduler closed
      Closing reporter org.apache.kafka.common.metrics.JmxReporter
      Metrics reporters closed
      App info kafka.consumer for consumer-myGroupID-instance1 unregistered
      Done closing consumer
      
      Process finished with exit code 130 (interrupted by signal 2: SIGINT)
      

            People

              Assignee: Unassigned
              Reporter: Andrey Polyakov (apolyakov)
