FLINK-6418: Support for dynamic state changes in CEP patterns

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.4.0
    • Component/s: CEP
    • Labels:
      None

      Description

      The Flink CEP library lets one define an event pattern whose match condition is determined programmatically via the where method. Flink 1.3 will introduce so-called iterative conditions, which allow the predicate to look up events already matched by the pattern and thus be conditional on them.

      1.3 also adds quantifier methods to the API, which allow one to declaratively specify how many times a condition must be matched before there is a state change.

      Alas, there are use cases where the quantifier must be determined dynamically based on the events matched by the pattern so far. Therefore, I propose adding a new Pattern method: until.

      Like the new iterative variant of where, until would take a predicate function and a context that provides access to events already matched. But whereas where determines if an event is accepted by the pattern, until determines whether the pattern should move on to the next state.

      In our particular use case, we have a pattern where an event is matched a number of times, but depending on the event type, the number (threshold) for the pattern to match is different. We could decompose the pattern into multiple similar patterns, but that could be inefficient if we have many such patterns. If the functionality of until were available, we could make do with a single pattern.
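
      To make the proposed semantics concrete, here is a minimal plain-Java sketch with no Flink dependency. It assumes that until is evaluated before where for each incoming event, and that an event for which until holds is not added to the match; the description above does not fix either detail. The Event class, collect, thresholdFor, collectUpToThreshold and the threshold values (2 for "payment", 3 otherwise) are hypothetical names chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

final class UntilLoopSketch {
    private UntilLoopSketch() {}

    /** Hypothetical event: a type name and a payload value. */
    static final class Event {
        final String type;
        final int value;

        Event(String type, int value) {
            this.type = type;
            this.value = value;
        }
    }

    /**
     * Collects events accepted by {@code where} and stops as soon as {@code until} holds.
     * Both predicates see the current event and the events matched so far.
     */
    static List<Event> collect(
            List<Event> input,
            BiPredicate<Event, List<Event>> where,
            BiPredicate<Event, List<Event>> until) {
        List<Event> matched = new ArrayList<>();
        for (Event event : input) {
            if (until.test(event, matched)) {
                break; // stop condition fired: leave the loop without taking this event
            }
            if (where.test(event, matched)) {
                matched.add(event);
            }
        }
        return matched;
    }

    /** Hypothetical per-type threshold (assumed values). */
    static int thresholdFor(String type) {
        return "payment".equals(type) ? 2 : 3;
    }

    /** One generic loop serves every event type; only the threshold differs. */
    static List<Event> collectUpToThreshold(List<Event> input, String type) {
        return collect(
                input,
                (event, matched) -> event.type.equals(type),
                (event, matched) -> matched.size() >= thresholdFor(type));
    }

    public static void main(String[] args) {
        List<Event> input = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            input.add(new Event("login", i));
            input.add(new Event("payment", i));
        }
        System.out.println(collectUpToThreshold(input, "login").size());   // prints 3
        System.out.println(collectUpToThreshold(input, "payment").size()); // prints 2
    }
}
```

      The point of the sketch is that the stopping rule lives in a predicate over the events matched so far, so a single loop definition covers all event types instead of one pattern per threshold.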

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4143

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on the issue:

          https://github.com/apache/flink/pull/4143

          merging

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/4143

          LGTM. +1 to merge.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4143#discussion_r123713960

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
          @@ -514,25 +524,46 @@ private void addStopStateToLooping(final State<T> loopingState) {
                */
               @SuppressWarnings("unchecked")
               private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
          -        final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
          +        final IterativeCondition<T> takeCondition = extendWithUntilCondition(
          +            (IterativeCondition<T>) currentPattern.getCondition(),
          +            (IterativeCondition<T>) currentPattern.getUntilCondition()
          +        );

                   final State<T> firstState = createState(currentPattern.getName(), State.StateType.Normal);
                   firstState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
          -        firstState.addTake(loopingState, currentCondition);
          +        firstState.addTake(loopingState, takeCondition);

                   final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
                   if (ignoreFunction != null) {
                       final State<T> firstStateWithoutProceed = createState(currentPattern.getName(), State.StateType.Normal);
                       firstState.addIgnore(firstStateWithoutProceed, ignoreFunction);
                       firstStateWithoutProceed.addIgnore(ignoreFunction);
          -            firstStateWithoutProceed.addTake(loopingState, currentCondition);
          +            firstStateWithoutProceed.addTake(loopingState, takeCondition);
                       addStopStates(firstStateWithoutProceed);
                   }
                   return firstState;
               }

               /**
          +     * This method extends the given condition with stop(until) condition if necessary.
          +     * The until condition needs to be applied only if both of the given conditions are not null.
          +     *
          +     * @param condition the condition to extend
          +     * @param untilCondition the until condition to join with the given condition
          +     * @return condition with AND applied or the original condition
          +     */
          +    private IterativeCondition<T> extendWithUntilCondition(
          +            IterativeCondition<T> condition,
          +            IterativeCondition<T> untilCondition) {
          +        if (untilCondition != null && condition != null) {
          +            return new AndCondition<>(new NotCondition<>(untilCondition), condition);
          +        } else {
          +            return condition;
          +        }
          +    }
          +
          --- End diff --

          As written, this does not allow patterns that have no condition and only an until condition, like:

          ```
          Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
              private static final long serialVersionUID = 5726188262756267490L;

              @Override
              public boolean filter(Event value) throws Exception {
                  return value.getName().equals("c");
              }
          }).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);
          ```

          To do this, I think that transforming the method to the following will do the job:

          ```
          if (untilCondition != null && condition != null) { return new AndCondition<>(new NotCondition<>(untilCondition), condition); }
          if (condition != null) { return condition; }
          if (untilCondition != null) { return new NotCondition<>(untilCondition); }
          return null;
          ```

          And to do this properly, you could also add a test in the `UntilConditionITCase` like:

          ```
          List<StreamRecord<Event>> inputEvents = new ArrayList<>();

          Event startEvent = new Event(40, "c", 1.0);
          Event middleEvent1 = new Event(41, "a", 2.0);
          Event middleEvent2 = new Event(42, "a", 3.0);
          Event startEvent2 = new Event(40, "d", 1.0);
          Event breaking = new Event(44, "a", 5.0);
          Event ignored = new Event(45, "a", 6.0);

          inputEvents.add(new StreamRecord<>(startEvent, 1));
          inputEvents.add(new StreamRecord<>(middleEvent1, 3));
          inputEvents.add(new StreamRecord<>(middleEvent2, 4));
          inputEvents.add(new StreamRecord<>(startEvent2, 4));
          inputEvents.add(new StreamRecord<>(breaking, 6));
          inputEvents.add(new StreamRecord<>(ignored, 7));

          Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
          private static final long serialVersionUID = 5726188262756267490L;

          @Override
          public boolean filter(Event value) throws Exception { return value.getName().equals("c"); }

          }).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);

          NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);

          final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
          ```

          And you could also add one for the case that this method returns `null`.
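
          For illustration only, the four-branch combination suggested above can be tried outside Flink with `java.util.function.Predicate` standing in for `IterativeCondition`. That substitution is an assumption: the real `AndCondition` and `NotCondition` also receive a context. `TakeConditions`, `extendWithUntil` and the two sample predicates are hypothetical names.

```java
import java.util.function.Predicate;

final class TakeConditions {
    private TakeConditions() {}

    /**
     * Combines a take condition with an optional stop condition.
     * Returns null only when neither condition is set.
     */
    static <T> Predicate<T> extendWithUntil(Predicate<T> condition, Predicate<T> untilCondition) {
        if (untilCondition != null && condition != null) {
            // take only while the stop condition does not hold
            return untilCondition.negate().and(condition);
        }
        if (condition != null) {
            return condition;
        }
        if (untilCondition != null) {
            // no where(): every event is taken until the stop condition holds
            return untilCondition.negate();
        }
        return null;
    }

    public static void main(String[] args) {
        Predicate<Integer> largerThanFive = v -> v > 5;
        Predicate<Integer> atLeastFifty = v -> v >= 50;

        Predicate<Integer> both = extendWithUntil(largerThanFive, atLeastFifty);
        System.out.println(both.test(10)); // prints true
        System.out.println(both.test(60)); // prints false

        Predicate<Integer> untilOnly = extendWithUntil(null, atLeastFifty);
        System.out.println(untilOnly.test(3)); // prints true

        Predicate<Integer> none = extendWithUntil(null, null);
        System.out.println(none == null); // prints true
    }
}
```

          The until-only branch is the one the original two-branch version misses: there it would return the null condition unchanged and silently drop the stop condition.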

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4143#discussion_r123715389

          --- Diff: docs/dev/libs/cep.md ---
          @@ -334,6 +334,20 @@ pattern.where(event => ... /* some condition */).or(event => ... /* or condition
           </div>
           </div>

          +
          +*Stop condition:* In case of kleene operators (`oneOrMore()` and `oneOrMore().optional()`) you can
          +also specify a stop condition, e.g. accept events with value larger than 5 until the sum of values is smaller than 50.
          --- End diff --

          In general, I would not use the term "kleene", as we do not use it in the rest of the docs.
          You can say something like "looping" or simply oneOrMore.
          In addition, it would be more consistent to use "pattern" rather than "operator". In the docs we have "pattern sequences" composed of "patterns", so we should stick to that.

          So "Kleene Operator" -> "looping pattern" or "oneOrMore pattern"

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4143#discussion_r123715144

          --- Diff: docs/dev/libs/cep.md ---
          @@ -547,18 +579,30 @@ pattern.where(event => ... /* some condition */)
           <tr>
           <td><strong>or(condition)</strong></td>
           <td>
          -<p>Adds a new condition which is ORed with an existing one. An event can match the pattern only if it
          + <p>Adds a new condition which is ORed with an existing one. An event can match the pattern only if it
           passes at least one of the conditions:</p>
           {% highlight scala %}
           pattern.where(event => ... /* some condition */)
           .or(event => ... /* alternative condition */)
           {% endhighlight %}
           </td>
           </tr>
          +<tr>
          + <td><strong>until(condition)</strong></td>
          + <td>
          + <p>Specifies a stop condition for kleene operator. Meaning if event matching the given condition occurs, no more
          --- End diff --

          Same here for the "kleene operator".

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4143#discussion_r123710396

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
          @@ -186,6 +193,24 @@ public Quantifier getQuantifier() {
               }

               /**
          +     * Applies a stop condition for a looping state. It allows cleaning the underlying state.
          +     *
          +     * @param untilCondition a condition an event has to satisfy to stop collecting events into looping state
          +     * @return The same pattern with applied untilCondition
          +     */
          +    public Pattern<T, F> until(IterativeCondition<F> untilCondition) {
          +        Preconditions.checkNotNull(untilCondition, "The condition cannot be null");
          +        if (!quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) {
          +            throw new MalformedPatternException("The until condition is only applicable to looping states.");
          +        }
          +
          --- End diff --

          Check also if there is another `until` condition already specified and throw an exception saying that a condition has already been specified for the pattern.
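
          A minimal sketch of the extra guard requested here, written as a standalone class so that it runs without Flink. `LoopingPatternSketch` and its factory methods are hypothetical, `Predicate<String>` stands in for `IterativeCondition`, and `IllegalStateException` stands in for `MalformedPatternException`; the wording of the second error message is also an assumption.

```java
import java.util.Objects;
import java.util.function.Predicate;

final class LoopingPatternSketch {
    private final boolean looping;
    private Predicate<String> untilCondition; // stays null until a stop condition is set

    private LoopingPatternSketch(boolean looping) {
        this.looping = looping;
    }

    static LoopingPatternSketch loopingPattern() {
        return new LoopingPatternSketch(true);
    }

    static LoopingPatternSketch singlePattern() {
        return new LoopingPatternSketch(false);
    }

    LoopingPatternSketch until(Predicate<String> condition) {
        Objects.requireNonNull(condition, "The condition cannot be null");
        if (!looping) {
            throw new IllegalStateException("The until condition is only applicable to looping states.");
        }
        if (untilCondition != null) {
            // the additional check: reject a second until() on the same pattern
            throw new IllegalStateException("An until condition has already been specified for this pattern.");
        }
        untilCondition = condition;
        return this;
    }

    public static void main(String[] args) {
        LoopingPatternSketch pattern = loopingPattern().until(e -> e.equals("stop"));
        try {
            pattern.until(e -> e.isEmpty());
        } catch (IllegalStateException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```

          Failing fast at pattern-definition time keeps the second condition from silently replacing the first.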

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4143#discussion_r123714098

          — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java —
          @@ -0,0 +1,427 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.cep.nfa;
          +
          +import org.apache.flink.cep.Event;
          +import org.apache.flink.cep.nfa.compiler.NFACompiler;
          +import org.apache.flink.cep.pattern.Pattern;
          +import org.apache.flink.cep.pattern.conditions.IterativeCondition;
          +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +
          +import com.google.common.collect.Lists;
          +import org.junit.Test;
          +
          +import java.util.ArrayList;
          +import java.util.List;
          +
          +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
          +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
          +import static org.junit.Assert.assertTrue;
          +
          +/**
          + * Tests for {@link Pattern#until(IterativeCondition)}.
          + */
          +public class UntilConditionITCase {
          +
          +
          — End diff –

          You could add the test I mentioned earlier here, and it may be worth adding a test with a proper `IterativeCondition` for completeness.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4143#discussion_r123715235

          --- Diff: docs/dev/libs/cep.md ---
          @@ -409,11 +423,28 @@ pattern.where(new IterativeCondition<Event>() {
           });
           {% endhighlight %}
           </td>
          -</tr>
          + </tr>
          + <tr>
          + <td><strong>until(condition)</strong></td>
          + <td>
          + <p>Specifies a stop condition for kleene operator. Meaning if event matching the given condition occurs, no more
          --- End diff --

          Same as before for the "kleene operator" -> "looping pattern".

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/4143

          Thanks a lot @dawidwys ! I will have a look.

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on the issue:

          https://github.com/apache/flink/pull/4143

          Hi @kl0u. I've rebased the PR and added docs.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/4143

          Hi @dawidwys ! Thanks for the work.
          Could you also update the documentation?
          This will also help the review process.

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on the issue:

          https://github.com/apache/flink/pull/4143

          R: @kl0u

          githubbot ASF GitHub Bot added a comment -

          GitHub user dawidwys opened a pull request:

          https://github.com/apache/flink/pull/4143

          [FLINK-6418][cep] Support for dynamic state changes in CEP patterns

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [ ] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/dawidwys/flink cep-loop-until

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4143.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4143


          commit b1832af10410266cedc26654f7be1052ea244c47
          Author: Dawid Wysakowicz <dwysakowicz@apache.org>
          Date: 2017-06-19T14:40:02Z

          [FLINK-6418][cep] Support for dynamic state changes in CEP patterns


          kkl0u Kostas Kloudas added a comment -

          Dawid Wysakowicz This looks like a very nice extension. It also allows custom termination criteria for looping patterns with iterative conditions.

          dawidwys Dawid Wysakowicz added a comment -

          I like the idea. I think right now the oneOrMore/zeroOrMore quantifiers never stop creating new ComputationStates. Adding an until method would allow us to clear the NFA state completely. What do you think, Kostas Kloudas?


            People

            • Assignee: Dawid Wysakowicz (dawidwys)
            • Reporter: Elias Levy (elevy)