KAFKA-8104

Consumer cannot rejoin to the group after rebalancing


Details

    Description

      TL;DR: KafkaConsumer cannot rejoin the group because AbstractCoordinator.generation (which is NO_GENERATION) is inconsistent with AbstractCoordinator.joinFuture (which is a succeeded RequestFuture). See the explanation below.

      There are 16 consumers in a single process (threads pool-4-thread-1 through pool-4-thread-16). All of them belong to a single consumer group, hercules.sink.elastic.legacy_logs_elk_c2. A rebalance occurred and the consumers got CommitFailedException, as expected:

      2019-03-10T03:16:37.023Z [pool-4-thread-10] WARN  r.k.vostok.hercules.sink.SimpleSink - Commit failed due to rebalancing
      org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798)
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1334)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1298)
      	at ru.kontur.vostok.hercules.sink.Sink.commit(Sink.java:156)
      	at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:104)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      

      After that, most of them successfully rejoined the group with generation 10699:

      2019-03-10T03:16:39.208Z [pool-4-thread-13] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-13, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation 10699
      2019-03-10T03:16:39.209Z [pool-4-thread-13] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-13, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-18]
      ...
      2019-03-10T03:16:39.216Z [pool-4-thread-11] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-11, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation 10699
      2019-03-10T03:16:39.217Z [pool-4-thread-11] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-11, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-10, legacy_logs_elk_c2-11]
      ...
      2019-03-10T03:16:39.218Z [pool-4-thread-15] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-15, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-24]
      2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-6, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
      2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
      2019-03-10T03:16:42.323Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-7, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
      2019-03-10T03:17:13.235Z [pool-4-thread-4] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation -1
      
      

      But one consumer (pool-4-thread-4) got the strange generation -1 (see the last log record above).
      Further log records are in the attached log file.

      In the end, 15 consumers successfully rejoined, but the consumer on thread pool-4-thread-4 did not:

      2019-03-10T03:17:13.355Z [pool-4-thread-4] ERROR r.k.vostok.hercules.sink.SimpleSink - Unspecified exception has been acquired
      java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:241)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
      	at ru.kontur.vostok.hercules.sink.Sink.poll(Sink.java:152)
      	at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:70)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      2019-03-10T03:17:13.360Z [pool-4-thread-4] ERROR r.k.vostok.hercules.sink.SimpleSink - Unspecified exception has been acquired
      java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:241)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
      	at ru.kontur.vostok.hercules.sink.Sink.poll(Sink.java:152)
      	at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:70)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      

      It is important to note that KafkaConsumer.coordinator.joinFuture is not null and has succeeded, but ConsumerCoordinator never reaches resetJoinGroupFuture() because an exception is thrown from onJoinComplete():

                  if (future.succeeded()) {
                      // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
                      ByteBuffer memberAssignment = future.value().duplicate();
                      onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
      
                      // We reset the join group future only after the completion callback returns. This ensures
                      // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
                      resetJoinGroupFuture();
                      needsJoinPrepare = true;
                  }
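
The consequence of that ordering can be sketched with a small stand-alone model. This is not Kafka source code: the class StuckJoinFutureSketch, its fields, and countFailures() are hypothetical names that only mirror the excerpt above, assuming the generation has already been reset to NO_GENERATION (id -1, protocol null) while the join future is still marked as succeeded.

```java
// Hypothetical model of the retry behaviour described above; not Kafka source code.
class StuckJoinFutureSketch {
    static final int NO_GENERATION_ID = -1;

    // Assumed state: the generation was reset, but the join future still reports success.
    int generationId = NO_GENERATION_ID;
    String protocol = null;
    boolean joinFutureSucceeded = true;

    // Stand-in for onJoinComplete(): rejects a null protocol, as in the stack trace.
    void onJoinComplete(int generationId, String protocol) {
        if (protocol == null) {
            throw new IllegalStateException(
                "Coordinator selected invalid assignment protocol: " + protocol);
        }
    }

    // Same ordering as the excerpt: the future is reset only after the callback returns.
    void joinGroupIfNeeded() {
        if (joinFutureSucceeded) {
            onJoinComplete(generationId, protocol);
            // Never reached while the callback keeps throwing.
            joinFutureSucceeded = false;
        }
    }

    // Simulates repeated poll() calls and counts how many of them fail.
    static int countFailures(int polls) {
        StuckJoinFutureSketch coordinator = new StuckJoinFutureSketch();
        int failures = 0;
        for (int i = 0; i < polls; i++) {
            try {
                coordinator.joinGroupIfNeeded();
            } catch (IllegalStateException e) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        System.out.println("failed polls: " + countFailures(3));
    }
}
```

In this model every simulated poll fails in the same way, because the succeeded future is never cleared. That matches the repeated IllegalStateException in the log above.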
      

      If I understand correctly, the generation was reset to NO_GENERATION in another thread by one of the CoordinatorResponseHandlers.
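
To illustrate the suspected interleaving, here is a minimal sketch under stated assumptions. It is not Kafka code and not a proposed patch: GenerationRaceSketch, completeJoin(), resetGeneration(), the protocol name "range", and the follow-up generation 10700 are all hypothetical. It shows one possible guard: take a snapshot of the generation under a lock, and if it is already NO_GENERATION, discard the stale join result instead of passing it to the completion callback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the suspected race and one possible guard; not Kafka source code.
class GenerationRaceSketch {
    static final int NO_GENERATION_ID = -1;

    private int generationId = NO_GENERATION_ID;
    private String protocol = null;
    private boolean joinFutureSucceeded = false;
    final List<String> events = new ArrayList<>();

    // Polling thread: a join round trip finished successfully.
    synchronized void completeJoin(int id, String proto) {
        generationId = id;
        protocol = proto;
        joinFutureSucceeded = true;
    }

    // Another thread drops the generation but leaves the succeeded future in place.
    synchronized void resetGeneration() {
        generationId = NO_GENERATION_ID;
        protocol = null;
    }

    // Guarded variant: read the generation once under the lock and discard
    // a join result whose generation has already been reset.
    boolean joinGroupIfNeeded() {
        int idSnapshot;
        String protocolSnapshot;
        synchronized (this) {
            if (!joinFutureSucceeded) {
                events.add("no completed join");
                return false;
            }
            idSnapshot = generationId;
            protocolSnapshot = protocol;
            if (idSnapshot == NO_GENERATION_ID) {
                joinFutureSucceeded = false; // clear the stale future so a rejoin can start
                events.add("stale join discarded, rejoin needed");
                return false;
            }
        }
        // Stand-in for the completion callback, using only the snapshot values.
        events.add("joined generation " + idSnapshot + " with protocol " + protocolSnapshot);
        synchronized (this) {
            joinFutureSucceeded = false;
        }
        return true;
    }

    static List<String> run() {
        GenerationRaceSketch coordinator = new GenerationRaceSketch();
        coordinator.completeJoin(10699, "range");
        // Force the interleaving: the reset happens after the join completed
        // but before the polling thread processes the result.
        Thread other = new Thread(coordinator::resetGeneration, "heartbeat-like-thread");
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        coordinator.joinGroupIfNeeded();          // stale result is discarded
        coordinator.completeJoin(10700, "range"); // hypothetical successful rejoin
        coordinator.joinGroupIfNeeded();
        return coordinator.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The point of the snapshot is that the callback never sees a generation that changed underneath it, and the stale future is always cleared, so the next poll can start a fresh join instead of failing again.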


      Attachments

        1. consumer-rejoin-fail.log
          207 kB
          Gregory Koshelev

            People

              nizhikov Nikolay Izhikov
              kgn Gregory Koshelev