Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-29827

[Connector][AsyncSinkWriter] Checkpointed states block writer from sending records




      Hi every one,

      Recently we discovered an issue which blocks Sink operators from sending records to client's endpoint.

      To reproduce the issue, we started our Flink app from an existing savepoint, in which some Sink operators hold some buffered records. For instance, app employs KinesisStreamSink with a parallelism of 4. 2 of them has no buffered records, the other 2 start with existing states of some records, which are leftover from the previous run.

      Behavior: during runtime, we sent records (let's say 200) to this sink in rebalance mode. But only 100 of them (50%) were dispatched from the sink operators.

      After investigation, we found that the implementation AsyncSinkWriter invokes submitRequestEntries() to send the records to their destination. This invocation is performed when a callback is performed, a flush(true) or forced-flush is called, or when the buffered is full (either in size or in quantity).

      The case falls in the first scenario: the callback is not registered when the writer starts with some existing buffered records, initialized from savepoint. Hence in our case, those operators were holding records till their buffers become full, while other operators still perform the usual sending.

      Impacted scope: flink-1.15.2 or later version, for any Sink that implements AsyncSinkWriter.

      We currently treat this as an abnormal behavior of Flink, but please let me know if this behavior is intended by design.

      Thanks in advance.




            chalixar Ahmed Hamdy
            mc8max Hoang Tri Tam
            0 Vote for this issue
            2 Start watching this issue