Flink / FLINK-4514

ExpiredIteratorException in Kinesis Consumer on long catch-ups to head of stream

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Resolved
    • Affects Version/s: 1.1.0, 1.1.1
    • Fix Version/s: 1.2.0, 1.1.3
    • Component/s: Kinesis Connector
    • Labels:
      None

      Description

      Original mailing thread for the reported issue:
      http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-connector-Iterator-expired-exception-td8711.html

      Normally, the exception is thrown when the consumer uses the same shard iterator more than 5 minutes after it was retrieved. I have yet to clarify and reproduce the root cause of the ExpiredIteratorException, because from the code this seems to be impossible. I'm leaning towards suspecting this is a Kinesis-side issue (from the description in the ML thread, the behaviour also seems indeterminate).

      Either way, the exception can be fairly easily handled so that the consumer doesn't just fail: when it is caught, we request a new shard iterator from Kinesis using the last processed sequence number.
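
      In sketch form, the handling is the following retry loop (condensed from the PR diff quoted in the activity below; see there for the full method):

          GetRecordsResult result = null;
          while (result == null) {
              try {
                  result = kinesis.getRecords(shardItr, maxNumberOfRecords);
              } catch (ExpiredIteratorException e) {
                  // the iterator expired; request a fresh one positioned right after
                  // the last sequence number that was fully emitted, then retry
                  shardItr = kinesis.getShardIterator(
                      subscribedShard,
                      ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(),
                      lastSequenceNum.getSequenceNumber());
              }
          }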


          Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Resolved for master in http://git-wip-us.apache.org/repos/asf/flink/commit/b7d83899
          Resolved for 1.1.2 in http://git-wip-us.apache.org/repos/asf/flink/commit/df72667b
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2432

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2432

          Merging ...

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2432

          Thanks for the confirmation.
          The build will probably fail again due to an unused import: https://travis-ci.org/tzulitai/flink/jobs/156191304, which was just hotfixed, so we need to rebase again. But I think it's ok to merge this now, because the tests for all the connectors had passed the last run before it was rebased on the bucketed rolling sink.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2432

          +1

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2432

          Max seems to have just hotfixed the failing flink-mesos tests.
          Rebasing this PR on latest master. Merging this once Travis turns green.
          I'll open a separate JIRA to improve the fetch interval implementation.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2432#discussion_r76620194

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java —
          @@ -219,19 +228,52 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
                   subscribedShard.getStreamName(),
                   subscribedShard.getShard().getShardId());

          -        if (record.isAggregated()) {
          -            fetcherRef.emitRecordAndUpdateState(
          -                value,
          -                approxArrivalTimestamp,
          -                subscribedShardStateIndex,
          -                new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()));
          -        } else {
          -            fetcherRef.emitRecordAndUpdateState(
          -                value,
          -                approxArrivalTimestamp,
          -                subscribedShardStateIndex,
          -                new SequenceNumber(record.getSequenceNumber()));
          +        SequenceNumber collectedSequenceNumber = (record.isAggregated())
          +            ? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
          +            : new SequenceNumber(record.getSequenceNumber());
          +
          +        fetcherRef.emitRecordAndUpdateState(
          +            value,
          +            approxArrivalTimestamp,
          +            subscribedShardStateIndex,
          +            collectedSequenceNumber);
          +
          +        lastSequenceNum = collectedSequenceNumber;
                   }

          +
          +    /**
          +     * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
          +     * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
          +     * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
          +     * be used for the next call to this method.
          +     *
          +     * Note: it is important that this method is not called again before all the records from the last result have been
          +     * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
          +     * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
          +     * incorrect shard iteration if the iterator had to be refreshed.
          +     *
          +     * @param shardItr shard iterator to use
          +     * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
          +     * @return get records result
          +     * @throws InterruptedException
          +     */
          +    private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
          +        GetRecordsResult getRecordsResult = null;
          +        while (getRecordsResult == null) {
          +            try {
          +                getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
          +            } catch (ExpiredIteratorException eiEx) {
          +                LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
          +                    " refreshing the iterator ...", shardItr, subscribedShard);
          +                shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
          +
          +                // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
          +                if (fetchIntervalMillis != 0) {
          +                    Thread.sleep(fetchIntervalMillis);
          +                }
          End diff –

          Sorry for the race commit, didn't realize you were still reviewing.

          I agree. So, if we limit the fetch interval configuration to 5 minutes, we'll likely get stuck in this loop indefinitely, right? That was what I had in mind with the stricter 4.5 min limit, to assure this doesn't happen. But still, logically, we never know what `n` will be.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2432#discussion_r76617364

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java —
          @@ -219,19 +228,52 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
                   subscribedShard.getStreamName(),
                   subscribedShard.getShard().getShardId());

          -        if (record.isAggregated()) {
          -            fetcherRef.emitRecordAndUpdateState(
          -                value,
          -                approxArrivalTimestamp,
          -                subscribedShardStateIndex,
          -                new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()));
          -        } else {
          -            fetcherRef.emitRecordAndUpdateState(
          -                value,
          -                approxArrivalTimestamp,
          -                subscribedShardStateIndex,
          -                new SequenceNumber(record.getSequenceNumber()));
          +        SequenceNumber collectedSequenceNumber = (record.isAggregated())
          +            ? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
          +            : new SequenceNumber(record.getSequenceNumber());
          +
          +        fetcherRef.emitRecordAndUpdateState(
          +            value,
          +            approxArrivalTimestamp,
          +            subscribedShardStateIndex,
          +            collectedSequenceNumber);
          +
          +        lastSequenceNum = collectedSequenceNumber;
                   }

          +
          +    /**
          +     * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
          +     * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
          +     * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
          +     * be used for the next call to this method.
          +     *
          +     * Note: it is important that this method is not called again before all the records from the last result have been
          +     * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
          +     * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
          +     * incorrect shard iteration if the iterator had to be refreshed.
          +     *
          +     * @param shardItr shard iterator to use
          +     * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
          +     * @return get records result
          +     * @throws InterruptedException
          +     */
          +    private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
          +        GetRecordsResult getRecordsResult = null;
          +        while (getRecordsResult == null) {
          +            try {
          +                getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
          +            } catch (ExpiredIteratorException eiEx) {
          +                LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
          +                    " refreshing the iterator ...", shardItr, subscribedShard);
          +                shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
          +
          +                // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
          +                if (fetchIntervalMillis != 0) {
          +                    Thread.sleep(fetchIntervalMillis);
          +                }
          End diff –

          This fetchInterval implementation can lead to much larger fetch intervals.
          If the getRecords call needs `n` milliseconds, the waiting time between each `getRecords` call is `n + fetchInterval`.
          We don't need to fix this in this PR, but I think in general we should fix it (if you agree). Also, we need to see how to make this efficient (System.currentTimeMillis() is a somewhat expensive call).
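
          For illustration, a deadline-based variant could subtract the time spent in the call from the interval (a sketch only, with hypothetical variable names; not the code merged for this issue):

              // measure the getRecords call and sleep only for the remainder of the
              // interval, so the effective period stays close to fetchIntervalMillis
              // instead of growing to n + fetchIntervalMillis
              long callStart = System.currentTimeMillis();
              GetRecordsResult result = getRecords(nextShardItr, maxNumberOfRecords);
              long remainingSleep = fetchIntervalMillis - (System.currentTimeMillis() - callStart);
              if (remainingSleep > 0) {
                  Thread.sleep(remainingSleep);
              }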

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2432

          The latest commit sets the check to be less than 5 minutes.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2432

          I think a check for the interval to be lower than 5 minutes is sufficient. Setting the limit to 4.5 min seems to be a bit too strict. You never know if some advanced users want to cover a very specific use case.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2432

          I think it'll also make sense to limit the config setting `ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` to be lower than the shard iterator expiry time, otherwise the shard iterator will definitely time out on the next `getRecords()`.

          AWS documentation says the expiry is 5 minutes (http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html). I propose to set the limit to 4.5 min, although I don't expect any user would actually set such a high value.

          Adding this now...
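
          For illustration, such a validation could look like the following sketch (the property-reading boilerplate here is an assumption, not the merged implementation):

              // reject fetch intervals at or above the 5 minute (300,000 ms)
              // shard iterator expiry documented by AWS
              long fetchIntervalMillis = Long.parseLong(configProps.getProperty(
                  ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS));
              if (fetchIntervalMillis >= 300000L) {
                  throw new IllegalArgumentException(
                      "SHARD_GETRECORDS_INTERVAL_MILLIS must be lower than the 5 minute shard iterator expiry.");
              }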

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2432

          @rmetzger I know it might be a bit of a rush, but could you have a quick look at this too?
          It's not a critical blocker, but it would be good to make it into the 1.1.2 patch freeze.

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/2432

          FLINK-4514 [kinesis-connector] Handle unexpected ExpiredIteratorExceptions

          Handle any unexpected `ExpiredIteratorException`s on `getRecords()` calls by refreshing the failing shard iterator with a new one.

          A user reported this issue when replaying Kinesis data over a wide time span; the consumer returned to normal once it caught up with the latest data. I tried to reproduce the exception but have come up short. The behaviour seems to be inconsistent.

          Therefore, this change treats the exception as "unexpected" by simply catching it and refreshing the iterator. There's actually no guarantee of how much time passes between getRecords() requests anyway, so this is a simple way to handle it.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-4514

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2432.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2432


          commit df833ddbca9971b5f03417efb65527408a8ad9c4
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2016-08-29T09:30:39Z

          FLINK-4514 [kinesis-connector] Handle unexpected ExpiredIteratorExceptions



            People

            • Assignee: tzulitai Tzu-Li (Gordon) Tai
            • Reporter: tzulitai Tzu-Li (Gordon) Tai
            • Votes: 0
            • Watchers: 2
