FLINK-6007

ConcurrentModificationException in WatermarkCallbackService

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: DataStream API
    • Labels: None

      Description

      Currently, calling InternalWatermarkCallbackService.unregisterKeyFromWatermarkCallback() from within the OnWatermarkCallback throws a ConcurrentModificationException. The reason is that invokeOnWatermarkCallback iterates over the set of registered keys and calls the callback for each of them, so unregistering a key from inside the callback modifies the collection while it is still being iterated.

      To fix this, the deleted keys are put into a separate set, and the actual deletion happens after the iteration over all keys has finished.
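
The failure and the deferred-deletion idea described above can be sketched roughly as follows. This is a minimal illustration only, not Flink's code: the class `DeferredDeletionSketch`, its methods, and the use of plain `String` keys in a single `HashSet` are all assumptions made for the example (the real service keeps one set per key-group).

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for the callback service; not Flink's actual class.
class DeferredDeletionSketch {
    private final Set<String> registeredKeys = new HashSet<>();
    private final Set<String> keysToDelete = new HashSet<>();

    void register(String key) {
        registeredKeys.add(key);
    }

    // Unsafe variant: mutates the set that invokeCallbacks() may be iterating.
    void unregisterDirectly(String key) {
        registeredKeys.remove(key);
    }

    // Deferred variant: only records the key; removal happens after iteration.
    void unregisterDeferred(String key) {
        keysToDelete.add(key);
    }

    void invokeCallbacks(Consumer<String> callback) {
        for (String key : registeredKeys) {
            callback.accept(key);
        }
        // Iteration is over, so it is now safe to modify registeredKeys.
        registeredKeys.removeAll(keysToDelete);
        keysToDelete.clear();
    }

    int size() {
        return registeredKeys.size();
    }

    public static void main(String[] args) {
        DeferredDeletionSketch unsafe = new DeferredDeletionSketch();
        unsafe.register("a");
        unsafe.register("b");
        unsafe.register("c");
        try {
            // Each callback removes its own key while the loop is running.
            unsafe.invokeCallbacks(unsafe::unregisterDirectly);
        } catch (ConcurrentModificationException e) {
            System.out.println("direct removal failed: " + e);
        }

        DeferredDeletionSketch safe = new DeferredDeletionSketch();
        safe.register("a");
        safe.register("b");
        safe.register("c");
        safe.invokeCallbacks(safe::unregisterDeferred);
        System.out.println("keys left after deferred removal: " + safe.size());
    }
}
```

The trade-off is that a key unregistered during the iteration stays registered until the loop ends, and the pending deletions become extra state that has to be handled somehow.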

          Activity

          aljoscha Aljoscha Krettek added a comment -

          Or we could duplicate the set of keys before iterating. This way, removal from the original set will be possible while iterating over the copy.
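
A rough sketch of this copy-before-iterate alternative, under the same simplifying assumptions (a hypothetical `CopyBeforeIterateSketch` class with `String` keys in one `HashSet`, not Flink's actual implementation):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for the callback service; not Flink's actual class.
class CopyBeforeIterateSketch {
    private final Set<String> registeredKeys = new HashSet<>();

    void register(String key) {
        registeredKeys.add(key);
    }

    // Safe to call from inside a callback: the loop below never touches this set.
    void unregister(String key) {
        registeredKeys.remove(key);
    }

    void invokeCallbacks(Consumer<String> callback) {
        // Copy the keys first; the loop iterates over the copy only.
        List<String> copy = new ArrayList<>(registeredKeys);
        for (String key : copy) {
            callback.accept(key);
        }
    }

    int size() {
        return registeredKeys.size();
    }

    public static void main(String[] args) {
        CopyBeforeIterateSketch service = new CopyBeforeIterateSketch();
        service.register("a");
        service.register("b");
        service.register("c");
        service.invokeCallbacks(service::unregister);
        System.out.println("keys left: " + service.size());
    }
}
```

In this sketch the copy holds every registered key for the duration of the call, and a key removed by an earlier callback is still visited because it is already in the copy.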

          kkl0u Kostas Kloudas added a comment - - edited

          True. This way we do not have to checkpoint the deletionSet.

          Just a note for the future: this introduces random accesses to the key set. If we make the set of keys spillable to disk, this can become a problem (as with the timers). Having an extra set that potentially fits in memory could be a good solution, but again, this is for the future.

          kkl0u Kostas Kloudas added a comment -

          Aljoscha Krettek, now that I think about it, this solution assumes that 2 x noOfKeys keys fit in memory.
          The size of the deletion set is at most equal to the size of the set of registered keys.
          Given this, I am not sure that the second solution is the best.

          githubbot ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/3514

          FLINK-6007 Allow key removal from within the watermark callback.

          When deleting a key from the InternalWatermarkCallbackService, the
          deleted key is put into a separate set, and the actual deletion
          happens after the iteration over all keys has finished. To avoid
          checkpointing the deletion set, the actual cleanup also happens
          upon checkpointing.
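
One way to read "the actual cleanup also happens upon checkpointing" is that pending deletions are applied right before the registered keys are written out, so only one set ever needs to be persisted. The sketch below illustrates that reading; the class `CheckpointCleanupSketch`, the `snapshotKeys()` method, and the `String` keys are hypothetical and do not reflect the PR's actual code or Flink's snapshot API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for the callback service; not Flink's actual class.
class CheckpointCleanupSketch {
    private final Set<String> registeredKeys = new HashSet<>();
    private final Set<String> keysToDelete = new HashSet<>();

    void register(String key) {
        // A re-registered key must not be dropped by a stale pending deletion.
        keysToDelete.remove(key);
        registeredKeys.add(key);
    }

    // Deferred: the key is only marked here and removed later.
    void unregister(String key) {
        keysToDelete.add(key);
    }

    void invokeCallbacks(Consumer<String> callback) {
        for (String key : registeredKeys) {
            callback.accept(key);
        }
        applyPendingDeletions();
    }

    // Assumed checkpoint hook: clean up first, then persist only live keys.
    List<String> snapshotKeys() {
        applyPendingDeletions();
        return new ArrayList<>(registeredKeys);
    }

    int pendingDeletions() {
        return keysToDelete.size();
    }

    private void applyPendingDeletions() {
        registeredKeys.removeAll(keysToDelete);
        keysToDelete.clear();
    }

    public static void main(String[] args) {
        CheckpointCleanupSketch service = new CheckpointCleanupSketch();
        service.register("a");
        service.register("b");
        service.unregister("a"); // marked outside of any callback
        System.out.println("snapshot: " + service.snapshotKeys());
        System.out.println("pending after snapshot: " + service.pendingDeletions());
    }
}
```

Because the deletion set is always empty once `snapshotKeys()` returns, restoring from the snapshot needs no knowledge of it.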

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink watermark-callback-fix

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3514.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3514


          commit d3a1b6e72eb56db40638c0b0889f5277c4671b61
          Author: kl0u <kkloudas@gmail.com>
          Date: 2017-03-08T19:18:18Z

          FLINK-6007 Allow key removal from within the watermark callback.

          When deleting a key from the InternalWatermarkCallbackService, the
          deleted key is put into a separate set, and the actual deletion
          happens after the iteration over all keys has finished. To avoid
          checkpointing the deletion set, the actual cleanup also happens
          upon checkpointing.


          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3514#discussion_r105648422

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java ---
          @@ -58,7 +59,17 @@
             * An array of sets of keys keeping the registered keys split
             * by the key-group they belong to. Each key-group has one set.
             */
          -  private final Set<K>[] keysByKeygroup;
          +  private final Set<K>[] registeredKeysByKeyGroup;
          +
          +  /**
          +   * An array of sets of keys keeping the keys "to delete" split
          +   * by the key-group they belong to. Each key-group has one set.
          +   * <p>
          --- End diff --

          IMHO, the formatting on paragraphs should be:
          ```
          Some text bla bla black

          <p>Other Text sdfasfaf
          ```

          On the other hand, we don't have a style guide for that, and no checkstyle rule to check it ... 😉

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3514#discussion_r105649773

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java ---
          @@ -58,7 +59,17 @@
             * An array of sets of keys keeping the registered keys split
             * by the key-group they belong to. Each key-group has one set.
             */
          -  private final Set<K>[] keysByKeygroup;
          +  private final Set<K>[] registeredKeysByKeyGroup;
          +
          +  /**
          +   * An array of sets of keys keeping the keys "to delete" split
          +   * by the key-group they belong to. Each key-group has one set.
          +   * <p>
          --- End diff --

          Fixed this.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3514

          Thanks for the review, @aljoscha. Waiting for Travis and then merging.

          kkl0u Kostas Kloudas added a comment -

          Merged at 14c1941d8eaa583eb8f7eeb5478e605850c0d355

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3514

          Merged at 14c1941d8eaa583eb8f7eeb5478e605850c0d355

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u closed the pull request at:

          https://github.com/apache/flink/pull/3514


            People

            • Assignee: kkl0u Kostas Kloudas
            • Reporter: kkl0u Kostas Kloudas
            • Votes: 0
            • Watchers: 4
