Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Not A Bug
-
1.10.1
-
None
-
None
-
It is a bit unclear to me under what circumstances this can be reproduced. I created a "minimum" non-working example at https://github.com/jcaesar/flink-kafka-ha-failure. Note that this deploys the minimum number of Kafka brokers, but it works just as well with replication factor 3 and 8 brokers, e.g.
I run this with
docker-compose kill; and docker-compose rm -vf; and docker-compose up --abort-on-container-exit --build
The exception should appear on the webui after 5~6 minutes.
To make sure that this isn't dependent on my machine, I've also checked reproducibility on a m5a.2xlarge EC2 instance.You verify that the Kafka cluster is running "normally" e.g. with:
kafkacat -b localhost,localhost:9093 -L
So far, I only know that
- flink.partition-discovery.interval-millis must be set.
- The broker that failed must be part of the bootstrap.servers
- There needs to be a certain amount of topics or producers, but I'm unsure which is crucial
- Changing the values of metadata.request.timeout.ms or flink.partition-discovery.interval-millis does not seem to have any effect.
It is a bit unclear to me under what circumstances this can be reproduced. I created a "minimum" non-working example at https://github.com/jcaesar/flink-kafka-ha-failure . Note that this deploys the minimum number of Kafka brokers, but it works just as well with replication factor 3 and 8 brokers, e.g. I run this with docker-compose kill; and docker-compose rm -vf; and docker-compose up --abort-on-container-exit --build The exception should appear on the webui after 5~6 minutes. To make sure that this isn't dependent on my machine, I've also checked reproducibility on a m5a.2xlarge EC2 instance. You verify that the Kafka cluster is running "normally" e.g. with: kafkacat -b localhost,localhost:9093 -L So far, I only know that flink.partition-discovery.interval-millis must be set. The broker that failed must be part of the bootstrap.servers There needs to be a certain amount of topics or producers, but I'm unsure which is crucial Changing the values of metadata.request.timeout.ms or flink.partition-discovery.interval-millis does not seem to have any effect.
Description
When a Kafka broker fails that is listed among the bootstrap servers and partition discovery is active, the Flink job reading from that Kafka may enter a failing loop.
At first, the job seems to react normally without failure with only a short latency spike when switching Kafka leaders.
Then, it fails with a
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:821) at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136) at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602) at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355) at java.lang.Thread.run(Thread.java:748)
It recovers, but processes fewer than the expected amount of records.
Finally, the job fails with
2020-06-05 13:59:37 org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
and repeats doing so while not processing any records. (The exception comes without any backtrace or otherwise interesting information)
I have also observed this behavior with partition-discovery turned off, but only when the Flink job failed (after a broker failure) and had to run checkpoint recovery for some other reason.
Please see the [Environment] description for information on how to reproduce the issue.