Flink / FLINK-27962

KafkaSourceReader fails to commit consumer offsets for checkpoints



    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.14.4, 1.15.0
    • Fix Version/s: 1.16.0
    • Component/s: Connectors / Kafka
    • Labels: None


      The KafkaSourceReader works well for many hours, then fails, reconnects successfully, and continues to work for some time. After the first three failures it hangs on "Offset commit failed" and never reconnects. Restarting the Flink job helps, and the job then works until the next series of three failures.

      I am aware of the note that the Kafka source does NOT rely on committed offsets for fault tolerance, and that committing offsets only exposes the progress of the consumer and consumer group for monitoring.

      I would accept this if the failures were only intermittent, but I would argue that a permanent failure to commit is unacceptable.

      Failed to commit consumer offsets for checkpoint:

      Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
      2022-06-06 14:19:52,297 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464521
      org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
      Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
      2022-06-06 14:20:02,297 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464522
      org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
      Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
      2022-06-06 14:20:02,297 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464523
      org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets
      ..... the commits keep failing until the job is restarted
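
For anyone who wants to detect this state from the logs, here is a minimal Python sketch that extracts the failed checkpoint IDs from the WARN lines above and measures the longest run of back-to-back failures. It assumes that checkpoint IDs increase by one and that a successful commit leaves no WARN line; the function names and any alerting threshold you build on top are hypothetical, not part of Flink.

```python
import re

# Matches the WARN line emitted by KafkaSourceReader in the log excerpt above.
FAILED_COMMIT = re.compile(r"Failed to commit consumer offsets for checkpoint (\d+)")


def failed_checkpoints(lines):
    """Return the checkpoint IDs whose offset commit failed, in log order."""
    ids = []
    for line in lines:
        match = FAILED_COMMIT.search(line)
        if match:
            ids.append(int(match.group(1)))
    return ids


def longest_failure_streak(ids):
    """Return the length of the longest run of consecutive checkpoint IDs."""
    longest = current = 0
    previous = None
    for checkpoint_id in ids:
        if previous is not None and checkpoint_id == previous + 1:
            current += 1
        else:
            current = 1
        longest = max(longest, current)
        previous = checkpoint_id
    return longest


# Lines copied from the log excerpt in this issue.
SAMPLE = [
    "2022-06-06 14:19:52,297 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464521",
    "Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.",
    "2022-06-06 14:20:02,297 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464522",
    "2022-06-06 14:20:02,297 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464523",
]

ids = failed_checkpoints(SAMPLE)
print(ids, longest_failure_streak(ids))
```

A streak that keeps growing across checkpoints would correspond to the permanent failure described here, as opposed to an isolated retriable error.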

      Consumer Config:

      allow.auto.create.topics = true
      auto.commit.interval.ms = 5000
      auto.offset.reset = none
      bootstrap.servers = [test.host.net:9093]
      check.crcs = true
      client.dns.lookup = use_all_dns_ips
      client.id = test-client-id
      client.rack =
      connections.max.idle.ms = 180000
      default.api.timeout.ms = 60000
      enable.auto.commit = false
      exclude.internal.topics = true
      fetch.max.bytes = 52428800
      fetch.max.wait.ms = 500
      fetch.min.bytes = 1
      group.id = test-group-id
      group.instance.id = null
      heartbeat.interval.ms = 3000
      interceptor.classes = []
      internal.leave.group.on.close = true
      internal.throw.on.fetch.stable.offset.unsupported = false
      isolation.level = read_uncommitted
      key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
      max.partition.fetch.bytes = 1048576
      max.poll.interval.ms = 300000
      max.poll.records = 500
      metadata.max.age.ms = 180000
      metric.reporters = []
      metrics.num.samples = 2
      metrics.recording.level = INFO
      metrics.sample.window.ms = 30000
      partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
      receive.buffer.bytes = 65536
      reconnect.backoff.max.ms = 1000
      reconnect.backoff.ms = 50
      request.timeout.ms = 60000
      retry.backoff.ms = 100
      sasl.client.callback.handler.class = null
      sasl.jaas.config = [hidden]
      sasl.kerberos.kinit.cmd = /usr/bin/kinit
      sasl.kerberos.min.time.before.relogin = 60000
      sasl.kerberos.service.name = null
      sasl.kerberos.ticket.renew.jitter = 0.05
      sasl.kerberos.ticket.renew.window.factor = 0.8
      sasl.login.callback.handler.class = class com.test.kafka.security.AzureAuthenticateCallbackHandler
      sasl.login.class = null
      sasl.login.refresh.buffer.seconds = 300
      sasl.login.refresh.min.period.seconds = 60
      sasl.login.refresh.window.factor = 0.8
      sasl.login.refresh.window.jitter = 0.05
      sasl.mechanism = OAUTHBEARER
      security.protocol = SASL_SSL
      security.providers = null
      send.buffer.bytes = 131072
      session.timeout.ms = 30000
      socket.connection.setup.timeout.max.ms = 30000
      socket.connection.setup.timeout.ms = 10000
      ssl.cipher.suites = null
      ssl.enabled.protocols = [TLSv1.2]
      ssl.endpoint.identification.algorithm = https
      ssl.engine.factory.class = null
      ssl.key.password = null
      ssl.keymanager.algorithm = SunX509
      ssl.keystore.certificate.chain = null
      ssl.keystore.key = null
      ssl.keystore.location = null
      ssl.keystore.password = null
      ssl.keystore.type = JKS
      ssl.protocol = TLSv1.2
      ssl.provider = null
      ssl.secure.random.implementation = null
      ssl.trustmanager.algorithm = PKIX
      ssl.truststore.certificates = null
      ssl.truststore.location = null
      ssl.truststore.password = null
      ssl.truststore.type = JKS
      value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer 
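
To compare a dump like the one above against another environment, the "key = value" lines can be loaded into a dictionary. This is only a sketch: the function name is hypothetical, and the choice of which settings to print (auto commit, group ID, and the request and retry timings) is my own guess at what is relevant to the commit path. Note that with enable.auto.commit = false the Kafka client does not commit on its own, which is consistent with the commits in the log coming from the reader on checkpoint.

```python
def parse_consumer_config(dump):
    """Parse 'key = value' lines from a Kafka consumer config dump into a dict."""
    config = {}
    for line in dump.splitlines():
        # Split on the first '=' only; empty values such as 'client.rack =' are kept.
        key, separator, value = line.partition("=")
        if separator and key.strip():
            config[key.strip()] = value.strip()
    return config


# A few lines copied from the consumer config in this issue.
DUMP = """
      client.rack =
      enable.auto.commit = false
      group.id = test-group-id
      request.timeout.ms = 60000
      retry.backoff.ms = 100
"""

config = parse_consumer_config(DUMP)
for name in ("enable.auto.commit", "group.id", "request.timeout.ms", "retry.backoff.ms"):
    print(f"{name} = {config[name]}")
```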


              Assignee: Unassigned
              Reporter: Dmytro (igaevd)

