  1. Flink
  2. FLINK-5113

Make all Testing Functions implement CheckpointedFunction Interface.

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.3.0
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: DataStream API
    • Labels:
      None

      Description

      Currently, stateful functions implement the (old) Checkpointed interface.
      This issue aims to port all of these functions to the new CheckpointedFunction interface so that they can
      leverage the new capabilities it provides.
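
A rough sketch of what such a port looks like for a simple counter, using the list-based shape that the pull request below adopts for most test functions. The two interfaces here are hypothetical stand-ins written for this example so that it compiles without Flink on the classpath. They only mirror the method shapes visible in the review diffs and are not Flink's `Checkpointed` or `ListCheckpointed`.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the old single-value contract (NOT Flink's Checkpointed).
interface OldStyleCheckpointed<T> {
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
    void restoreState(T state) throws Exception;
}

// Hypothetical stand-in for the list-based contract (NOT Flink's ListCheckpointed).
interface ListStyleCheckpointed<T> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    void restoreState(List<T> state) throws Exception;
}

// Before: the function hands back exactly one state object.
class OldCounter implements OldStyleCheckpointed<Long> {
    long count;

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return count;
    }

    @Override
    public void restoreState(Long state) {
        count = state;
    }
}

// After: the same state is wrapped in a list on snapshot and merged on restore.
class NewCounter implements ListStyleCheckpointed<Long> {
    long count;

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // Assumption: the restored list may hold zero, one, or several entries,
        // so the entries are summed instead of reading index 0 blindly.
        count = 0;
        for (Long part : state) {
            count += part;
        }
    }
}

class MigrationSketch {
    // Snapshots one counter and restores the result into a fresh one.
    static long roundTrip(long value) {
        NewCounter before = new NewCounter();
        before.count = value;
        List<Long> snapshot = before.snapshotState(1L, 0L);

        NewCounter after = new NewCounter();
        after.restoreState(snapshot);
        return after.count;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(7L));
    }
}
```

The only behavioural difference in this sketch is on the restore side, where the function has to decide how to combine the entries it is given.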

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/2939

          FLINK-5113 Ports all functions in the tests to the new checkpointing interface.

          This PR only touches testing code.
          It refactors all the tests that were using the `Checkpointed` and `CheckpointedAsynchronously` interfaces to use the new `ListCheckpointed` and `CheckpointedFunction` ones.

          There is one commit per class, but the changes are very small and easy to review.

          R: @aljoscha @zentol

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink test-func-ref-reb

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2939.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2939


          commit e8eecc97385b66dccdc31c057591e265e1dfc963
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-22T17:38:54Z

          FLINK-5113 CoStreamCheckpointingITCase refactoring.

          commit 588d98175bf14699fca8ef0117ee37f2acdb8c65
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-22T18:23:33Z

          FLINK-5113 EventTimeAllWindowCheckpointingITCase refactoring.

          commit 31d642eeafd98587999cd1ec69aff87b6224f941
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-22T18:32:24Z

          FLINK-5113 AbstractEventTimeWindowCheckpointingITCase refactoring.

          commit 26a1ffefaf4b7fa01b7df80bd175e98337a0eef0
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-22T18:49:47Z

          FLINK-5113 StreamCheckpointingITCase refactoring.

          commit efacdbd5fce6e3ebb590a952da3f8e1b1b7b42ea
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T10:49:24Z

          FLINK-5113 StreamCheckpointNotifierITCase refactoring

          commit 9815ee405fa21d875963039cb145eea21025e737
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T10:56:06Z

          FLINK-5113 BucketingSinkFaultToleranceITCase refactoring.

          commit b1cc019622fedcb08b0e0a232a3deedd2c4e37c5
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T11:11:11Z

          FLINK-5113 StateCheckpointedITCase refactoring

          commit f6e73c2fe6f34953e5029ba79845b720ac5a0c05
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T11:21:24Z

          FLINK-5113 CheckpointingCustomKvStateProgram refactoring.

          commit d6c4ad75f7f47c872fb5fb6650c9f3a111a55108
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T11:31:11Z

          FLINK-5113 JobManagerHACheckpointRecoveryITCase refactoring

          commit 7a99f7bb186be2a235ba28e3d61d65f60a82b571
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T13:14:40Z

          FLINK-5113 CheckpointedStreamingProgram refactoring.

          commit 30096fe6399c8fd3f14bd5c23af4ee1506ca26e2
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T13:24:54Z

          FLINK-5113 WindowCheckpointingITCase refactoring.

          commit 02424d0ea88986618cb66d9b5d902e86512981dc
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T13:31:18Z

          FLINK-5113 TaskManagerProcessFailureStreamingRecoveryITCase refactoring.

          commit 65eed028ff2af4106b509c55f2cf15c902e6f6d6
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T13:41:58Z

          FLINK-5113 RescalingITCase refactoring

          commit f25c387d919eb569fd80abbf3621e3482c9fdb79
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T13:55:02Z

          FLINK-5113 CassandraTupleWriteAheadSinkExample refactoring.

          commit 6071795423b3c9bf5143c32c93ce69e159e9bb4f
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T14:02:10Z

          FLINK-5113 SavepointITCase refactoring.

          commit 4212d45a5592f83016864d4a588219672797142e
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T14:11:29Z

          FLINK-5113 ChaosMonkeyITCase refactoring.

          commit 32738006d4e48ac42985ed2b77b9b6555e352e75
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T14:16:37Z

          FLINK-5113 UdfStreamOperatorCheckpointingITCase refactoring.

          commit a7e883ff6d1c56e84be4ed49e0b53acab2bce54d
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T14:19:58Z

          FLINK-5113 InterruptSensitiveRestoreTest refactoring.

          commit 6c1acbba4f3bae5c3b5ba812beafa3caeddc4a0f
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T16:08:38Z

          FLINK-5113 KafkaConsumerTestBase refactored.

          commit b88d9085ddedff90e68534b84450796d9edd3396
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T16:12:35Z

          FLINK-5113 ValidatingExactlyOnceSink refactoring.

          commit a9e2d86177ccd446bc13e353e09cbb5c8c0a7016
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T16:21:19Z

          FLINK-5113 PartitionedStateCheckpointingITCase refactoring.

          commit 8161507acf63926e52dcd0952565017aafaa1359
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T16:34:45Z

          FLINK-5113 FailingIdentityMapper refactoring.

          commit ff8ee23ccc6b5ba95151dda03f4279cde0e58af1
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T16:48:31Z

          FLINK-5113 SourceStreamTaskTest refactoring.

          commit 2ca6348f4802f3ae744aa1bab829a4df85b2ca08
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-23T16:54:03Z

          FLINK-5113 RollingSinkFaultToleranceITCase refactoring.

          commit c4b029cd3d30d3bc59715291d7e48da97cca48eb
          Author: kl0u <kkloudas@gmail.com>
          Date: 2016-11-28T19:07:52Z

          FLINK-5113 InterruptSensitiveRestoreTest


          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91721848

          — Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java —
          @@ -372,23 +374,25 @@ public void notifyCheckpointComplete(long checkpointId)
          { numSuccessfulCheckpoints++; }
          - @Override
          - public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
          - return numElementsEmitted;
          + public static void reset() {
          + failedBefore = false;
          — End diff –

          method order, snapshot -> restore -> reset

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91721132

          — Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java —
          @@ -323,26 +329,25 @@ public PrefixCount map(String value) {
          }

          @Override
          - public Long snapshotState(long checkpointId, long checkpointTimestamp) {
          - return count;
          + public void close() throws IOException {
          + counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
          — End diff –

          Please retain the order of the original methods; snapshot -> restore -> close

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91720690

          — Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java —
          @@ -566,23 +568,25 @@ public void notifyCheckpointComplete(long checkpointId)
          { numSuccessfulCheckpoints++; }
          - @Override
          - public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
          - return numElementsEmitted;
          + public static void reset() {
          — End diff –

          please move this method to the bottom of the class again.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91719698

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java —
          @@ -199,11 +200,13 @@ public Serializable snapshotState(long checkpointId, long checkpointTimestamp) t
          Assert.fail("Count is different at start end end of snapshot.");
          }
          semaphore.release();
          - return sum;
          + return Collections.singletonList((Serializable) sum);
          }

          @Override
          - public void restoreState(Serializable state) {}
          + public void restoreState(List<Serializable> state) throws Exception {
          +
          — End diff –

          can you remove this empty line?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91719518

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java —
          @@ -128,11 +131,16 @@ private static Task createTask(
          when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
          .thenReturn(mock(TaskKvStateRegistry.class));

          - ChainedStateHandle<StreamStateHandle> operatorState = new ChainedStateHandle<>(Collections.singletonList(state));
          + ChainedStateHandle<StreamStateHandle> operatorState = null;
          List<KeyGroupsStateHandle> keyGroupStateFromBackend = Collections.emptyList();
          List<KeyGroupsStateHandle> keyGroupStateFromStream = Collections.emptyList();
          - List<Collection<OperatorStateHandle>> operatorStateBackend = Collections.emptyList();
          - List<Collection<OperatorStateHandle>> operatorStateStream = Collections.emptyList();
          +
          + Map<String, long[]> testState = new HashMap<>();
          + testState.put("test", new long[] {0, 10});
          +
          + Collection<OperatorStateHandle> handle = Collections.singletonList(new OperatorStateHandle(testState, state));
          + List<Collection<OperatorStateHandle>> operatorStateBackend = Collections.singletonList(handle);
          + List<Collection<OperatorStateHandle>> operatorStateStream = Collections.singletonList(handle);
          — End diff –

          Can this be an empty list as well?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91721756

          — Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java —
          @@ -447,5 +434,23 @@ public void notifyCheckpointComplete(long checkpointId)

          { GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet(); }

          }
          +
          + @Override
          + public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
          + if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) {
          — End diff –

          Please move this method up again to reduce the diff. The methods are identical apart from the signature and return statement; the diff should reflect that.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91722459

          — Diff: flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java —
          @@ -57,7 +59,7 @@ public static void main(String[] args) throws Exception {

          // with Checkpoining

          - public static class SimpleStringGenerator implements SourceFunction<String>, Checkpointed<Integer> {
          — End diff –

          I understand why you did not replace `Checkpointed` with a different interface, but this technically changes the test. If a function can still implement these interfaces without doing anything within snapshot/restore, then we should cover this with tests.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91720508

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java —
          @@ -199,11 +200,13 @@ public Serializable snapshotState(long checkpointId, long checkpointTimestamp) t
          Assert.fail("Count is different at start end end of snapshot.");
          }
          semaphore.release();
          - return sum;
          + return Collections.singletonList((Serializable) sum);
          — End diff –

          You don't need to cast here; instead use `return Collections.<Serializable>singletonList(sum);`
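
To make the suggestion concrete, here is a small standalone comparison of the two forms. `TypeWitnessSketch` and its method names are made up for this example; only `Collections.singletonList` and the two call forms come from the review. Both variants produce an equal `List<Serializable>`. The second simply states the element type as an explicit type argument instead of casting the value.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

// Illustrative class; the names are hypothetical and not taken from SourceStreamTaskTest.
class TypeWitnessSketch {

    // Form used in the diff under review: cast the element so that T is inferred as Serializable.
    static List<Serializable> withCast(Long sum) {
        return Collections.singletonList((Serializable) sum);
    }

    // Form suggested in the review: pass Serializable as an explicit type argument.
    static List<Serializable> withTypeWitness(Long sum) {
        return Collections.<Serializable>singletonList(sum);
    }

    public static void main(String[] args) {
        System.out.println(withCast(5L).equals(withTypeWitness(5L)));
    }
}
```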

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91719644

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java —
          @@ -1,4 +1,4 @@
          -/**
          +/*
          — End diff –

          unrelated change

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91721509

          — Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java —
          @@ -47,7 +47,7 @@
          * A simple test that runs a streaming topology with checkpointing enabled.
          *
          * The test triggers a failure after a while and verifies that, after completion, the
          - * state defined with either the {@link ValueState} or the {@link Checkpointed}
          + * state defined with either the {@link ValueState} or the {@link org.apache.flink.streaming.api.checkpoint.ListCheckpointed}
          — End diff –

          Do not use the fully qualified class name here (it's not required since you import the class anyway).

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91724133

          — Diff: flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java —
          @@ -72,34 +74,27 @@ public void run(SourceContext<String> ctx) throws Exception {
          public void cancel()
          { running = false; }
          -
          - @Override
          - public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
          - return null;
          - }
          -
          - @Override
          - public void restoreState(Integer state) {
          -
          - }
          }

          - public static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointListener {
          + public static class StatefulMapper implements MapFunction<String, String>, ListCheckpointed<StatefulMapper>, CheckpointListener {

          private String someState;
          private boolean atLeastOneSnapshotComplete = false;
          private boolean restored = false;

          @Override
          - public StatefulMapper snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
          - return this;
          + public List<StatefulMapper> snapshotState(long checkpointId, long timestamp) throws Exception {
          + return Collections.singletonList(this);
          }

          @Override
          - public void restoreState(StatefulMapper state) {
          - restored = true;
          - this.someState = state.someState;
          - this.atLeastOneSnapshotComplete = state.atLeastOneSnapshotComplete;
          + public void restoreState(List<StatefulMapper> state) throws Exception {
          + if (!state.isEmpty()) {
          — End diff –

          If the state is empty we should fail immediately; currently (I think) this would cause us to fail with the RuntimeException saying "Intended failure, to trigger restore", which is a bit inaccurate.
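
One way to read this request is a guard at the top of the restore method, so that an empty list surfaces with its own message instead of a later, unrelated failure. The sketch below is an assumption about what such a guard could look like. `RestoreGuardSketch`, its `String` state type, and the exception message are invented for the example and are not the code that was merged.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical mapper-like class; names and types are illustrative only.
class RestoreGuardSketch {
    String someState;
    boolean restored = false;

    void restoreState(List<String> state) {
        // Fail immediately, with an accurate message, when nothing was restored.
        if (state.isEmpty()) {
            throw new IllegalStateException("restoreState was called with an empty state list.");
        }
        restored = true;
        someState = state.get(0);
    }

    // Returns true if restoring from an empty list is rejected.
    static boolean failsOnEmptyState() {
        try {
            new RestoreGuardSketch().restoreState(Collections.<String>emptyList());
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        RestoreGuardSketch mapper = new RestoreGuardSketch();
        mapper.restoreState(Collections.singletonList("abc"));
        System.out.println(mapper.restored + " " + mapper.someState);
        System.out.println(failsOnEmptyState());
    }
}
```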

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91721182

          — Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java —
          @@ -362,19 +367,21 @@ public void flatMap2(String value, Collector<String> out) {
          }

          @Override
          - public Long snapshotState(long checkpointId, long checkpointTimestamp) {
          — End diff –

          Please retain the order of the original methods; snapshot -> restore -> close

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91721578

          — Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java —
          @@ -338,14 +344,17 @@ public void flatMap(PrefixCount value, Collector<PrefixCount> out) throws Except
          }
          }

          +
          — End diff –

          remove new line

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91730804

          — Diff: flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java —
          @@ -72,34 +74,27 @@ public void run(SourceContext<String> ctx) throws Exception {
          public void cancel() {
          running = false;
          }
          -
          - @Override
          - public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
          - return null;
          - }
          -
          - @Override
          - public void restoreState(Integer state) {
          -
          - }
          }

          - public static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointListener {
          + public static class StatefulMapper implements MapFunction<String, String>, ListCheckpointed<StatefulMapper>, CheckpointListener {

          private String someState;
          private boolean atLeastOneSnapshotComplete = false;
          private boolean restored = false;

          @Override
          - public StatefulMapper snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
          - return this;
          + public List<StatefulMapper> snapshotState(long checkpointId, long timestamp) throws Exception {
          + return Collections.singletonList(this);
          }

          @Override
          - public void restoreState(StatefulMapper state) {
          - restored = true;
          - this.someState = state.someState;
          - this.atLeastOneSnapshotComplete = state.atLeastOneSnapshotComplete;
          + public void restoreState(List<StatefulMapper> state) throws Exception {
          + if (!state.isEmpty()) {
          — End diff –

          Alright, I figured out why we can't fail here immediately. It still seems odd, though, that we do not explicitly differentiate between a call to restore before any state was snapshotted and a broken snapshotting that doesn't return a state, although this applies to all other tests as well.

          If the test's success depends on getting back the state that we snapshotted, we should also have a failure condition for when this does not happen; currently we simply enter undefined territory.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2939

          @zentol Thanks for the review. I integrated your comments and I am waiting for Travis.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91961116

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java —
          @@ -68,15 +69,18 @@ else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
          }

          @Override
          - public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
          - LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId);
          - return new Tuple2<>(numElements, duplicateChecker);
          + public List<Tuple2<Integer, BitSet>> snapshotState(long checkpointId, long timestamp) throws Exception {
          + LOG.info("Snapshot of counter "+numElements+" at checkpoint "+ checkpointId);
          — End diff –

          either add all missing spaces or none

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2939#discussion_r91962004

          — Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java —
          @@ -513,8 +514,8 @@ public void cancel() {
          }

          private static class StatefulCounter
          - extends RichMapFunction<Integer, Integer>
          - implements Checkpointed<byte[]>, CheckpointListener {
          + extends RichMapFunction<Integer, Integer>
          — End diff –

          please revert the indentation changes

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2939

          Shouldn't all the `ListCheckpointed` functions that actually snapshot something fail if they are given an empty state list?

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2939

          The test should cover whether something semantically wrong happened during restore. For example, if you expected some state that never came, then the test should fail. This is not a matter of the interface and thus, I think, not part of this PR.

          What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2939

          @zentol Thanks for the review. I integrated your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2939

          It should be part of this PR since this case simply didn't exist before the new interface. The equivalent to no state was the state being null, at which point restore was never called in the first place. Now, if the state is null we get an empty list, afaik.

          Here is what is confusing me: Every single function checks whether the state is empty. Every one. So, there is apparently the possibility that it's empty. But the behavior for that case does not seem well-defined.

          According to the code, receiving an empty state list is not a reason to fail for any of these tests.

          If this is the case, we don't need to actually implement `restoreState` in the first place, since it is irrelevant to the result of the test.
          If this is not the case, we should try to fail as early as possible by adding a failure condition.
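
A minimal sketch of the "fail as early as possible" option, for illustration only. `ListCheckpointedSketch` is a hypothetical stand-in that mirrors the two `ListCheckpointed` method signatures shown in the diffs above, so the example compiles without Flink on the classpath. `FailFastCounter` is likewise hypothetical and is not taken from this PR. The check for exactly one element assumes the function snapshots a single element and that parallelism does not change between snapshot and restore.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for Flink's ListCheckpointed, so the sketch compiles on its own. */
interface ListCheckpointedSketch<T> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    void restoreState(List<T> state) throws Exception;
}

/** Hypothetical test function: snapshots one element and insists on getting one back. */
class FailFastCounter implements ListCheckpointedSketch<Long> {
    long count;

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // Exactly one element is written per snapshot.
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // Assumption: unchanged parallelism, so exactly one element must come back.
        // Failing here avoids silently continuing with default state.
        if (state.size() != 1) {
            throw new IllegalStateException(
                "Expected exactly 1 restored element but got " + state.size());
        }
        count = state.get(0);
    }
}
```

With this shape, an empty list produces an explicit error at restore time instead of surfacing later as an unrelated test failure.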

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2939

          I think your argument is valid. This is how I had it before, but I changed it because it became too verbose, given that `ListState.get()` just gives you an Iterable that you have to iterate over and copy into a list in order to check its size. But I think it is worth doing.
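
The verbosity mentioned here can be contained in one small helper. The following is a sketch under assumptions, not code from this PR: `RestoredStateChecks` and `collectExactly` are hypothetical names, and the helper works on any `Iterable`, such as the one the comment above describes `ListState.get()` returning. It defensively treats `null` as an empty state.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for tests that need to verify how many elements were restored. */
final class RestoredStateChecks {
    private RestoredStateChecks() {}

    /** Copies the restored elements into a list and fails if the count is unexpected. */
    static <T> List<T> collectExactly(Iterable<T> restored, int expectedSize) {
        List<T> elements = new ArrayList<>();
        // Defensive assumption: a null Iterable is treated the same as an empty one.
        if (restored != null) {
            for (T element : restored) {
                elements.add(element);
            }
        }
        if (elements.size() != expectedSize) {
            throw new IllegalStateException(
                "Expected " + expectedSize + " restored element(s) but got " + elements.size());
        }
        return elements;
    }
}
```

A test function could then call `collectExactly(restoredIterable, 1).get(0)` in its restore path, keeping the size check to one line per test.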

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2939

          Hi @zentol! I integrated some remaining comments; could you have a look?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2939

          @kl0u Looks good, only thing left is to wait for travis.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2939

          merging.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2939

          Thanks a lot @zentol .

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2939

          Zentol Chesnay Schepler added a comment -

          master: 525edf1e6925b55302d991ddf537a2f16caba21d
          1.2: 8b069fde3adccdcd5143de90d3d4834f33b5acff


            People

            • Assignee:
              kkl0u Kostas Kloudas
              Reporter:
              kkl0u Kostas Kloudas
            • Votes:
              0
              Watchers:
              3

              Dates

              • Created:
                Updated:
                Resolved:

                Development