Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.18.0
-
None
Description
In the current implementation of KafkaSource and KafkaSink there are some assumption that were made:
- KafkaSource completely relies on Checkpoint to manage and track its offset in KafkaSourceReader<T> class
- KafkaSink in KafkaWriter<IN> class only performs catch-flush when DeliveryGuarantee.EXACTLY_ONCE is specified.
KafkaSource is assuming that checkpoint should be properly fenced and everything it had read up-til checkpoint being initiated will be processed or recorded by operators downstream, including the TwoPhaseCommiter such as KafkaSink
KafkaSink goes by the model of:
flush -> prepareCommit -> commit
In a scenario that:
- KafkaSource ingested records #1 to #100
- KafkaSink only had chance to send records #1 to #96
- with a batching interval of 5ms
when checkpoint has been initiated, flush will only confirm the sending of record #1 to #96.
This allows checkpoint to proceed as there's no error, and record #97 to 100 will be batched after first flush.
Now, if broker goes down / has issue that caused the internal KafkaProducer to not be able to send out the record after a batch, and is on a constant retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), WriterCallback error handling will never be triggered until the next checkpoint flush.
This can be tested by creating a faulty Kafka cluster and run the following code:
Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER); props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); props.put(ProducerConfig.ACKS_CONFIG, "all"); final KafkaProducer<String, String> producer = new KafkaProducer<>(props); try { for (int i = 0; i < 10; i++) { System.out.printf("sending record #%d\n", i); String data = UUID.randomUUID().toString(); final ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), data); producer.send(record, new CB(Integer.toString(i), data)); Thread.sleep(10000); //sleep for 10 seconds } } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("flushing"); producer.flush(); System.out.println("closing"); producer.close(); }
Once callback returns due to network timeout, it will cause Flink to restart from previously saved checkpoint (which recorded reading up to record #100), but KafkaWriter never sent record #97 to #100.
This will result in dataloss of record #97 to #100
Because KafkaWriter only catches error after callback, if callback is never invoked (due to broker issue) right after the first flush has taken place, those records are effectively gone unless someone decided to go back and look for it.
This behavior should be ok if user has set DeliveryGuarantee.NONE, but is not expected for DeliveryGuarantee.AT_LEAST_ONCE.
There is a divergence of the process in the event of EXACTLY_ONCE.
prepareCommit will produce a list of KafkaCommittable that corresponds to Transactional KafkaProducer to be committed. And a catch up flush will take place during commit step. Whether this was intentional or not, due to the fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the end of EXACTLY_ONCE actually ensured everything fenced in the current checkpoint will be sent to Kafka, or fail the checkpoint if not successful.
Due the above finding, I'm recommending one of the following fixes:
- need to perform second flush for AT_LEAST_ONCE
- or move flush to the end of the KafkaSink process.
I'm leaning towards 2nd option as it does not make sense to flush then do checkpoint, it should be right before checkpoint completes then we flush, given that's what commit is meant to do.
Attachments
Issue Links
- is related to
-
FLINK-35749 Kafka sink component will lose data when kafka cluster is unavailable for a while
- Resolved
- links to