Flink / FLINK-4523

Allow Kinesis Consumer to start from specific timestamp / Date

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Kinesis Connector
    • Labels: None

      Description

      We had a Kinesis user requesting this feature in an offline chat.

      To be specific, all initial Kinesis shards would be iterated starting from the records at the given timestamp.
      The AWS Java SDK we're using already provides an API for this, so we can add this functionality with fairly low overhead: http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-
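      As an illustration only, here is a minimal usage sketch of what this could look like on the consumer side (the `STREAM_INITIAL_TIMESTAMP` constant and the accepted timestamp format are taken from the review discussion below; a `StreamExecutionEnvironment env` and the usual flink-connector-kinesis imports are assumed):

      Properties consumerConfig = new Properties();
      consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
      consumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
      consumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

      // Iterate every initial shard starting from records at the given timestamp.
      consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
      consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-04-04T19:58:46.480-00:00");

      DataStream<String> kinesis = env.addSource(
          new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), consumerConfig));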

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tony810430 opened a pull request:

          https://github.com/apache/flink/pull/2916

          FLINK-4523 [kinesis] Allow Kinesis Consumer to start from specific timestamp / Date

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tony810430/flink FLINK-4523

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2916.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2916


          commit e2d2cf2e604e329523c53ebfb1b4c4597687211b
          Author: 魏偉哲 <tonywei@tonyweis-macbook-pro.local>
          Date: 2016-12-01T03:40:46Z

          FLINK-4523 Allow Kinesis Consumer to start from specific timestamp / Date


          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90410957

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java —
          @@ -57,6 +60,17 @@ public static void validateConsumerConfiguration(Properties config) {
          }
          throw new IllegalArgumentException("Invalid initial position in stream set in config. Valid values are: " + sb.toString());
          }
          +
          +		// specified initial timestamp in stream when using AT_TIMESTAMP
          +		if (InitialPosition.valueOf(initPosType) == InitialPosition.AT_TIMESTAMP) {
          +			if (!config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP)) {
          +				throw new IllegalArgumentException("Please set value for initial timestamp ('"
          +					+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
          +			}
          +			validateOptionalDateProperty(config, ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
          +				"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
          +					+ "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
          — End diff –

          That's really good error messaging for the user. Like I mentioned above, we need to document this as well.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90410520

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java —
          @@ -230,13 +233,34 @@ public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSe
          */
          @Override
          public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
          +		GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
          +			.withStreamName(shard.getStreamName())
          +			.withShardId(shard.getShard().getShardId())
          +			.withShardIteratorType(shardIteratorType)
          +			.withStartingSequenceNumber(startingSeqNum);
          +		return getShardIterator(getShardIteratorRequest);
          +	}
          +
          +	/**
          +	 * {@inheritDoc}
          +	 */
          +	@Override
          +	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nonnull final Date startingTimestamp) throws InterruptedException {
          +		GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
          +			.withStreamName(shard.getStreamName())
          +			.withShardId(shard.getShard().getShardId())
          +			.withShardIteratorType(shardIteratorType)
          +			.withTimestamp(startingTimestamp);
          — End diff –

          How does the Kinesis API behave when you set both a non-matching shard iterator type and a timestamp?
          In other words, what happens when the shard iterator type is, say, `TRIM_HORIZON`, but a timestamp is also provided?

          If the API call fails because Kinesis doesn't allow such combinations, we should probably make this method implementation robust against such situations.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90410148

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java —
          @@ -122,12 +138,14 @@ public void run() {
          if (subscribedShard.isClosed()) {
          	nextShardItr = null;
          } else {
          -	nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
          +	nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), (String) null);
          — End diff –

          Is the String type casting necessary?
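          (Editorial note: the cast appears to be needed because, in this revision of the PR, the proxy exposes two overloads — one taking a `String` sequence number and one taking a `Date` timestamp — so a bare `null` argument would be ambiguous at the call site:)

          // Two overloads exist in this revision of the proxy:
          //   getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum)
          //   getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nonnull Date startingTimestamp)
          // A bare null matches both, so the caller has to pick one explicitly:
          nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), (String) null);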

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90410691

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java —
          @@ -35,6 +37,7 @@

           * Utilities for Flink Kinesis connector configuration.
           */
          public class KinesisConfigUtil {
          +	public static SimpleDateFormat initTimestampDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
          — End diff –

          We probably need to update the Kinesis documentation as well, to inform users of this format.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90410106

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java —
          @@ -107,6 +112,17 @@ protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
          this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
          ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
          Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
          +
          + if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
          — End diff –

          I think it might be cleaner if we extract / parse the `initTimestamp` in the sentinel sequence number case determination at the beginning of run().

          As for checking that the timestamp is parseable, we should do that when validating the user-provided property configs locally at the job client. You can take a look at the constructor code of `FlinkKinesisConsumer` - that's where we validate the properties. This validation is done at the job client, so that if the properties are erroneous / unparseable, we can handle that before a Flink job is actually launched.
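          (Editorial note: a rough sketch of the flow described above — the consumer constructor runs on the job client, so validating there fails fast before the job is submitted; the constructor body shown here is abbreviated:)

          public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps) {
          	// ... other constructor setup omitted ...
          	// Runs at the job client: an unparseable AT_TIMESTAMP value is rejected
          	// before the Flink job is actually launched.
          	KinesisConfigUtil.validateConsumerConfiguration(configProps);
          }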

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90411159

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java —
          @@ -215,4 +229,18 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
          }
          }
          }
          +
          + private static void validateOptionalDateProperty(Properties config, String key, String message) {
          — End diff –

          Ah, you've already checked the parsing of Date format here.
          I'd say we don't really need to do the `ParseException` handling in `ShardConsumer` then, because that should never fail.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90411420

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java —
          @@ -145,6 +145,39 @@ public void testUnrecognizableStreamInitPositionTypeInConfig() {
          }

          @Test
          +	public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
          +		exception.expect(IllegalArgumentException.class);
          +		exception.expectMessage("Please set value for initial timestamp ('"
          +			+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
          +
          +		Properties testConfig = new Properties();
          +		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
          +		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
          +		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
          +		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
          +		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
          +
          +		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
          +	}
          +
          +	@Test
          +	public void testUnparsableDateForInitialTimestampInConfig() {
          +		exception.expect(IllegalArgumentException.class);
          +		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
          — End diff –

          I would suggest not making the expected message this verbose in tests.
          It increases the likelihood that the tests will need to be altered whenever we want to tweak the messages a bit.
          I think `Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream` is enough.
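          A sketch of the trimmed expectation (JUnit's `ExpectedException.expectMessage` matches a substring, so the shorter message is sufficient; the `testConfig` setup is the same as in the diff above):

          @Test
          public void testUnparsableDateForInitialTimestampInConfig() {
          	exception.expect(IllegalArgumentException.class);
          	exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream");

          	// ... build testConfig exactly as in the diff above ...
          	KinesisConfigUtil.validateConsumerConfiguration(testConfig);
          }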

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90408543

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java —
          @@ -53,6 +56,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
          /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
          public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

          + /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */
          + public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.init.timestamp";
          — End diff –

          What do you think about renaming this to `flink.stream.initpos.timestamp`, instead of `flink.stream.init.timestamp`?
          I personally think it's a good idea to do so, because `STREAM_INITIAL_TIMESTAMP` is essentially a sub-setting of `STREAM_INITIAL_POSITION`. In other words, `STREAM_INITIAL_TIMESTAMP` is meaningless if `STREAM_INITIAL_POSITION = AT_TIMESTAMP` isn't set in the config.

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90420204

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java —
          @@ -145,6 +145,39 @@ public void testUnrecognizableStreamInitPositionTypeInConfig() {
          }

          @Test
          +	public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
          +		exception.expect(IllegalArgumentException.class);
          +		exception.expectMessage("Please set value for initial timestamp ('"
          +			+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
          +
          +		Properties testConfig = new Properties();
          +		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
          +		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
          +		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
          +		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
          +		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
          +
          +		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
          +	}
          +
          +	@Test
          +	public void testUnparsableDateForInitialTimestampInConfig() {
          +		exception.expect(IllegalArgumentException.class);
          +		exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
          — End diff –

          ok

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90420632

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java —
          @@ -230,13 +233,34 @@ public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSe
          */
          @Override
          public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
          +		GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
          +			.withStreamName(shard.getStreamName())
          +			.withShardId(shard.getShard().getShardId())
          +			.withShardIteratorType(shardIteratorType)
          +			.withStartingSequenceNumber(startingSeqNum);
          +		return getShardIterator(getShardIteratorRequest);
          +	}
          +
          +	/**
          +	 * {@inheritDoc}
          +	 */
          +	@Override
          +	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nonnull final Date startingTimestamp) throws InterruptedException {
          +		GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
          +			.withStreamName(shard.getStreamName())
          +			.withShardId(shard.getShard().getShardId())
          +			.withShardIteratorType(shardIteratorType)
          +			.withTimestamp(startingTimestamp);
          — End diff –

          I will check the shard iterator type in the new method by merging these two 'getShardIterator' methods.

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90420737

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java —
          @@ -122,12 +138,14 @@ public void run() {
          if (subscribedShard.isClosed()) {
          	nextShardItr = null;
          } else {
          -	nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
          +	nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), (String) null);
          — End diff –

          I will solve this problem together with the one below.

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90420855

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java —
          @@ -107,6 +112,17 @@ protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
          this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
          ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
          Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
          +
          + if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
          — End diff –

          Because consumerConfig can't be accessed in 'run()'

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90420861

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java —
          @@ -53,6 +56,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
          /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
          public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

          + /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */
          + public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.init.timestamp";
          — End diff –

          ok

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90606695

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java —
          @@ -107,6 +112,17 @@ protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
          this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
          ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
          Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
          +
          + if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
          — End diff –

          Ah I see, sorry for missing that. The fix for this seems good.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90607632

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java —
          @@ -229,14 +232,33 @@ public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSe

           * {@inheritDoc}
           */
           @Override
          -	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
          +	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
          +		GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
          +			.withStreamName(shard.getStreamName())
          +			.withShardId(shard.getShard().getShardId())
          +			.withShardIteratorType(shardIteratorType);
          +
          +		switch (ShardIteratorType.fromValue(shardIteratorType)) {
          +			case TRIM_HORIZON:
          +			case LATEST:
          +				break;
          +			case AT_TIMESTAMP:
          +				getShardIteratorRequest.setTimestamp((Date) startingMarker);
          — End diff –

          For the new implementation of this method, we should probably handle type casting exceptions and wrap them as `IllegalArgumentException`s. Since we're validating the property config locally, this really shouldn't happen, but it would be good to handle type cast errors here to make this class self-contained.
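          One possible shape for that guard (a sketch of the suggestion, based on the merged method shown in the diff above, not the committed code):

          case AT_TIMESTAMP:
          	if (startingMarker instanceof Date) {
          		getShardIteratorRequest.setTimestamp((Date) startingMarker);
          	} else {
          		throw new IllegalArgumentException(
          			"Invalid starting marker for AT_TIMESTAMP: expected a Date, but got "
          				+ (startingMarker == null ? "null" : startingMarker.getClass().getName()));
          	}
          	break;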

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90608161

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java —
          @@ -34,14 +34,16 @@
          *

           * @param shard the shard to get the iterator
           * @param shardIteratorType the iterator type, defining how the shard is to be iterated
          -	 *                          (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
          -	 * @param startingSeqNum sequence number, must be null if shardIteratorType is TRIM_HORIZON or LATEST
          +	 *                          (one of: TRIM_HORIZON, LATEST, AT_TIMESTAMP, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
          +	 * @param startingMarker is null if shardIteratorType is TRIM_HORIZON or LATEST,
          +	 *                       is as a timestamp if shardIteratorType is AT_TIMESTAMP,
          +	 *                       is as a sequence number if shardIteratorType is AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER
          — End diff –

          I would suggest the following changes to the Javadoc here to be more specific:

          is null --> "should be

          {@code null}

          if ..."
          is as a timestamp --> "should be a

          {@code Date}

          value if ..."
          is as a sequence number --> "should be a

          {@code String}

          representing the sequence number if ..."
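
          Applied to the `startingMarker` Javadoc above, the suggestion would read roughly like this:

           * @param startingMarker should be {@code null} if shardIteratorType is TRIM_HORIZON or LATEST,
           *                       should be a {@code Date} value if shardIteratorType is AT_TIMESTAMP,
           *                       should be a {@code String} representing the sequence number if shardIteratorType is
           *                       AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER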

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90607651

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java —
          @@ -229,14 +232,33 @@ public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSe

           * {@inheritDoc}
           */
           @Override
          -	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
          +	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
          +		GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
          +			.withStreamName(shard.getStreamName())
          +			.withShardId(shard.getShard().getShardId())
          +			.withShardIteratorType(shardIteratorType);
          +
          +		switch (ShardIteratorType.fromValue(shardIteratorType)) {
          +			case TRIM_HORIZON:
          +			case LATEST:
          +				break;
          +			case AT_TIMESTAMP:
          +				getShardIteratorRequest.setTimestamp((Date) startingMarker);
          — End diff –

          For the new implementation of this method, we should probably handle type casting exceptions and wrap them as IllegalArgumentExceptions. Since we're validating the property config locally, this really shouldn't happen, but it would be good to handle type cast errors here to make this class self-contained.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r90607682

          — Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java —
          @@ -229,14 +232,33 @@ public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSe

           * {@inheritDoc}
           */
           @Override
          -	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
          +	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
          +		GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
          +			.withStreamName(shard.getStreamName())
          +			.withShardId(shard.getShard().getShardId())
          +			.withShardIteratorType(shardIteratorType);
          +
          +		switch (ShardIteratorType.fromValue(shardIteratorType)) {
          +			case TRIM_HORIZON:
          +			case LATEST:
          +				break;
          +			case AT_TIMESTAMP:
          +				getShardIteratorRequest.setTimestamp((Date) startingMarker);
          +				break;
          +			case AT_SEQUENCE_NUMBER:
          +			case AFTER_SEQUENCE_NUMBER:
          +				getShardIteratorRequest.setStartingSequenceNumber((String) startingMarker);
          — End diff –

          Same here, see above: consider handling type cast exceptions?

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on the issue:

          https://github.com/apache/flink/pull/2916

          I'm not familiar with how to update the documentation. I would be grateful if you could take over this work. Thanks

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2916

          Thanks for addressing the final comments. I'll add the docs and merge this by the end of the day

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2916

          Hi @tony810430, I've finished adding documents for this new feature, and opened a pull request at your local `FLINK-4523` branch.

          Can you take a look (would be great if you can review it too!), and merge the pull request to your `FLINK-4523`, so that it gets included in this PR too? Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2916

          Rebasing and merging this ...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2916

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2916

          Merged to master. Thank you for your contribution @tony810430 !

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Resolved for master via 00d1ad86a021911f25b0a0aa6e095267d51af1f4.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Added FLINK-5625 as a follow up for this issue.

          tsriharsha Sriharsha Tikkireddy added a comment -

          Wei-Che Wei
          Hey, I tried to run with a timestamp as the start position, but the following KinesisConfigUtil function always throws an error: if you pass an epoch time, the date format parsing throws a ParseException, and if you pass a date string, parsing it as a double throws a NumberFormatException:

          private static void validateOptionalDateProperty(Properties config, String key, String message) {
          	if (config.containsKey(key)) {
          		try {
          			initTimestampDateFormat.parse(config.getProperty(key));      // <--
          			double value = Double.parseDouble(config.getProperty(key));  // <--
          			if (value < 0) {
          				throw new NumberFormatException();
          			}
          		} catch (ParseException | NumberFormatException e) {
          			throw new IllegalArgumentException(message);
          		}
          	}
          }
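
          (Editorial note: one possible fix, matching the intent of the error message — accept either the date format or a non-negative double — is to fall back to the double parse only when the date parse fails; the actual fix is tracked in FLINK-6211 further below:)

          private static void validateOptionalDateProperty(Properties config, String key, String message) {
          	if (config.containsKey(key)) {
          		try {
          			// first try the yyyy-MM-dd'T'HH:mm:ss.SSSXXX date format ...
          			initTimestampDateFormat.parse(config.getProperty(key));
          		} catch (ParseException e) {
          			try {
          				// ... and only fall back to the unix-timestamp (double) form if that fails
          				double value = Double.parseDouble(config.getProperty(key));
          				if (value < 0) {
          					throw new NumberFormatException();
          				}
          			} catch (NumberFormatException nfe) {
          				throw new IllegalArgumentException(message);
          			}
          		}
          	}
          }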

          githubbot ASF GitHub Bot added a comment -

          Github user tsriharsha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r108598516

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java —
          @@ -215,4 +229,18 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
          }
          }
          }
          +
          + private static void validateOptionalDateProperty(Properties config, String key, String message) {
          + if (config.containsKey(key)) {
          + try {
          + initTimestampDateFormat.parse(config.getProperty(key));
          — End diff –

          Hey, this throws an error if a double is passed, but the next line throws an exception when a timestamp string is passed. @tzulitai @tony810430

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2916#discussion_r108599820

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java —
          @@ -215,4 +229,18 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
          }
          }
          }
          +
          + private static void validateOptionalDateProperty(Properties config, String key, String message) {
          + if (config.containsKey(key)) {
          + try {
          + initTimestampDateFormat.parse(config.getProperty(key));
          — End diff –

          @tsriharsha Thanks for pointing out this problem. I will fix it soon.

          tonywei Wei-Che Wei added a comment -

          Sriharsha Tikkireddy

          I have opened a task (FLINK-6211) to solve it. Thank you again for finding this problem.


            People

            • Assignee: tonywei Wei-Che Wei
            • Reporter: tzulitai Tzu-Li (Gordon) Tai
            • Votes: 0
            • Watchers: 6

              Dates

              • Created:
                Updated:
                Resolved:

                Development