FLINK-11164

Refreshing expired shard iterator fails if no records were retrieved from shard since ShardConsumer was started in FlinkKinesisConsumer


    Details

      Description

      Originally reported by: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Encountered-the-following-Expired-Iterator-exception-in-getRecords-using-FlinkKinesisConsumer-td25093.html

      In ShardConsumer.getRecords(...), we catch any expired iterator exception so that, when no data has been written to the Kinesis shard, the last iterator (which eventually expires) doesn't just fail the job. We refresh the iterator by doing:

      shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
      

      The problem is that if no records at all have been retrieved from the shard since the ShardConsumer was started, lastSequenceNum is still a sentinel value (e.g. EARLIEST_SEQUENCE_NUMBER, LATEST_SEQUENCE_NUMBER, etc.) rather than an actual sequence number. Kinesis does not recognize a sentinel as a valid sequence number, so the refresh request is rejected and the job fails.
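
One possible direction, sketched here as an illustration and not as the actual patch: check for a sentinel before refreshing, and map it to an iterator type that needs no sequence number. Everything in this sketch is a hypothetical stand-in (the class, the ShardIteratorSource interface, the string-valued sentinels named after the examples above); only the iterator type names TRIM_HORIZON, LATEST and AFTER_SEQUENCE_NUMBER come from the Kinesis API.

```java
public class IteratorRefreshSketch {

    // Hypothetical sentinel markers, named after the examples in the description.
    static final String EARLIEST_SEQUENCE_NUMBER = "EARLIEST_SEQUENCE_NUMBER";
    static final String LATEST_SEQUENCE_NUMBER = "LATEST_SEQUENCE_NUMBER";

    /** Hypothetical stand-in for the Kinesis client used by the consumer. */
    interface ShardIteratorSource {
        String getShardIterator(String shardId, String iteratorType, String startingSeqNum);
    }

    /** Returns true if the value is a placeholder and not a real sequence number. */
    static boolean isSentinel(String sequenceNumber) {
        return EARLIEST_SEQUENCE_NUMBER.equals(sequenceNumber)
                || LATEST_SEQUENCE_NUMBER.equals(sequenceNumber);
    }

    /**
     * Requests a fresh iterator after the previous one expired.
     * A sentinel is translated into an iterator type that takes no sequence
     * number; a real sequence number is passed through unchanged.
     */
    static String refreshIterator(ShardIteratorSource kinesis, String shardId, String lastSequenceNum) {
        if (EARLIEST_SEQUENCE_NUMBER.equals(lastSequenceNum)) {
            return kinesis.getShardIterator(shardId, "TRIM_HORIZON", null);
        }
        if (LATEST_SEQUENCE_NUMBER.equals(lastSequenceNum)) {
            return kinesis.getShardIterator(shardId, "LATEST", null);
        }
        // A real sequence number: resume right after the last record read.
        return kinesis.getShardIterator(shardId, "AFTER_SEQUENCE_NUMBER", lastSequenceNum);
    }

    public static void main(String[] args) {
        // Fake source that just echoes the request, for demonstration only.
        ShardIteratorSource fake = (shard, type, seq) -> shard + "/" + type + "/" + seq;
        System.out.println(refreshIterator(fake, "shard-0", EARLIEST_SEQUENCE_NUMBER));
        System.out.println(refreshIterator(fake, "shard-0", "4957"));
    }
}
```

The point of the sketch is only that the sentinel never reaches the AFTER_SEQUENCE_NUMBER request; whether re-requesting LATEST after an expiry is acceptable for a given job is a separate decision.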

              People

              • Assignee:
                Aljoscha Krettek (aljoscha)
                Reporter:
                Tzu-Li (Gordon) Tai (tzulitai)
              • Votes:
                0
                Watchers:
                2

                  Time Tracking

                  Estimated:
                  Not Specified
                  Remaining:
                  0h
                  Logged:
                  20m