Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-9374

Flink Kinesis Producer does not backpressure




      The FlinkKinesisProducer just accepts records and forwards it to a KinesisProducer from the Amazon Kinesis Producer Library (KPL). The KPL internally holds an unbounded queue of records that have not yet been sent.

      Since Kinesis is rate-limited to 1MB per second per shard, this queue may grow indefinitely if Flink sends records faster than the KPL can forward them to Kinesis.

      One way to circumvent this problem is to set a record TTL, so that queued records are dropped after a certain amount of time, but this will lead to data loss under high loads.

      Currently the only time the queue is flushed is during checkpointing: FlinkKinesisProducer consumes records at arbitrary rate, either until a checkpoint is reached (and will wait until the queue is flushed), or until out-of-memory, whichever is reached first. (This gets worse due to the fact that the Java KPL is only a thin wrapper around a C++ process, so it is not even the Java process that runs out of memory, but the C++ process.) The implicit rate-limit due to checkpointing leads to a ragged throughput graph like this (the periods with zero throughput are the wait times before a checkpoint):

      Throughput limited by checkpointing only

      My proposed solution is to add a config option queueLimit to set a maximum number of records that may be waiting in the KPL queue. If this limit is reached, the FlinkKinesisProducer should trigger a flush() and wait (blocking) until the queue length is below the limit again. This automatically leads to backpressuring, since the FlinkKinesisProducer cannot accept records while waiting. For compatibility, queueLimit is set to Integer.MAX_VALUE by default, so the behavior is unchanged unless a client explicitly sets the value. Setting a »sane« default value is not possible unfortunately, since sensible values for the limit depend on the record size (the limit should be chosen so that about 10–100MB of records per shard are accumulated before flushing, otherwise the maximum Kinesis throughput may not be reached).

      Throughput with a queue limit of 100000 records (the spikes are checkpoints, where the queue is still flushed completely)


        1. after.png
          22 kB
          Franz Thoma
        2. before.png
          27 kB
          Franz Thoma

          Issue Links



              • Assignee:
                fmthoma Franz Thoma
                fmthoma Franz Thoma
              • Votes:
                0 Vote for this issue
                5 Start watching this issue


                • Created: