FLINK-6165: Implement internal continuity for looping states.

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: CEP
    • Labels: None

      Description

      We should be able to specify an internal continuity for a looping state. The API could look like: zeroOrMore().consecutive(). That way, one continuity applies up to the first element of the loop and a separate one applies between the elements in the loop.
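
To make the idea concrete, here is a minimal sketch in plain Java. It is a toy model and not Flink's NFA: the class `ConsecutiveLoopSketch`, the method `matches`, and the string-based events are all hypothetical. It assumes a pattern of the shape "C, followed by one or more A, followed by B" and uses the sequence C D A1 A2 A3 D A4 B from the Javadoc example in the pull request discussed below. When `strict` is true, the first non-matching event after the loop has started closes the loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConsecutiveLoopSketch {

    /**
     * Toy matcher (an assumption for illustration, not Flink code).
     * start = first "C", loop = events whose name starts with "A",
     * end = first "B" after the start. Events before the first loop
     * element may always be skipped; events inside the loop may be
     * skipped only when strict is false.
     */
    public static List<String> matches(List<String> events, boolean strict) {
        List<String> result = new ArrayList<>();
        int start = events.indexOf("C");
        if (start < 0) {
            return result;
        }
        int end = -1;
        for (int i = start + 1; i < events.size(); i++) {
            if (events.get(i).equals("B")) {
                end = i;
                break;
            }
        }
        if (end < 0) {
            return result;
        }
        List<String> taken = new ArrayList<>();
        for (int i = start + 1; i < end; i++) {
            String event = events.get(i);
            if (event.startsWith("A")) {
                taken.add(event);
                // Every prefix of the loop yields one match.
                result.add("C " + String.join(" ", taken) + " B");
            } else if (strict && !taken.isEmpty()) {
                // Strict internal continuity: a non-matching event breaks the loop.
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sequence = Arrays.asList("C", "D", "A1", "A2", "A3", "D", "A4", "B");
        // prints [C A1 B, C A1 A2 B, C A1 A2 A3 B]
        System.out.println(matches(sequence, true));
    }
}
```

The strict result agrees with the matches listed in the Javadoc of the pull request. With `strict` set to false the toy keeps looping past the second D and also takes A4; how Flink's default (relaxed) continuity behaves in detail depends on the engine and the chosen quantifier, so that branch should be read as an illustration only.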

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user dawidwys opened a pull request:

          https://github.com/apache/flink/pull/3621

          FLINK-6165 [cep] Implement internal continuity for looping states.

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [ ] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/dawidwys/flink consecutive

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3621.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3621


          commit 2ec2db3742596459e8c2cd60529f0319ad2e7776
          Author: Dawid Wysakowicz <dawid@getindata.com>
          Date: 2017-03-27T13:05:11Z

          FLINK-6165 Implement internal continuity for looping states.


          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3621#discussion_r108619604

          --- Diff: docs/dev/libs/cep.md ---
          @@ -544,6 +579,24 @@ patternState.within(Time.seconds(10))
           {% endhighlight %}
           </td>
           </tr>
          + <tr>
          + <td><strong>Consecutive</strong></td>
          + <td>
          + <p>Works in conjunction with zeroOrMore, oneOrMore or times. Specifies that any not matching element breaks the loop.</p>
          --- End diff ---

          Same comments as for the Java documentation.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3621#discussion_r108620915

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
          @@ -192,6 +191,30 @@ long getWindowTime() {
               return lastSink;
           }

          + /**
          +  * Creates a pair of states that enables relaxed strictness before a zeroOrMora looping state.
          --- End diff ---

          Typo "zeroOrMora" -> "zeroOrMore"

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3621#discussion_r108622716

          --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---
          @@ -1659,9 +1658,345 @@ public boolean filter(Event value) throws Exception {
               ), resultingPatterns);
           }

          - /**
          -  * Clearing SharedBuffer
          -  */
          +
          + /////////////////////////////// Consecutive ////////////////////////////////////////
          +
          --- End diff ---

          I would also add tests for: timesNonStrict, startWithZeroOrMoreStrict, startWithOneOrMoreStrict.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3621#discussion_r108621618

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
          @@ -267,6 +267,76 @@ public int getTimes() {
           }

           /**
          +  * Works in conjunction with {@link Pattern#zeroOrMore()}, {@link Pattern#oneOrMore()} or {@link Pattern#times(int)}.
          +  * Specifies that any not matching element breaks the loop.
          +  *
          +  * <p>E.g. a pattern like:
          +  * <pre>{@code
          +  * Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
          +  *     @Override
          +  *     public boolean filter(Event value) throws Exception {
          +  *         return value.getName().equals("c");
          +  *     }
          +  * })
          +  * .followedBy("middle").where(new FilterFunction<Event>() {
          +  *     @Override
          +  *     public boolean filter(Event value) throws Exception {
          +  *         return value.getName().equals("a");
          +  *     }
          +  * })
          +  * }<b>.oneOrMore(true).consecutive()</b>{@code
          +  * .followedBy("end1").where(new FilterFunction<Event>() {
          +  *     @Override
          +  *     public boolean filter(Event value) throws Exception {
          +  *         return value.getName().equals("b");
          +  *     }
          +  * });
          +  * }</pre>
          +  *
          +  * <p>for a sequence: C D A1 A2 A3 D A4 B
          +  *
          +  * <p>will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
          +  *
          +  * <p><b>NOTICE:</b> This operator can be applied only when either zeroOrMore,
          +  * oneOrMore or times was previously applied!
          +  *
          +  * <p>By default a relaxed continuity is applied.
          +  *
          +  * @return pattern with continuity changed to strict
          +  */
          + public Pattern<T, F> consecutive() {
          +     switch (this.quantifier) {
          +
          +         case ZERO_OR_MORE_EAGER:
          +             this.quantifier = Quantifier.ZERO_OR_MORE_EAGER_STRICT;
          +             break;
          +         case ZERO_OR_MORE_COMBINATIONS:
          +             this.quantifier = Quantifier.ZERO_OR_MORE_COMBINATIONS_STRICT;
          +             break;
          +         case ONE_OR_MORE_EAGER:
          +             this.quantifier = Quantifier.ONE_OR_MORE_EAGER_STRICT;
          +             break;
          +         case ONE_OR_MORE_COMBINATIONS:
          +             this.quantifier = Quantifier.ONE_OR_MORE_COMBINATIONS_STRICT;
          +             break;
          +         case TIMES:
          +             this.quantifier = Quantifier.TIMES_STRICT;
          +             break;
          +         case ZERO_OR_MORE_COMBINATIONS_STRICT:
          +         case ONE_OR_MORE_EAGER_STRICT:
          +         case ONE_OR_MORE_COMBINATIONS_STRICT:
          +         case ZERO_OR_MORE_EAGER_STRICT:
          +         case TIMES_STRICT:
          +             throw new MalformedPatternException("Strict continuity already applied!");
          --- End diff ---

          Here we should not throw an exception, since a repeated call is merely redundant, not wrong, right? We could simply ignore it, which would also simplify the method.
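
The suggestion above can be sketched as follows. This is a self-contained toy, not the code that was merged: the quantifier names are copied from the diff, while the class `IdempotentConsecutiveSketch`, the non-looping constant `ONE`, and the use of `IllegalStateException` in place of Flink's `MalformedPatternException` are assumptions made so the example compiles on its own.

```java
public class IdempotentConsecutiveSketch {

    /** Quantifier names as they appear in the diff; ONE is a hypothetical non-looping value. */
    public enum Quantifier {
        ONE,
        ZERO_OR_MORE_EAGER, ZERO_OR_MORE_COMBINATIONS,
        ONE_OR_MORE_EAGER, ONE_OR_MORE_COMBINATIONS,
        TIMES,
        ZERO_OR_MORE_EAGER_STRICT, ZERO_OR_MORE_COMBINATIONS_STRICT,
        ONE_OR_MORE_EAGER_STRICT, ONE_OR_MORE_COMBINATIONS_STRICT,
        TIMES_STRICT
    }

    /** Returns the strict variant; calling it on an already strict quantifier is a no-op. */
    public static Quantifier consecutive(Quantifier quantifier) {
        switch (quantifier) {
            case ZERO_OR_MORE_EAGER:
                return Quantifier.ZERO_OR_MORE_EAGER_STRICT;
            case ZERO_OR_MORE_COMBINATIONS:
                return Quantifier.ZERO_OR_MORE_COMBINATIONS_STRICT;
            case ONE_OR_MORE_EAGER:
                return Quantifier.ONE_OR_MORE_EAGER_STRICT;
            case ONE_OR_MORE_COMBINATIONS:
                return Quantifier.ONE_OR_MORE_COMBINATIONS_STRICT;
            case TIMES:
                return Quantifier.TIMES_STRICT;
            case ZERO_OR_MORE_EAGER_STRICT:
            case ZERO_OR_MORE_COMBINATIONS_STRICT:
            case ONE_OR_MORE_EAGER_STRICT:
            case ONE_OR_MORE_COMBINATIONS_STRICT:
            case TIMES_STRICT:
                // Redundant call: strictness is already set, so nothing changes.
                return quantifier;
            default:
                // Stand-in for MalformedPatternException in this toy.
                throw new IllegalStateException(
                    "consecutive() requires zeroOrMore, oneOrMore or times");
        }
    }

    public static void main(String[] args) {
        Quantifier once = consecutive(Quantifier.TIMES);
        Quantifier twice = consecutive(once);
        System.out.println(once + " " + twice); // prints TIMES_STRICT TIMES_STRICT
    }
}
```

Whether a redundant call should be silently accepted or rejected is a design choice; the reply further down in this thread keeps the exception and changes only its message.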

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3621#discussion_r108619510

          --- Diff: docs/dev/libs/cep.md ---
          @@ -429,6 +429,41 @@ patternState.within(Time.seconds(10));
           {% endhighlight %}
           </td>
           </tr>
          + <tr>
          + <td><strong>Consecutive</strong></td>
          + <td>
          + <p>Works in conjunction with zeroOrMore, oneOrMore or times. Specifies that any not matching element breaks the loop.</p>
          --- End diff ---

          1) Mention here that the default behavior is relaxed continuity. Same for Scala. I would also state the default in the docs for `oneOrMore`, `zeroOrMore`, and `times`, with a forward reference to `consecutive`.
          2) At the end where you have the example, you could also mention that in the case of the default, we would have ...
          3) Also, it is now a `SimpleCondition`, not a `FilterFunction`.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3621#discussion_r108621295

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
          @@ -316,49 +341,45 @@ private void convertToSingletonState(final State<T> sourceState, final State<T>
           }

           /**
          -  * Converts the given state into looping one. Looping state is one with TAKE edge to itself and
          +  * Creates the given state as a looping one. Looping state is one with TAKE edge to itself and
              * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
              * for each PROCEED transition branches in computation state graph can be created only once.
              *
          -  * <p>If this looping state is first of a graph we should treat the {@link Pattern} as {@link FollowedByPattern}
          -  * to enable combinations.
          -  *
          -  * @param sourceState the state to converted
          -  * @param sinkState the state that the converted state should point to
          -  * @param isFirstState if the looping state is first of a graph
          +  * @param sinkState the state that the converted state should point to
          +  * @return the first state of the created complex state
              */
           @SuppressWarnings("unchecked")
          - private void convertToLooping(final State<T> sourceState, final State<T> sinkState, boolean isFirstState) {
          + private State<T> createLooping(final State<T> sinkState) {
          +     final State<T> loopingState = createNormalState();
                final IterativeCondition<T> filterFunction = (IterativeCondition<T>) currentPattern.getCondition();
          -     final IterativeCondition<T> trueFunction = BooleanConditions.<T>trueFunction();
          +     final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
          -     sourceState.addProceed(sinkState, trueFunction);
          -     sourceState.addTake(filterFunction);
          -     if (currentPattern instanceof FollowedByPattern || isFirstState) {
          -         final State<T> ignoreState = new State<>(
          -             currentPattern.getName(),
          -             State.StateType.Normal);
          +     loopingState.addProceed(sinkState, trueFunction);
          +     loopingState.addTake(filterFunction);
          +     if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)) {
          +         final State<T> ignoreState = createNormalState();
                    final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
          -         sourceState.addIgnore(ignoreState, ignoreCondition);
          -         ignoreState.addTake(sourceState, filterFunction);
          +         ignoreState.addTake(loopingState, filterFunction);
                    ignoreState.addIgnore(ignoreState, ignoreCondition);
          --- End diff ---

          Here you could remove the first argument and use `void addIgnore(final FilterFunction<T> condition)`, right?
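
As a rough illustration of the state layout in the diff above, the following toy builds the same edges without conditions. Everything here is hypothetical (the classes `LoopingStateSketch`, `State`, `Edge` and the no-argument self-loop overloads); it only mirrors the shape of the quoted code. The IGNORE edge from the looping state to the ignore state is an assumption carried over from the removed `sourceState.addIgnore(ignoreState, ignoreCondition)` line, since the corresponding added line is not visible in the quoted hunk.

```java
import java.util.ArrayList;
import java.util.List;

public class LoopingStateSketch {

    public enum Action { TAKE, IGNORE, PROCEED }

    /** A transition without a condition; the real edges carry an IterativeCondition. */
    public static final class Edge {
        public final Action action;
        public final State target;

        Edge(Action action, State target) {
            this.action = action;
            this.target = target;
        }
    }

    public static final class State {
        public final String name;
        public final List<Edge> edges = new ArrayList<>();

        State(String name) {
            this.name = name;
        }

        void addTake(State target) { edges.add(new Edge(Action.TAKE, target)); }
        void addIgnore(State target) { edges.add(new Edge(Action.IGNORE, target)); }
        void addProceed(State target) { edges.add(new Edge(Action.PROCEED, target)); }

        // Self-loop overloads: the target defaults to this state.
        void addTake() { addTake(this); }
        void addIgnore() { addIgnore(this); }
    }

    /** Builds a looping state; the ignore state exists only for relaxed continuity. */
    public static State createLooping(State sink, boolean strict) {
        State looping = new State("loop");
        looping.addProceed(sink);
        looping.addTake();
        if (!strict) {
            State ignore = new State("loop:ignore");
            looping.addIgnore(ignore);   // assumption, see lead-in
            ignore.addTake(looping);
            ignore.addIgnore();          // self-loop via the shorter overload
        }
        return looping;
    }

    public static void main(String[] args) {
        State sink = new State("end");
        System.out.println(createLooping(sink, true).edges.size());  // prints 2
        System.out.println(createLooping(sink, false).edges.size()); // prints 3
    }
}
```

In the strict case the looping state has no IGNORE edge at all, so any event that fails the TAKE condition leaves the loop, which is what "any not matching element breaks the loop" amounts to in this toy.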

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3621#discussion_r108630386

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
          @@ -316,49 +341,45 @@ private void convertToSingletonState(final State<T> sourceState, final State<T>
           }

           /**
          -  * Converts the given state into looping one. Looping state is one with TAKE edge to itself and
          +  * Creates the given state as a looping one. Looping state is one with TAKE edge to itself and
              * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
              * for each PROCEED transition branches in computation state graph can be created only once.
              *
          -  * <p>If this looping state is first of a graph we should treat the {@link Pattern} as {@link FollowedByPattern}
          -  * to enable combinations.
          -  *
          -  * @param sourceState the state to converted
          -  * @param sinkState the state that the converted state should point to
          -  * @param isFirstState if the looping state is first of a graph
          +  * @param sinkState the state that the converted state should point to
          +  * @return the first state of the created complex state
              */
           @SuppressWarnings("unchecked")
          - private void convertToLooping(final State<T> sourceState, final State<T> sinkState, boolean isFirstState) {
          + private State<T> createLooping(final State<T> sinkState) {
          +     final State<T> loopingState = createNormalState();
                final IterativeCondition<T> filterFunction = (IterativeCondition<T>) currentPattern.getCondition();
          -     final IterativeCondition<T> trueFunction = BooleanConditions.<T>trueFunction();
          +     final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
          -     sourceState.addProceed(sinkState, trueFunction);
          -     sourceState.addTake(filterFunction);
          -     if (currentPattern instanceof FollowedByPattern || isFirstState) {
          -         final State<T> ignoreState = new State<>(
          -             currentPattern.getName(),
          -             State.StateType.Normal);
          +     loopingState.addProceed(sinkState, trueFunction);
          +     loopingState.addTake(filterFunction);
          +     if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)) {
          +         final State<T> ignoreState = createNormalState();
                    final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
          -         sourceState.addIgnore(ignoreState, ignoreCondition);
          -         ignoreState.addTake(sourceState, filterFunction);
          +         ignoreState.addTake(loopingState, filterFunction);
                    ignoreState.addIgnore(ignoreState, ignoreCondition);
          --- End diff ---

          right, fixed

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3621#discussion_r108630252

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
          @@ -192,6 +191,30 @@ long getWindowTime() {
               return lastSink;
           }

          + /**
          +  * Creates a pair of states that enables relaxed strictness before a zeroOrMora looping state.
          --- End diff ---

          done

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3621#discussion_r108649205

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
          @@ -267,6 +267,76 @@ public int getTimes() {
           }

           /**
          +  * Works in conjunction with {@link Pattern#zeroOrMore()}, {@link Pattern#oneOrMore()} or {@link Pattern#times(int)}.
          +  * Specifies that any not matching element breaks the loop.
          +  *
          +  * <p>E.g. a pattern like:
          +  * <pre>{@code
          +  * Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
          +  *     @Override
          +  *     public boolean filter(Event value) throws Exception {
          +  *         return value.getName().equals("c");
          +  *     }
          +  * })
          +  * .followedBy("middle").where(new FilterFunction<Event>() {
          +  *     @Override
          +  *     public boolean filter(Event value) throws Exception {
          +  *         return value.getName().equals("a");
          +  *     }
          +  * })
          +  * }<b>.oneOrMore(true).consecutive()</b>{@code
          +  * .followedBy("end1").where(new FilterFunction<Event>() {
          +  *     @Override
          +  *     public boolean filter(Event value) throws Exception {
          +  *         return value.getName().equals("b");
          +  *     }
          +  * });
          +  * }</pre>
          +  *
          +  * <p>for a sequence: C D A1 A2 A3 D A4 B
          +  *
          +  * <p>will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
          +  *
          +  * <p><b>NOTICE:</b> This operator can be applied only when either zeroOrMore,
          +  * oneOrMore or times was previously applied!
          +  *
          +  * <p>By default a relaxed continuity is applied.
          +  *
          +  * @return pattern with continuity changed to strict
          +  */
          + public Pattern<T, F> consecutive() {
          +     switch (this.quantifier) {
          +
          +         case ZERO_OR_MORE_EAGER:
          +             this.quantifier = Quantifier.ZERO_OR_MORE_EAGER_STRICT;
          +             break;
          +         case ZERO_OR_MORE_COMBINATIONS:
          +             this.quantifier = Quantifier.ZERO_OR_MORE_COMBINATIONS_STRICT;
          +             break;
          +         case ONE_OR_MORE_EAGER:
          +             this.quantifier = Quantifier.ONE_OR_MORE_EAGER_STRICT;
          +             break;
          +         case ONE_OR_MORE_COMBINATIONS:
          +             this.quantifier = Quantifier.ONE_OR_MORE_COMBINATIONS_STRICT;
          +             break;
          +         case TIMES:
          +             this.quantifier = Quantifier.TIMES_STRICT;
          +             break;
          +         case ZERO_OR_MORE_COMBINATIONS_STRICT:
          +         case ONE_OR_MORE_EAGER_STRICT:
          +         case ONE_OR_MORE_COMBINATIONS_STRICT:
          +         case ZERO_OR_MORE_EAGER_STRICT:
          +         case TIMES_STRICT:
          +             throw new MalformedPatternException("Strict continuity already applied!");
          --- End diff ---

          OK, after your second comment I changed only the exception message.

          As a side note, I was considering changing the quantifier field to EnumSet<QuantifierProperty>, but then we would not have a clearly defined set of valid combinations of properties (we would have to maintain it at the API level). This method, though, would be as simple as .add(QuantifierProperty.STRICT).

          @kl0u What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3621#discussion_r108641124

          — Diff: docs/dev/libs/cep.md —
          @@ -429,6 +429,41 @@ patternState.within(Time.seconds(10));

          {% endhighlight %}

          </td>
          </tr>
          + <tr>
          + <td><strong>Consecutive</strong></td>
          + <td>
          + <p>Works in conjunction with zeroOrMore, oneOrMore or times. Specifies that any not matching element breaks the loop.</p>
          — End diff –

          fixed.

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3621#discussion_r108641082

          — Diff: docs/dev/libs/cep.md —
          @@ -544,6 +579,24 @@ patternState.within(Time.seconds(10))

          {% endhighlight %}

          </td>
          </tr>
          + <tr>
          + <td><strong>Consecutive</strong></td>
          + <td>
          + <p>Works in conjunction with zeroOrMore, oneOrMore or times. Specifies that any not matching element breaks the loop.</p>
          — End diff –

          fixed.

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3621#discussion_r108649050

          — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java —
          @@ -1659,9 +1658,345 @@ public boolean filter(Event value) throws Exception

          ), resultingPatterns);
          }

          - /**
          - * Clearing SharedBuffer
          - */
          +
          + /////////////////////////////// Consecutive ////////////////////////////////////////
          +
          — End diff –

          Done. I also did some refactoring to remove duplicate code.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3621

          Good work, @dawidwys! I am waiting for Travis and then I will merge!

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3621

          Merged this, @dawidwys. Could you close this PR and the related JIRA?

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys closed the pull request at:

          https://github.com/apache/flink/pull/3621


            People

            • Assignee:
              dawidwys Dawid Wysakowicz
              Reporter:
              dawidwys Dawid Wysakowicz