Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-5701

FlinkKafkaProducer should check asyncException on checkpoints

    Details

      Description

      Reported in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html

      The problem:

      The producer holds a pendingRecords value that is incremented on each invoke() and decremented on each callback, used to check if the producer needs to sync on pending callbacks on checkpoints.
      On each checkpoint, we should only consider the checkpoint succeeded iff after flushing the pendingRecords == 0 and asyncException == null (currently, we’re only checking pendingRecords).

      A quick fix for this is to check and rethrow async exceptions in the snapshotState method both before and after flushing and pendingRecords becomes 0.

        Attachments

          Activity

            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: