Details
-
Bug
-
Status: Resolved
-
Critical
-
Resolution: Fixed
-
None
-
None
Description
Reported in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
The problem:
The producer holds a pendingRecords value that is incremented on each invoke() and decremented on each callback, used to check if the producer needs to sync on pending callbacks on checkpoints.
On each checkpoint, we should only consider the checkpoint succeeded iff after flushing the pendingRecords == 0 and asyncException == null (currently, we’re only checking pendingRecords).
A quick fix for this is to check and rethrow async exceptions in the snapshotState method both before and after flushing and pendingRecords becomes 0.