Flink / FLINK-3211 Add AWS Kinesis streaming connector / FLINK-4080

Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records


Details

    Description

      ManualExactlyOnceTest occasionally fails for me when I run it several times in a row.

      Kinesis records that belong to the same aggregated batch share one sequence number and differ only in their sub-sequence numbers (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html). The consumer currently commits state every time it finishes processing a record, including de-aggregated ones. This is a bug: if the consumer is stopped partway through a de-aggregated batch, the committed sequence number incorrectly marks all remaining records of that batch as processed, so they are skipped on restore.
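The failure mode can be reproduced with a small stand-alone model. This is only a sketch, not the connector's code: `AggregatedCommitBug`, `SubRecord`, and `replayedAfterRestore` are hypothetical names, and sequence numbers are simplified to `long` values (real Kinesis sequence numbers are large numeric strings). It assumes the checkpointed state keeps only the parent sequence number.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the bug; these names are not taken from the Flink connector. */
final class AggregatedCommitBug {

    /** One consumed record; sub-records of an aggregated batch share the same sequenceNumber. */
    static final class SubRecord {
        final long sequenceNumber;
        final long subSequenceNumber;
        final String payload;

        SubRecord(long sequenceNumber, long subSequenceNumber, String payload) {
            this.sequenceNumber = sequenceNumber;
            this.subSequenceNumber = subSequenceNumber;
            this.payload = payload;
        }
    }

    /**
     * Returns the payloads that are read again after a restore, assuming the job
     * stopped after processing the first processedCount records of the shard.
     */
    static List<String> replayedAfterRestore(List<SubRecord> shard, int processedCount) {
        // State is committed after every record, but only the parent sequence number is kept.
        long lastCommitted = -1L;
        for (int i = 0; i < processedCount; i++) {
            lastCommitted = shard.get(i).sequenceNumber;
        }
        // On restore, everything up to and including lastCommitted is treated as done.
        List<String> replayed = new ArrayList<>();
        for (SubRecord r : shard) {
            if (r.sequenceNumber > lastCommitted) {
                replayed.add(r.payload);
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        List<SubRecord> shard = Arrays.asList(
                new SubRecord(5, 0, "a"),
                new SubRecord(5, 1, "b"),
                new SubRecord(5, 2, "c"),
                new SubRecord(6, 0, "d"));
        // Stop after the first sub-record of batch 5, then restore.
        System.out.println(replayedAfterRestore(shard, 1));
    }
}
```

In this model only "d" is read again after the restore, so "b" and "c" are never processed even though the job stopped before reaching them.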

      Proposed fix:
      1. Use the extended `UserRecord` class in KCL to represent all records (both non-aggregated and de-aggregated) instead of the basic `Record` class. `UserRecord` exposes whether the record was originally aggregated.
      2. The sequence number state we checkpoint needs to be able to indicate that the last seen sequence number of a shard may belong to a de-aggregated record, i.e.,

      {"shard0" -> "5:8", "shard1" -> "2"}

      meaning the 8th sub-record of the 5th record was last seen for shard 0. On restore, we start again from record 5 for shard 0 and skip its sub-records up to and including the 8th, since those were already processed; for shard 1 we start from record 3, since record 2 is non-aggregated and already fully processed.
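The proposed state format could be modelled roughly as follows. This is a minimal sketch under stated assumptions rather than the connector's implementation: `SubSequenceCheckpoint`, `Position`, `parse`, and `remaining` are hypothetical names, sequence numbers are simplified to `long`, a sub-sequence number of -1 is an illustrative marker for "non-aggregated", and the checkpointed sub-record is treated as already processed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the "sequence:subSequence" state; not the connector's API. */
final class SubSequenceCheckpoint {

    /** Position in a shard: parent sequence number plus an optional sub-sequence number. */
    static final class Position {
        final long sequenceNumber;
        final long subSequenceNumber; // -1 marks a non-aggregated record (assumption)

        Position(long sequenceNumber, long subSequenceNumber) {
            this.sequenceNumber = sequenceNumber;
            this.subSequenceNumber = subSequenceNumber;
        }

        /** Parses "5:8" (aggregated) or "2" (non-aggregated). */
        static Position parse(String s) {
            int idx = s.indexOf(':');
            if (idx < 0) {
                return new Position(Long.parseLong(s), -1L);
            }
            return new Position(
                    Long.parseLong(s.substring(0, idx)),
                    Long.parseLong(s.substring(idx + 1)));
        }

        boolean isAggregated() {
            return subSequenceNumber >= 0;
        }

        /** True if other lies strictly after this position in the shard. */
        boolean isBefore(Position other) {
            if (other.sequenceNumber != sequenceNumber) {
                return other.sequenceNumber > sequenceNumber;
            }
            return other.subSequenceNumber > subSequenceNumber;
        }

        @Override
        public String toString() {
            return isAggregated() ? sequenceNumber + ":" + subSequenceNumber
                                  : Long.toString(sequenceNumber);
        }
    }

    /** Records of the shard that still have to be processed after restoring lastSeen. */
    static List<Position> remaining(List<Position> shard, Position lastSeen) {
        List<Position> todo = new ArrayList<>();
        for (Position p : shard) {
            if (lastSeen.isBefore(p)) {
                todo.add(p);
            }
        }
        return todo;
    }

    public static void main(String[] args) {
        List<Position> shard0 = Arrays.asList(
                parse("5:7"), parse("5:8"), parse("5:9"), parse("6"));
        // Restoring "5:8" re-reads record 5 but skips sub-records up to and including 8.
        System.out.println(remaining(shard0, parse("5:8")));
    }
}
```

Comparing on the pair (sequence number, sub-sequence number) lets one ordering cover both cases: a checkpoint of "2" skips record 2 entirely, while a checkpoint of "5:8" resumes inside record 5.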

            People

              tzulitai Tzu-Li (Gordon) Tai