FLINK-5625

Let Date format for timestamp-based start position in Kinesis consumer be configurable.

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Kinesis Connector
    • Labels:
      None

      Description

      Currently, the date format that the Kinesis consumer accepts for timestamp-based start positions is fixed. It would be nice to make this format configurable.
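As a rough illustration of what the requested option could look like from the user's side, the sketch below resolves a start date from plain `java.util.Properties`. The key `flink.stream.initpos.timestamp` appears in the diff under review; the format key string, the default pattern, and the helper `resolveStartDate` are assumptions made for this example and are not confirmed by the issue.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

class TimestampFormatConfigSketch {
    // Key string shown in the reviewed diff.
    static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";
    // ASSUMPTION: key name for the new format option (not stated in this issue).
    static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
    // ASSUMPTION: default pattern used when no format is configured.
    static final String DEFAULT_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    // Hypothetical helper: parse the configured timestamp with the configured
    // (or default) pattern and report failures as IllegalArgumentException.
    static Date resolveStartDate(Properties config) {
        String pattern = config.getProperty(STREAM_TIMESTAMP_DATE_FORMAT, DEFAULT_FORMAT);
        try {
            return new SimpleDateFormat(pattern).parse(config.getProperty(STREAM_INITIAL_TIMESTAMP));
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparsable initial timestamp", e);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(STREAM_INITIAL_TIMESTAMP, "2016-04-04");
        config.setProperty(STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd");
        System.out.println(resolveStartDate(config));
    }
}
```

Falling back to a default pattern keeps existing configurations working when the format property is absent.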

        Issue Links

          Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Resolved for 1.3.0 with http://git-wip-us.apache.org/repos/asf/flink/commit/a119a30.

          Thanks for the contribution Tony!

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3651

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3651

          Merging to `master` ..

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3651

          LGTM, +1! I'll wait for a Travis run and then merge this.

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on the issue:

          https://github.com/apache/flink/pull/3651

          @tzulitai Thanks for the careful review. I have addressed those comments.

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108888462

          --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---
          @@ -216,14 +217,73 @@ public void testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInCo
                testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
                testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);

          -     KinesisConfigUtil.validateConsumerConfiguration(testConfig);
          +     try {
          +         KinesisConfigUtil.validateConsumerConfiguration(testConfig);
          -     try{
                    double value = Double.parseDouble(unixTimestamp);
                    if (value < 0) {
                        throw new NumberFormatException();
                    }
          --- End diff --

          Because I want to force the value to be positive. Is it okay to let it be negative?
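The check being discussed can be sketched in isolation. The helper below is hypothetical (its name and exact behavior are not taken from the pull request): it accepts a configured value only if it parses as a non-negative double, which is the condition the `value < 0` test in the diff enforces.

```java
class UnixTimestampCheckSketch {
    // Hypothetical helper: true only if the string parses as a double that is >= 0.
    // Non-numeric input, null, and NaN are all rejected.
    static boolean isNonNegativeTimestamp(String value) {
        try {
            return Double.parseDouble(value) >= 0;
        } catch (NumberFormatException | NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isNonNegativeTimestamp("1459799926.480"));
        System.out.println(isNonNegativeTimestamp("-1.0"));
        System.out.println(isNonNegativeTimestamp("2016-04-04"));
    }
}
```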

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108884869

          --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
          @@ -59,6 +59,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
                /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */
                public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";

          +     /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */
          --- End diff --

          Missing ")" at the end.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108885717

          --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
          @@ -115,9 +115,15 @@ protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,

                if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
                    String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
          +
                    try {
          -             this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
          -         } catch (ParseException e) {
          +             String format = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
          +                 ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
          +             SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
          +             this.initTimestamp = customDateFormat.parse(timestamp);
          +         } catch (IllegalArgumentException | NullPointerException exception) {
          +             throw new IllegalArgumentException(exception.getCause());
          --- End diff --

          I think we should just wrap the whole exception instance here, not only its cause (`exception.getCause()`).
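To make the difference concrete, here is a minimal sketch that is not taken from the pull request; the class and method names are made up for illustration. It triggers the `IllegalArgumentException` that `SimpleDateFormat` throws for an illegal pattern and wraps it both ways.

```java
import java.text.SimpleDateFormat;

class WrapExceptionSketch {
    // Wraps only the cause of the caught exception, as in the reviewed diff.
    static IllegalArgumentException wrapCauseOnly(String pattern) {
        try {
            new SimpleDateFormat(pattern);
            return null; // pattern was valid, nothing to wrap
        } catch (IllegalArgumentException e) {
            return new IllegalArgumentException(e.getCause());
        }
    }

    // Wraps the caught exception itself, as the reviewer suggests.
    static IllegalArgumentException wrapWhole(String pattern) {
        try {
            new SimpleDateFormat(pattern);
            return null; // pattern was valid, nothing to wrap
        } catch (IllegalArgumentException e) {
            return new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // The exception thrown for an illegal pattern has no cause of its own,
        // so the cause-only wrapper ends up with neither a cause nor a message.
        System.out.println(wrapCauseOnly("InvalidPattern").getCause());
        System.out.println(wrapWhole("InvalidPattern").getCause());
    }
}
```

With the cause-only variant the original error about the illegal pattern character is dropped, which makes a misconfigured format harder to diagnose.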

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108886633

          --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---
          @@ -216,14 +217,73 @@ public void testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInCo
                testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
                testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);

          -     KinesisConfigUtil.validateConsumerConfiguration(testConfig);
          +     try {
          +         KinesisConfigUtil.validateConsumerConfiguration(testConfig);
          -     try{
                    double value = Double.parseDouble(unixTimestamp);
                    if (value < 0) {
                        throw new NumberFormatException();
                    }
          -     } catch (Exception e) {
          +     } catch (Exception e) {
          +         e.printStackTrace();
          +         fail();
          +     }
          + }
          +
          + @Test
          + public void testInvalidPatternForInitialTimestampInConfig() {
          +     exception.expect(IllegalArgumentException.class);
          +     exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
          +
          +     Properties testConfig = new Properties();
          +     testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
          +     testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
          +     testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
          +     testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
          +     testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
          +     testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14");
          +     testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "InvalidPattern");
          +
          +     KinesisConfigUtil.validateConsumerConfiguration(testConfig);
          + }
          +
          + @Test
          + public void testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() {
          +     exception.expect(IllegalArgumentException.class);
          +     exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
          +
          +     Properties testConfig = new Properties();
          +     testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
          +     testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
          +     testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
          +     testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
          +     testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
          +     testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable");
          +     testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd");
          +
          +     KinesisConfigUtil.validateConsumerConfiguration(testConfig);
          + }
          +
          + @Test
          + public void testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() {
          +     String unixTimestamp = "2016-04-04";
          +     String pattern = "yyyy-MM-dd";
          +
          +     Properties testConfig = new Properties();
          +     testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
          +     testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
          +     testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
          +     testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
          +     testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
          +     testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
          +     testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern);
          +
          +     try {
          +         KinesisConfigUtil.validateConsumerConfiguration(testConfig);
          +
          +         SimpleDateFormat customDateFormat = new SimpleDateFormat(pattern);
          +         customDateFormat.parse(unixTimestamp);
          --- End diff --

          Are these 2 lines necessary?:
          ```
          SimpleDateFormat customDateFormat = new SimpleDateFormat(pattern);
          customDateFormat.parse(unixTimestamp);
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108886451

          --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---
          @@ -216,14 +217,73 @@ public void testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInCo
                testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
                testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);

          -     KinesisConfigUtil.validateConsumerConfiguration(testConfig);
          +     try {
          +         KinesisConfigUtil.validateConsumerConfiguration(testConfig);
          -     try{
                    double value = Double.parseDouble(unixTimestamp);
                    if (value < 0) {
                        throw new NumberFormatException();
                    }
          --- End diff --

          Why do we need this:
          ```
          double value = Double.parseDouble(unixTimestamp);
          if (value < 0) {
              throw new NumberFormatException();
          }
          ```
          here?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108885047

          --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
          @@ -59,6 +59,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
                /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */
                public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";

          +     /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */
          --- End diff --

          Ah, the other comments for the keys all miss a ")" at the end as well. Could you fix those too?

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on the issue:

          https://github.com/apache/flink/pull/3651

          @tzulitai I have already updated it. Thanks for the good suggestion.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108865011

          --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
          @@ -224,27 +222,19 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
                    }
                }

          -     private static void validateOptionalDateProperty(Properties config, String timestampKey, String formatKey, String message) {
          +     private static void validateOptionalDateProperty(Properties config, String timestampKey, String format, String message) {
                    if (config.containsKey(timestampKey)) {
          -             if (config.containsKey(formatKey)) {
          -                 try {
          -                     SimpleDateFormat customDateFormat = new SimpleDateFormat(config.getProperty(formatKey));
          -                     customDateFormat.parse(config.getProperty(timestampKey));
          -                 } catch (ParseException | IllegalArgumentException | NullPointerException exception) {
          -                     throw new IllegalArgumentException(message);
          -                 }
          -             } else {
          +             try {
          +                 SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
          +                 customDateFormat.parse(config.getProperty(timestampKey));
          +             } catch (ParseException | IllegalArgumentException | NullPointerException exception) {
          --- End diff --

          Yes, or something similar. Generally, I don't think the two conditions should be handled with the same `catch` branch.
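One way to keep the two conditions apart is sketched below. This is only an illustration under assumptions, not the code that was merged: the class, the method `validate`, and both error messages are hypothetical. The pattern is compiled in one `try` block and the value is parsed in another, so each failure gets its own message.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

class SeparateCatchSketch {
    // Hypothetical validator: distinguishes an invalid pattern from an unparsable value.
    static void validate(String pattern, String timestamp) {
        SimpleDateFormat format;
        try {
            // Throws IllegalArgumentException for an illegal pattern, NullPointerException for null.
            format = new SimpleDateFormat(pattern);
        } catch (IllegalArgumentException | NullPointerException e) {
            throw new IllegalArgumentException("Invalid date format pattern: " + pattern, e);
        }
        try {
            // Throws ParseException when the text does not match the pattern.
            format.parse(timestamp);
        } catch (ParseException | NullPointerException e) {
            throw new IllegalArgumentException(
                "Timestamp '" + timestamp + "' does not match pattern '" + pattern + "'", e);
        }
    }

    public static void main(String[] args) {
        validate("yyyy-MM-dd", "2016-04-04"); // passes silently
        try {
            validate("InvalidPattern", "2016-03-14");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        try {
            validate("yyyy-MM-dd", "stillUnparsable");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The sample inputs are the ones used in the test cases of this pull request.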

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108864693

          --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
          @@ -224,27 +222,19 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
                    }
                }

          -     private static void validateOptionalDateProperty(Properties config, String timestampKey, String formatKey, String message) {
          +     private static void validateOptionalDateProperty(Properties config, String timestampKey, String format, String message) {
                    if (config.containsKey(timestampKey)) {
          -             if (config.containsKey(formatKey)) {
          -                 try {
          -                     SimpleDateFormat customDateFormat = new SimpleDateFormat(config.getProperty(formatKey));
          -                     customDateFormat.parse(config.getProperty(timestampKey));
          -                 } catch (ParseException | IllegalArgumentException | NullPointerException exception) {
          -                     throw new IllegalArgumentException(message);
          -                 }
          -             } else {
          +             try {
          +                 SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
          +                 customDateFormat.parse(config.getProperty(timestampKey));
          +             } catch (ParseException | IllegalArgumentException | NullPointerException exception) {
          --- End diff --

          Do you mean we shouldn't let it pass if the timestamp is a double but the given format is invalid?

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108864201

          --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
          @@ -224,27 +222,19 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
                    }
                }

          -     private static void validateOptionalDateProperty(Properties config, String timestampKey, String formatKey, String message) {
          +     private static void validateOptionalDateProperty(Properties config, String timestampKey, String format, String message) {
                    if (config.containsKey(timestampKey)) {
          -             if (config.containsKey(formatKey)) {
          -                 try {
          -                     SimpleDateFormat customDateFormat = new SimpleDateFormat(config.getProperty(formatKey));
          -                     customDateFormat.parse(config.getProperty(timestampKey));
          -                 } catch (ParseException | IllegalArgumentException | NullPointerException exception) {
          -                     throw new IllegalArgumentException(message);
          -                 }
          -             } else {
          +             try {
          +                 SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
          --- End diff --

          IllegalArgumentException would be thrown. See `SimpleDateFormat` [API](https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html#SimpleDateFormat(java.lang.String)).
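A small probe, written for this discussion rather than taken from the pull request, shows which exception type each failure mode produces. The class and the `describe` helper are hypothetical; only the `SimpleDateFormat` constructor and `parse` calls come from the JDK.

```java
import java.text.SimpleDateFormat;

class SimpleDateFormatExceptionsSketch {
    // Returns "ok" if the text parses with the pattern, otherwise the
    // simple class name of whatever exception was thrown.
    static String describe(String pattern, String text) {
        try {
            new SimpleDateFormat(pattern).parse(text);
            return "ok";
        } catch (Exception e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("yyyy-MM-dd", "2016-04-04"));      // ok
        System.out.println(describe("InvalidPattern", "2016-03-14"));  // IllegalArgumentException
        System.out.println(describe("yyyy-MM-dd", "stillUnparsable")); // ParseException
        System.out.println(describe(null, "2016-03-14"));              // NullPointerException
    }
}
```

Because the invalid-pattern case and the unparsable-value case raise different exception types, they can be told apart without inspecting messages.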

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108863143

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java —
          @@ -224,27 +222,19 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
                }
            }

          - private static void validateOptionalDateProperty(Properties config, String timestampKey, String formatKey, String message) {
          + private static void validateOptionalDateProperty(Properties config, String timestampKey, String format, String message) {
                if (config.containsKey(timestampKey)) {
          -         if (config.containsKey(formatKey)) {
          -             try {
          -                 SimpleDateFormat customDateFormat = new SimpleDateFormat(config.getProperty(formatKey));
          -                 customDateFormat.parse(config.getProperty(timestampKey));
          -             } catch (ParseException | IllegalArgumentException | NullPointerException exception) {
          -                 throw new IllegalArgumentException(message);
          -             }
          -         } else {
          +         try {
          +             SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
          +             customDateFormat.parse(config.getProperty(timestampKey));
          +         } catch (ParseException | IllegalArgumentException | NullPointerException exception) {

          End diff –

          I think we might want to distinguish between the exception thrown for an invalid format and the one thrown for an unparsable value. Is that possible?
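One way the two failure modes could be separated is sketched below. This is an assumption about how such a split might look, not the code that was merged: the class `DateValidation`, the method `validateDate`, and both error messages are hypothetical. The sketch also leaves out the fallback to an epoch-seconds value that the reviewed method performs.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class DateValidation {

    /** Validates the pattern first, then the timestamp, with a distinct message for each failure. */
    static void validateDate(String timestamp, String format) {
        SimpleDateFormat dateFormat;
        try {
            // The constructor rejects an invalid pattern (IllegalArgumentException)
            // and a null pattern (NullPointerException).
            dateFormat = new SimpleDateFormat(format);
        } catch (IllegalArgumentException | NullPointerException e) {
            throw new IllegalArgumentException("Invalid date format pattern: " + format, e);
        }
        try {
            // parse() signals an unparsable value with ParseException, and a null value with NullPointerException.
            dateFormat.parse(timestamp);
        } catch (ParseException | NullPointerException e) {
            throw new IllegalArgumentException(
                "Timestamp '" + timestamp + "' does not match pattern: " + format, e);
        }
    }
}
```

Keeping the two `try` blocks separate lets the caller see from the message whether the pattern or the value needs fixing.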

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108861953

          — Diff: docs/dev/connectors/kinesis.md —
          @@ -122,10 +122,10 @@ one of the following values in the provided configuration properties (the naming

          • `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings).
          • `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The timestamp must also be specified in the configuration
            properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following date pattern :
          • - `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`).
          • a non-negative double value representing the number of seconds that has elapsed since the Unix epoch (for example, `1459799926.480`).
          • - a user defined pattern, which is a valid pattern for `SimpleDateFormat` provided by `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`,
          • (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user).
            + - a user defined pattern, which is a valid pattern for `SimpleDateFormat` provided by `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`.
            + If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then ehe default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
              • End diff –

          typo: "ehe"

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108862644

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java —
          @@ -224,27 +222,19 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
                }
            }

          - private static void validateOptionalDateProperty(Properties config, String timestampKey, String formatKey, String message) {
          + private static void validateOptionalDateProperty(Properties config, String timestampKey, String format, String message) {
                if (config.containsKey(timestampKey)) {
          -         if (config.containsKey(formatKey)) {
          -             try {
          -                 SimpleDateFormat customDateFormat = new SimpleDateFormat(config.getProperty(formatKey));
          -                 customDateFormat.parse(config.getProperty(timestampKey));
          -             } catch (ParseException | IllegalArgumentException | NullPointerException exception) {
          -                 throw new IllegalArgumentException(message);
          -             }
          -         } else {
          +         try {
          +             SimpleDateFormat customDateFormat = new SimpleDateFormat(format);

          End diff –

          What exception would be thrown if the format itself is invalid?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108862488

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java —
          @@ -224,27 +222,19 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
                }
            }

          - private static void validateOptionalDateProperty(Properties config, String timestampKey, String formatKey, String message) {
          + private static void validateOptionalDateProperty(Properties config, String timestampKey, String format, String message) {
                if (config.containsKey(timestampKey)) {
          -         if (config.containsKey(formatKey)) {
          -             try {
          -                 SimpleDateFormat customDateFormat = new SimpleDateFormat(config.getProperty(formatKey));
          -                 customDateFormat.parse(config.getProperty(timestampKey));
          -             } catch (ParseException | IllegalArgumentException | NullPointerException exception) {
          -                 throw new IllegalArgumentException(message);
          -             }
          -         } else {
          +         try {
          +             SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
          +             customDateFormat.parse(config.getProperty(timestampKey));
          +         } catch (ParseException | IllegalArgumentException | NullPointerException exception) {
                        try {
          -                 initTimestampDateFormat.parse(config.getProperty(timestampKey));
          -             } catch (ParseException exception) {
          -                 try {
          -                     double value = Double.parseDouble(config.getProperty(timestampKey));
          -                     if (value < 0) {
          -                         throw new NumberFormatException();
          -                     }
          -                 } catch (NumberFormatException numberFormatException) {
          -                     throw new IllegalArgumentException(message);
          +                 double value = Double.parseDouble(config.getProperty(timestampKey));
          +                 if (value < 0) {
          +                     throw new NumberFormatException();

          End diff –

          Shouldn't we actually throw `new IllegalArgumentException(message)` here?
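A hedged sketch of what the suggestion could look like: the negative-value check throws `IllegalArgumentException(message)` directly instead of raising a `NumberFormatException` only to catch and rewrap it. The class `EpochSecondsValidation` and the method `parseNonNegativeSeconds` are hypothetical names chosen for illustration.

```java
public class EpochSecondsValidation {

    /** Parses a non-negative number of seconds since the Unix epoch, or fails with the given message. */
    static double parseNonNegativeSeconds(String raw, String message) {
        final double value;
        try {
            value = Double.parseDouble(raw);
        } catch (NumberFormatException | NullPointerException e) {
            // Not a number at all (or missing): report it as an invalid argument.
            throw new IllegalArgumentException(message, e);
        }
        if (value < 0) {
            // A negative value is a configuration error, not a number-format error.
            throw new IllegalArgumentException(message);
        }
        return value;
    }
}
```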

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108848859

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java —
          @@ -222,19 +224,28 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
                }
            }

          - private static void validateOptionalDateProperty(Properties config, String key, String message) {
          -     if (config.containsKey(key)) {
          -         try {
          -             initTimestampDateFormat.parse(config.getProperty(key));
          -         } catch (ParseException parseException) {
          + private static void validateOptionalDateProperty(Properties config, String timestampKey, String formatKey, String message) {
          +     if (config.containsKey(timestampKey)) {
          +         if (config.containsKey(formatKey)) {

          End diff –

          I will fix it too.

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108848825

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java —
          @@ -115,10 +116,21 @@ protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,

            if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
                String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
          -     try {
          -         this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
          -     } catch (ParseException e) {
          -         this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000));
          +
          +     if (consumerConfig.containsKey(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT)) {

          End diff –

          You are right. I will fix it later.

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108848761

          — Diff: docs/dev/connectors/kinesis.md —
          @@ -121,9 +121,11 @@ one of the following values in the provided configuration properties (the naming

          • `LATEST`: read all shards of all streams starting from the latest record.
          • `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings).
          • `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The timestamp must also be specified in the configuration
            -properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, either in the date pattern
            -`yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`), or a non-negative double value representing the number of seconds
            -that has elapsed since the Unix epoch (for example, `1459799926.480`).
            +properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following date pattern :
            + - `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`).
              • End diff –

          This format follows the one used in the [Kinesis API](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html). I think it is reasonable to use it as the default format.
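For illustration, the default pattern can be exercised with plain `SimpleDateFormat`, using the example timestamp from the documentation diff. The class `DefaultPatternExample` and the method `toEpochMillis` are hypothetical; the sketch only shows that the `XXX` part of the pattern consumes an ISO 8601 offset such as `-00:00`, so the parsed instant does not depend on the JVM's default time zone.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class DefaultPatternExample {

    // The pattern discussed in the review as the default format.
    static final String DEFAULT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    /** Parses a timestamp in the default pattern and returns milliseconds since the Unix epoch. */
    static long toEpochMillis(String timestamp) throws ParseException {
        return new SimpleDateFormat(DEFAULT_PATTERN).parse(timestamp).getTime();
    }

    public static void main(String[] args) throws ParseException {
        // Same instant as the epoch-seconds example 1459799926.480 from the docs.
        System.out.println(toEpochMillis("2016-04-04T19:58:46.480-00:00")); // prints 1459799926480
    }
}
```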

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108844803

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java —
          @@ -115,10 +116,21 @@ protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,

            if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
                String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
          -     try {
          -         this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
          -     } catch (ParseException e) {
          -         this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000));
          +
          +     if (consumerConfig.containsKey(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT)) {

          End diff –

          I think for this, it would be nice to be able to just say:

          ```
          String dateFormat = consumerConfig.getProperty(
              ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
              ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
          ```
          and then use the resulting format to build the date parser. The logic is much easier to understand that way.

          `DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT` would basically be the original format in `KinesisConfigUtil.initTimestampDateFormat`.
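The suggestion above can be sketched with plain `java.util.Properties`, assuming the configuration is a `Properties` object as in the diff. The class `InitialTimestampResolver` and the key strings below are placeholders invented for this example; they stand in for the `ConsumerConfigConstants` entries and are not the connector's real values. The fallback to epoch seconds mirrors the original `ShardConsumer` logic shown in the diff.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

public class InitialTimestampResolver {

    // Placeholder keys for illustration only; the real keys live in ConsumerConfigConstants.
    static final String TIMESTAMP_KEY = "example.initpos.timestamp";
    static final String FORMAT_KEY = "example.initpos.timestamp.format";
    static final String DEFAULT_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    /** Resolves the start timestamp using the configured pattern, or the default pattern if none is set. */
    static Date resolve(Properties config) {
        String timestamp = config.getProperty(TIMESTAMP_KEY);
        // A single lookup with a default replaces the containsKey() branching.
        String format = config.getProperty(FORMAT_KEY, DEFAULT_FORMAT);
        try {
            return new SimpleDateFormat(format).parse(timestamp);
        } catch (ParseException e) {
            // Not a formatted date: interpret the value as seconds since the Unix epoch.
            return new Date((long) (Double.parseDouble(timestamp) * 1000));
        }
    }
}
```

With this shape, the parsing code never needs to know whether the user supplied a pattern or not.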

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108844299

          — Diff: docs/dev/connectors/kinesis.md —
          @@ -121,9 +121,11 @@ one of the following values in the provided configuration properties (the naming

          • `LATEST`: read all shards of all streams starting from the latest record.
          • `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings).
          • `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The timestamp must also be specified in the configuration
            -properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, either in the date pattern
            -`yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`), or a non-negative double value representing the number of seconds
            -that has elapsed since the Unix epoch (for example, `1459799926.480`).
            +properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following date pattern :
            + - `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`).
              • End diff –

          I would perhaps try to include info in the doc that this format is the "default" format.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108845204

          — Diff: docs/dev/connectors/kinesis.md —
          @@ -121,9 +121,11 @@ one of the following values in the provided configuration properties (the naming

          • `LATEST`: read all shards of all streams starting from the latest record.
          • `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings).
          • `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The timestamp must also be specified in the configuration
            -properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, either in the date pattern
            -`yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`), or a non-negative double value representing the number of seconds
            -that has elapsed since the Unix epoch (for example, `1459799926.480`).
            +properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following date pattern :
            + - `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`).
              • End diff –

          Also, do you think it would make sense to have a simpler default format?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3651#discussion_r108845132

          — Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java —
          @@ -222,19 +224,28 @@ private static void validateOptionalPositiveDoubleProperty(Properties config, St
                }
            }

          - private static void validateOptionalDateProperty(Properties config, String key, String message) {
          -     if (config.containsKey(key)) {
          -         try {
          -             initTimestampDateFormat.parse(config.getProperty(key));
          -         } catch (ParseException parseException) {
          + private static void validateOptionalDateProperty(Properties config, String timestampKey, String formatKey, String message) {
          +     if (config.containsKey(timestampKey)) {
          +         if (config.containsKey(formatKey)) {

          End diff –

          This is also a bit too complicated.
          We can just determine which format will be used with `config.getProperty(formatKey, DEFAULT)`, and focus on validating the date. I don't think we need to complicate things by checking whether the format key exists.

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on the issue:

          https://github.com/apache/flink/pull/3650

          I made a mistake again because I am not familiar with JIRA. Sorry about that. orz

          githubbot ASF GitHub Bot added a comment -

          GitHub user tony810430 opened a pull request:

          https://github.com/apache/flink/pull/3651

          FLINK-5625 [kinesis] Let Date format for timestamp-based start position in Kinesis consumer be configurable

          This patch lets users define their own pattern for parsing the date string.
          If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is specified, then `FlinkKinesisConsumer` will use only this format to parse the date string.

          • [v] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [v] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [v] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tony810430/flink FLINK-5625

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3651.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3651


          commit f594d7f4c30dd3cc13ad482eb017689fbb9c0c43
          Author: Tony Wei <tony19920430@gmail.com>
          Date: 2017-03-30T01:48:43Z

          FLINK-5625 Let Date format for timestamp-based start position in Kinesis consumer be configurable


          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 closed the pull request at:

          https://github.com/apache/flink/pull/3650

          githubbot ASF GitHub Bot added a comment -

          Github user tony810430 commented on the issue:

          https://github.com/apache/flink/pull/3650

          Typo in the task ID. > <
          Closing this and opening another one so that the ASF GitHub Bot can track the PR correctly.


            People

            • Assignee:
              tonywei Wei-Che Wei
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0
              Watchers:
              3

              Dates

              • Created:
                Updated:
                Resolved:

                Development