
CEP: Clean up the operator state when not needed.

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: CEP
    • Labels:
      None

        Activity

        githubbot ASF GitHub Bot added a comment -

        Github user tedyu commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3541#discussion_r112071560

        --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java ---
        @@ -31,6 +31,18 @@ public double getVolume() {
             }

             @Override
        +    public boolean equals(Object obj) {
        +        return obj instanceof SubEvent &&
        +                super.equals(obj) &&
        +                ((SubEvent) obj).volume == volume;
        +    }
        +
        +    @Override
        +    public int hashCode() {
        +        return super.hashCode() + (int) volume;
        --- End diff --

        Common practice is to multiply super.hashCode() by a prime (e.g. 37) before adding the hash of the remaining fields.
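A minimal sketch of the suggestion above, not the code that was merged. The `Event` base class and its fields are simplified stand-ins assumed for illustration, and `Double.compare`/`Double.hashCode` replace the `==` comparison and `(int)` cast from the diff so that `equals` and `hashCode` stay consistent for values such as `0.0` and `-0.0`.

```java
import java.util.Objects;

public class HashCodeSketch {

    // Simplified stand-in for the test Event class; the fields are assumptions.
    static class Event {
        private final int id;
        private final String name;

        Event(int id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public boolean equals(Object obj) {
            // Comparing runtime classes keeps equals symmetric across subclasses.
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Event other = (Event) obj;
            return id == other.id && Objects.equals(name, other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, name);
        }
    }

    static class SubEvent extends Event {
        private final double volume;

        SubEvent(int id, String name, double volume) {
            super(id, name);
            this.volume = volume;
        }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof SubEvent
                    && super.equals(obj)
                    && Double.compare(((SubEvent) obj).volume, volume) == 0;
        }

        @Override
        public int hashCode() {
            // Multiply the parent hash by a prime before mixing in the new field.
            return 37 * super.hashCode() + Double.hashCode(volume);
        }
    }

    public static void main(String[] args) {
        SubEvent a = new SubEvent(1, "start", 2.5);
        SubEvent b = new SubEvent(1, "start", 2.5);
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
    }
}
```

Multiplying by a prime spreads the parent hash across more bits, so objects that differ only in `volume` are less likely to collide than with plain addition.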

        githubbot ASF GitHub Bot added a comment -

        Github user tedyu commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3541#discussion_r112071274

        --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---
        @@ -385,4 +393,25 @@ public int hashCode() {
                     return getClass().hashCode();
                 }
             }
        +
        +    ////////////////////// Testing Methods //////////////////////
        +
        +    @VisibleForTesting
        +    public boolean hasNonEmptyNFA(KEY key) throws IOException {
        +        setCurrentKey(key);
        +        return nfaOperatorState.value() != null;
        +    }
        +
        +    @VisibleForTesting
        +    public boolean hasNonEmptyPQ(KEY key) throws IOException {
        --- End diff --

        These 3 methods can be declared package-private.
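A rough sketch of what package-private testing hooks could look like. The `OperatorSketch` class, its map-backed state, and the locally declared `VisibleForTesting` annotation are hypothetical stand-ins, not the Flink operator or Flink's own annotation.

```java
import java.util.HashMap;
import java.util.Map;

public class TestingHooksSketch {

    // Local stand-in for a marker annotation; declared here only to keep the sketch self-contained.
    @interface VisibleForTesting {}

    // Hypothetical operator that keeps one NFA-like state object per key.
    static class OperatorSketch<K> {
        private final Map<K, Object> nfaState = new HashMap<>();

        public void processElement(K key, Object nfa) {
            nfaState.put(key, nfa);
        }

        public void clear(K key) {
            nfaState.remove(key);
        }

        // No access modifier: visible to tests in the same package, hidden from other packages.
        @VisibleForTesting
        boolean hasNonEmptyNFA(K key) {
            return nfaState.get(key) != null;
        }
    }

    public static void main(String[] args) {
        OperatorSketch<String> operator = new OperatorSketch<>();
        operator.processElement("key-1", new Object());
        System.out.println(operator.hasNonEmptyNFA("key-1"));
    }
}
```

Dropping `public` keeps the hooks out of the operator's public API while tests that live in the same package can still call them.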

        kkl0u Kostas Kloudas added a comment -

        Merged with e5057b72c0749a75578665c4c86a47be33382b4a

        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/flink/pull/3541

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on the issue:

        https://github.com/apache/flink/pull/3541

        Thanks for the review, @dawidwys! I will rebase and merge.

        githubbot ASF GitHub Bot added a comment -

        Github user dawidwys commented on the issue:

        https://github.com/apache/flink/pull/3541

        Besides the previous comment, looks good to me. Really liked the comments on `NFA`.

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3541#discussion_r106152948

        --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java ---
        @@ -153,6 +153,7 @@ public void invokeOnWatermarkCallback(Watermark watermark) throws IOException {
                     }
                 }
             }
        +    cleanupRegisteredKeys();
        --- End diff --

        Actually, I integrated your comment and pushed a new commit.

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3541#discussion_r106133528

        --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java ---
        @@ -153,6 +153,7 @@ public void invokeOnWatermarkCallback(Watermark watermark) throws IOException {
                     }
                 }
             }
        +    cleanupRegisteredKeys();
        --- End diff --

        Yes, unfortunately it is just for testing. I would like to get rid of it because it implies an additional iteration. You can comment it out and see which test fails in `CEPOperatorsTest`.

        githubbot ASF GitHub Bot added a comment -

        Github user dawidwys commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3541#discussion_r106133141

        --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java ---
        @@ -153,6 +153,7 @@ public void invokeOnWatermarkCallback(Watermark watermark) throws IOException {
                     }
                 }
             }
        +    cleanupRegisteredKeys();
        --- End diff --

        Is it directly related to this PR?

        githubbot ASF GitHub Bot added a comment -

        GitHub user kl0u opened a pull request:

        https://github.com/apache/flink/pull/3541

        FLINK-6032 [cep] Clean-up operator state when not needed.

        The CEP operator now cleans up the registered state for a key. This happens:
        1) for the priority queue, when the queue is empty.
        2) for the NFA, when its shared buffer is empty.
        3) for the key's registration in the `InternalWatermarkCallbackService`, when both of the above are empty.
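The three cleanup rules above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the Flink operator: per-key state is held in plain maps, the NFA's shared buffer is modelled as a queue of event timestamps, and `KeyedCepState`, `windowMillis`, and the accessor names are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

public class CepStateCleanupSketch {

    // Hypothetical per-key state holder; plain maps stand in for keyed operator state.
    static class KeyedCepState<K> {
        private final Map<K, PriorityQueue<Long>> queues = new HashMap<>();
        private final Map<K, Deque<Long>> sharedBuffers = new HashMap<>();
        private final Set<K> registeredKeys = new HashSet<>(); // stand-in for the callback service
        private final long windowMillis;                       // assumed timeout for partial matches

        KeyedCepState(long windowMillis) {
            this.windowMillis = windowMillis;
        }

        void bufferEvent(K key, long timestamp) {
            queues.computeIfAbsent(key, k -> new PriorityQueue<>()).add(timestamp);
            registeredKeys.add(key);
        }

        void onWatermark(K key, long watermark) {
            PriorityQueue<Long> queue = queues.get(key);
            if (queue != null) {
                // Move events that are now in order into the shared buffer.
                while (!queue.isEmpty() && queue.peek() <= watermark) {
                    sharedBuffers.computeIfAbsent(key, k -> new ArrayDeque<>()).add(queue.poll());
                }
                if (queue.isEmpty()) {
                    queues.remove(key);          // 1) drop the empty priority queue
                }
            }
            Deque<Long> buffer = sharedBuffers.get(key);
            if (buffer != null) {
                // Drop entries whose window has expired.
                while (!buffer.isEmpty() && buffer.peek() + windowMillis <= watermark) {
                    buffer.poll();
                }
                if (buffer.isEmpty()) {
                    sharedBuffers.remove(key);   // 2) drop the state once the buffer is empty
                }
            }
            if (!queues.containsKey(key) && !sharedBuffers.containsKey(key)) {
                registeredKeys.remove(key);      // 3) unregister the key when both are gone
            }
        }

        boolean hasQueue(K key) { return queues.containsKey(key); }
        boolean hasBuffer(K key) { return sharedBuffers.containsKey(key); }
        boolean isRegistered(K key) { return registeredKeys.contains(key); }
    }

    public static void main(String[] args) {
        KeyedCepState<String> state = new KeyedCepState<>(10L);
        state.bufferEvent("a", 5L);
        state.onWatermark("a", 6L);   // queue drained, buffer still holds the event
        state.onWatermark("a", 15L);  // buffer expired, key unregistered
        System.out.println(state.isRegistered("a"));
    }
}
```

The point of the sketch is the ordering: each piece of state is removed as soon as it is empty, and the key is unregistered only after both pieces are gone.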

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/kl0u/flink cep-state-cleanup

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/3541.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #3541


        commit 7a5d5a348d0784599f96f9fa8d12dce72924814f
        Author: kl0u <kkloudas@gmail.com>
        Date: 2017-03-13T19:36:57Z

        FLINK-6032 [cep] Clean-up operator state when not needed.

        The CEP operator now cleans the registered state for a
        key. This happens:
        1) for the priority queue, when the queue is empty.
        2) for the NFA, when its shared buffer is empty.



          People

          • Assignee:
            kkl0u Kostas Kloudas
            Reporter:
            kkl0u Kostas Kloudas
          • Votes:
            0
            Watchers:
            3
