Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Duplicate
- Affects Version/s: 1.14.4, 1.15.0
- Fix Version/s: None
Description
The KafkaSourceReader works well for many hours, then fails, reconnects successfully, and keeps working for a while. After the first three failures it hangs on "Offset commit failed" and never reconnects. Restarting the Flink job helps, and the job runs until the next round of three failures.
I am aware of the note that the Kafka source does NOT rely on committed offsets for fault tolerance; committing offsets only exposes the progress of the consumer and consumer group for monitoring.
That is fine as long as the failures are only periodic, but I would argue that a permanent failure to commit is unacceptable.
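For reference, a minimal sketch of how a source along these lines is wired up (the topic name is hypothetical; bootstrap servers and group id match the consumer config below): offsets are committed back to Kafka when a checkpoint completes, purely to expose progress, while recovery restores offsets from Flink's own checkpoint state.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceCommitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // offsets are committed back to Kafka when a checkpoint completes

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("test.host.net:9093")           // from the consumer config below
                .setGroupId("test-group-id")                         // from the consumer config below
                .setTopics("input-topic")                            // hypothetical topic name
                .setStartingOffsets(OffsetsInitializer.earliest())   // used only on a fresh start; recovery uses checkpointed state
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("commit.offsets.on.checkpoint", "true") // enabled by default; the commit is for monitoring only
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("kafka-source-offset-commit-sketch");
    }
}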
Failed to commit consumer offsets for checkpoint:
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
2022-06-06 14:19:52,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464521
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
2022-06-06 14:20:02,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464522
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
2022-06-06 14:20:02,297 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464523
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
...
The warnings keep repeating and the commit fails permanently until the job is restarted.
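At the Kafka client level the warning above corresponds to commitAsync() completing with a RetriableCommitFailedException caused by CoordinatorNotAvailableException. A standalone sketch (hypothetical topic name, connection settings taken from this report) of how such a retriable commit failure can be detected and retried outside of Flink:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CommitRetrySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "test.host.net:9093"); // from the report
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-id");               // from the report
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        AtomicBoolean retryNeeded = new AtomicBoolean(false);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic")); // hypothetical topic name
            consumer.poll(Duration.ofSeconds(1));

            // An async commit reports coordinator problems as RetriableCommitFailedException
            // and leaves it to the caller to retry with the latest consumed offsets.
            consumer.commitAsync((offsets, exception) -> {
                if (exception instanceof RetriableCommitFailedException) {
                    retryNeeded.set(true);       // transient, e.g. CoordinatorNotAvailableException
                } else if (exception != null) {
                    exception.printStackTrace(); // non-retriable commit failure
                }
            });

            consumer.poll(Duration.ofSeconds(1)); // drives the async commit and its callback
            if (retryNeeded.get()) {
                consumer.commitSync();            // blocking retry of the latest consumed offsets
            }
        }
    }
}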
Consumer Config:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [test.host.net:9093]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = test-client-id
client.rack =
connections.max.idle.ms = 180000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test-group-id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 180000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 60000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = class com.test.kafka.security.AzureAuthenticateCallbackHandler
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = OAUTHBEARER
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 30000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
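For completeness, a sketch of how consumer properties like the ones above are forwarded to the connector through the builder (property keys are taken from the dump above; the topic name is hypothetical and the selection of keys is only illustrative):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class KafkaSourcePropertiesSketch {
    public static void main(String[] args) {
        // A few of the consumer settings from the dump above, mirrored as explicit overrides.
        Properties props = new Properties();
        props.setProperty("request.timeout.ms", "60000");
        props.setProperty("reconnect.backoff.max.ms", "1000");
        props.setProperty("connections.max.idle.ms", "180000");
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("test.host.net:9093")     // from the dump
                .setGroupId("test-group-id")                   // from the dump
                .setTopics("input-topic")                      // hypothetical topic name
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperties(props)                          // forwarded to the underlying KafkaConsumer
                .build();

        System.out.println("KafkaSource built: " + source);
    }
}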
Attachments
Issue Links
- is duplicated by: FLINK-28060 Kafka Commit on checkpointing fails repeatedly after a broker restart (Closed)
- is related to: FLINK-25293 Option to let fail if KafkaSource keeps failing to commit offset (Closed)
- relates to: FLINK-28060 Kafka Commit on checkpointing fails repeatedly after a broker restart (Closed)