KAFKA-5868: Kafka Consumer Rebalancing takes too long


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.2.0, 0.10.2.1, 0.11.0.0
    • Fix Version/s: 2.5.0
    • Component/s: streams
    • Labels: None

    Description

      I have a Kafka Streams application that takes data from a few topics, joins the data, and writes the result to another topic.
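      For context, a minimal sketch of what such a topology might look like, assuming hypothetical topic names (input-events, lookup-data, joined-output), String serdes, a KStream-KTable join, and the newer StreamsBuilder API; the actual application is not shown in this report:

      import java.util.Properties;

      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.streams.KafkaStreams;
      import org.apache.kafka.streams.StreamsBuilder;
      import org.apache.kafka.streams.StreamsConfig;
      import org.apache.kafka.streams.kstream.KStream;
      import org.apache.kafka.streams.kstream.KTable;

      public class JoinTopologySketch {

          public static void main(String[] args) {
              Properties props = new Properties();
              // application.id guessed from the client.id prefix in the config dump further below.
              props.put(StreamsConfig.APPLICATION_ID_CONFIG, "conversion-live");
              props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092");
              props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
              props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

              StreamsBuilder builder = new StreamsBuilder();

              // Hypothetical topics: join a stream of events against a table and write the result out.
              KStream<String, String> events = builder.stream("input-events");
              KTable<String, String> lookup = builder.table("lookup-data");

              KStream<String, String> joined =
                      events.join(lookup, (event, ref) -> event + "|" + ref);

              joined.to("joined-output");

              new KafkaStreams(builder.build(), props).start();
          }
      }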

      Kafka Configuration:

      • 5 Kafka brokers
      • Kafka topics - 15 partitions and a replication factor of 3
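      For reference, a sketch of creating such a topic programmatically, assuming the AdminClient API and a hypothetical topic name; the report does not say how the topics were actually created:

      import java.util.Collections;
      import java.util.Properties;

      import org.apache.kafka.clients.admin.AdminClient;
      import org.apache.kafka.clients.admin.AdminClientConfig;
      import org.apache.kafka.clients.admin.NewTopic;

      public class CreateTopicSketch {

          public static void main(String[] args) throws Exception {
              Properties props = new Properties();
              props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092");

              try (AdminClient admin = AdminClient.create(props)) {
                  // Hypothetical topic name; 15 partitions and replication factor 3 as described above.
                  NewTopic topic = new NewTopic("input-events", 15, (short) 3);
                  admin.createTopics(Collections.singletonList(topic)).all().get();
              }
          }
      }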

      A few million records are consumed/produced every hour. Whenever I take any Kafka broker down, the group goes into rebalancing, which takes approximately 30 minutes or sometimes even longer. It also kills many of my Kafka Streams processes.

      Note: My Kafka Streams processes are running on the same machines as the Kafka brokers.

      Does anyone have any idea how to solve this rebalancing issue in the Kafka consumer? It also frequently throws an exception while rebalancing.

      This is stopping us from going live in production with this setup. Any help would be appreciated.

      Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
      Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
      at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
      at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
      at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
      at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
      at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
      at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
      at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
      at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
      at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
      at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
      at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)
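      The exception text itself points at two consumer settings. A minimal sketch of overriding them from a Streams application, assuming the StreamsConfig.consumerPrefix() helper; the values are illustrative, not tested recommendations:

      import java.util.Properties;

      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.streams.StreamsConfig;

      public class RebalanceTuningSketch {

          // Adds the two knobs the exception message points at to an existing Streams config.
          public static Properties withRebalanceTuning(Properties props) {
              // Smaller batches per poll() so each processing loop iteration finishes sooner.
              props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 50);
              // Longer session timeout so slow processing is not mistaken for a dead member.
              props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000);
              return props;
          }
      }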

      Kafka Streams Config:

      • bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
      • max.poll.records = 100
      • request.timeout.ms=40000
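      For reference, a sketch of how these settings might be supplied as a Properties object, assuming the application id is the "conversion-live" prefix visible in the client.id below; plain client keys such as max.poll.records are forwarded to the consumers Streams creates internally, which is why they appear in the dump that follows:

      import java.util.Properties;

      import org.apache.kafka.streams.StreamsConfig;

      public class ReportedStreamsConfig {

          public static Properties reportedProps() {
              Properties props = new Properties();
              // application.id guessed from the client.id prefix in the dump below; hypothetical otherwise.
              props.put(StreamsConfig.APPLICATION_ID_CONFIG, "conversion-live");
              props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092");
              // Unprefixed client keys are passed through to the internal clients.
              props.put("max.poll.records", 100);
              props.put("request.timeout.ms", 40000);
              return props;
          }
      }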

      The ConsumerConfig it creates internally is:

      auto.commit.interval.ms = 5000
      auto.offset.reset = earliest
      bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
      check.crcs = true
      client.id = conversion-live-StreamThread-1-restore-consumer
      connections.max.idle.ms = 540000
      enable.auto.commit = false
      exclude.internal.topics = true
      fetch.max.bytes = 52428800
      fetch.max.wait.ms = 500
      fetch.min.bytes = 1
      group.id =
      heartbeat.interval.ms = 3000
      interceptor.classes = null
      internal.leave.group.on.close = false
      isolation.level = read_uncommitted
      key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
      max.partition.fetch.bytes = 1048576
      max.poll.interval.ms = 2147483647
      max.poll.records = 100
      metadata.max.age.ms = 300000
      metric.reporters = []
      metrics.num.samples = 2
      metrics.recording.level = INFO
      metrics.sample.window.ms = 30000
      partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
      receive.buffer.bytes = 65536
      reconnect.backoff.max.ms = 1000
      reconnect.backoff.ms = 50
      request.timeout.ms = 40000
      retry.backoff.ms = 100
      sasl.jaas.config = null
      sasl.kerberos.kinit.cmd = /usr/bin/kinit
      sasl.kerberos.min.time.before.relogin = 60000
      sasl.kerberos.service.name = null
      sasl.kerberos.ticket.renew.jitter = 0.05
      sasl.kerberos.ticket.renew.window.factor = 0.8
      sasl.mechanism = GSSAPI
      security.protocol = PLAINTEXT
      send.buffer.bytes = 131072
      session.timeout.ms = 10000
      ssl.cipher.suites = null
      ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
      ssl.endpoint.identification.algorithm = null
      ssl.key.password = null
      ssl.keymanager.algorithm = SunX509
      ssl.keystore.location = null
      ssl.keystore.password = null
      ssl.keystore.type = JKS
      ssl.protocol = TLS
      ssl.provider = null
      ssl.secure.random.implementation = null
      ssl.trustmanager.algorithm = PKIX
      ssl.truststore.location = null
      ssl.truststore.password = null
      ssl.truststore.type = JKS
      value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

    People

      Assignee: Unassigned
      Reporter: Nandish Kotadia (nandishkotadia)
      Votes: 0
      Watchers: 5
