Flink / FLINK-6197

Add support for iterative conditions.

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: CEP
    • Labels:
      None

      Description

      So far, the where clause only supported simple FilterFunction conditions.

      With this, we want to add support for conditions where an event is accepted not only based on its own properties (e.g. its name), as before, but also based on some statistic computed over the events previously accepted in the pattern, e.g. whether its price is higher than the average price of the previously accepted events.

      Combined with the recently added quantifiers, this will allow for much more expressive patterns.
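A minimal, self-contained sketch of the idea in the description: a condition that accepts an event only if its price is higher than the average price of the events already accepted for a state. It does not use Flink; `Event`, `Context`, `IterativeCondition`, `AboveAverageCondition`, `MapContext`, and the state name "middle" are hypothetical, simplified stand-ins (the real condition added by this issue is an abstract class whose `filter` may throw `Exception`).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IterativeConditionSketch {

    // Hypothetical event type with only the fields the condition needs.
    static final class Event {
        private final String name;
        private final double price;

        Event(String name, double price) {
            this.name = name;
            this.price = price;
        }

        String getName() { return name; }
        double getPrice() { return price; }
    }

    // Hypothetical stand-in for the condition context: exposes the events
    // already accepted for a named state of the pattern.
    interface Context<T> {
        Iterable<T> getEventsForPattern(String name);
    }

    // Hypothetical iterative condition: the decision may depend on the event
    // itself and on a statistic over previously accepted events.
    interface IterativeCondition<T> {
        boolean filter(T value, Context<T> ctx);
    }

    // Accepts an event if its price is higher than the average price of the
    // events previously accepted for "middle"; the first event is always accepted.
    static final class AboveAverageCondition implements IterativeCondition<Event> {
        @Override
        public boolean filter(Event value, Context<Event> ctx) {
            double sum = 0.0;
            int count = 0;
            for (Event e : ctx.getEventsForPattern("middle")) {
                sum += e.getPrice();
                count++;
            }
            return count == 0 || value.getPrice() > sum / count;
        }
    }

    // Map-backed context used to drive the example.
    static final class MapContext implements Context<Event> {
        final Map<String, List<Event>> accepted = new HashMap<>();

        @Override
        public Iterable<Event> getEventsForPattern(String name) {
            return accepted.getOrDefault(name, new ArrayList<>());
        }
    }

    // Feeds events with the given prices to the condition; accepted events
    // become visible to later evaluations through the context.
    static List<Boolean> decide(double... prices) {
        MapContext ctx = new MapContext();
        IterativeCondition<Event> condition = new AboveAverageCondition();
        List<Boolean> decisions = new ArrayList<>();
        for (double price : prices) {
            Event e = new Event("middle", price);
            boolean accept = condition.filter(e, ctx);
            decisions.add(accept);
            if (accept) {
                ctx.accepted.computeIfAbsent("middle", k -> new ArrayList<>()).add(e);
            }
        }
        return decisions;
    }

    public static void main(String[] args) {
        System.out.println(decide(2.0, 4.0, 1.0, 5.0)); // [true, true, false, true]
    }
}
```

With prices 2.0, 4.0, 1.0, 5.0 the running averages of the accepted events are 2.0 and then 3.0, so only the 1.0 event is rejected.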


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/3624

          FLINK-6197 [cep] Add support for iterative conditions.

          So far, the where clause only supported simple FilterFunction
          conditions. With this, we add support for conditions where an
          event is accepted not only based on its own properties, e.g.
          name, as it was before, but also based on some statistic
          computed over previously accepted events in the pattern, e.g.
          if the price is higher than the average of the prices of the
          previously accepted events.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink cep-iterative-functions

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3624.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3624


          commit dfb848b83864071ef6738261e425bf4d0e43575d
          Author: kl0u <kkloudas@gmail.com>
          Date: 2017-03-22T14:52:07Z

          FLINK-6197 [cep] Add support for iterative conditions.

          So far, the where clause only supported simple FilterFunction
          conditions. With this, we add support for conditions where an
          event is accepted not only based on its own properties, e.g.
          name, as it was before, but also based on some statistic
          computed over previously accepted events in the pattern, e.g.
          if the price is higher than the average of the prices of the
          previously accepted events.


          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108232033

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java —
          @@ -0,0 +1,98 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.cep.pattern.conditions;
          +
          +import org.apache.flink.api.common.functions.Function;
          +
          +import java.io.Serializable;
          +
          +/**
          + * A user-defined condition that decides if an element should be accepted in the pattern or not.
          + * Accepting an element also signals a state transition for the corresponding {@link org.apache.flink.cep.nfa.NFA}.
          + *
          + * <p>A condition can be a simple filter or a more complex condition that iterates over the previously accepted
          + * elements in the pattern and decides to accept a new element or not based on some statistic over these elements.
          + * In the former case, the condition should extend the {@link SimpleCondition} class. In the later, the condition
          + * should extend this class, which gives you also access to the previously accepted elements through a {@link Context}.
          + *
          + * <p>An iterative condition that accepts an element if i) its name is middle, and ii) the sum of the prices of all
          + * accepted elements is less than {@code 5} would look like:
          + *
          + * <pre>
          + * {@code
          + * private class MyCondition extends IterativeCondition<Event> {
          + *
          + *     @Override
          + *     public boolean filter(Event value, Context<Event> ctx) throws Exception {
          + *         if (!value.getName().equals("middle")) {
          + *             return false;
          + *         }
          + *
          + *         double sum = 0.0;
          + *         for (Event e: ctx.getEventsForPattern("middle")) {
          + *             sum += e.getPrice();
          + *         }
          + *         sum += value.getPrice();
          + *         return Double.compare(sum, 5.0) <= 0;
          + *     }
          + * }
          + * }
          + * </pre>
          + *
          + * <b>ATTENTION: </b> The call to {@link Context#getEventsForPattern(String) getEventsForPattern(...)} has to find
          + * the elements that belong to the pattern among the elements stored by the NFA. The cost of this operation can vary,
          + * so when implementing your condition, try to minimize the times the method is called.
          + */
          +public abstract class IterativeCondition<T> implements Function, Serializable {
          +
          + private static final long serialVersionUID = 7067817235759351255L;
          +
          + /**
          + * The filter function that evaluates the predicate.
          + * <p>
          + * <strong>IMPORTANT:</strong> The system assumes that the function does not
          + * modify the elements on which the predicate is applied. Violating this assumption
          + * can lead to incorrect results.
          + *
          + * @param value The value to be tested.
          + * @param ctx The {@link Context} used for the evaluation of the function and provides access to
          + * the already accepted events in the pattern (see {@link Context#getEventsForPattern(String)}).
          + * @return {@code true} for values that should be retained, {@code false}
          + * for values to be filtered out.
          + *
          + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
          + * to fail and may trigger recovery.
          + */
          + public abstract boolean filter(T value, Context<T> ctx) throws Exception;
          +
          + /**
          + * The context used when evaluating the {@link IterativeCondition condition}.
          + */
          + public interface Context<T> extends Serializable {
          +
          + /**
          + * @return An {@link Iterable} over the already accepted elements
          + * for a given pattern. Elements are iterated in the order the were
          — End diff –

          Typo: the -> they

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108355802

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java —
          @@ -553,9 +559,38 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
          return outgoingEdges;
          }

          + private boolean checkFilterCondition(ComputationState<T> computationState, IterativeCondition<T> condition, T event) throws Exception {
          + return condition == null || condition.filter(event, computationState.getConditionContext());
          + }
          +
          + Map<String, List<T>> extractCurrentMatches(final ComputationState<T> computationState) {
          + if (computationState.getPreviousState() == null) {
          + return new HashMap<>();
          + }
          +
          + Collection<LinkedHashMultimap<String, T>> paths = stringSharedBuffer.extractPatterns(
          + computationState.getPreviousState().getName(),
          + computationState.getEvent(),
          + computationState.getTimestamp(),
          + computationState.getVersion());
          - private boolean checkFilterCondition(FilterFunction<T> condition, T event) throws Exception {
          - return condition == null || condition.filter(event);
          + // for a given computation state, we cannot have more than one matching patterns.
          + Preconditions.checkArgument(paths.size() <= 1);
          — End diff –

          How about checkState?
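Some background on the suggestion: Flink's `org.apache.flink.util.Preconditions` follows the Guava convention, where `checkArgument` throws `IllegalArgumentException` (the caller passed a bad argument) and `checkState` throws `IllegalStateException` (an internal invariant does not hold). Since `paths.size() <= 1` describes what the shared buffer returned rather than anything the caller passed in, `checkState` seems the better fit. The sketch below re-implements both checks locally so that it is self-contained; `singlePathOrNull` and `failureOf` are hypothetical helpers, not part of the PR.

```java
import java.util.List;

public class PreconditionsSketch {

    // Local stand-ins mirroring the contracts of checkArgument/checkState in
    // org.apache.flink.util.Preconditions: same conditions, same exception types.
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Hypothetical reduction of extractCurrentMatches(): the paths come from the
    // shared buffer, not from the caller, so a violation is a broken invariant.
    static <P> P singlePathOrNull(List<P> paths) {
        checkState(paths.size() <= 1, "Expected at most one matching path, found " + paths.size());
        return paths.isEmpty() ? null : paths.get(0);
    }

    // Runs an action and reports the simple name of the exception it throws, or "none".
    static String failureOf(Runnable action) {
        try {
            action.run();
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureOf(() -> singlePathOrNull(List.of("a"))));       // none
        System.out.println(failureOf(() -> singlePathOrNull(List.of("a", "b"))));  // IllegalStateException
        System.out.println(failureOf(() -> checkArgument(false, "bad argument"))); // IllegalArgumentException
    }
}
```

The exception type is what callers and logs see, so choosing between the two mainly signals where the bug lies: in the caller (`checkArgument`) or in the component itself (`checkState`).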

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108358091

          — Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala —
          @@ -138,7 +139,7 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {

          * @param filter Or filter function
          * @return The same pattern operator where the new filter condition is set
          */
          - def or(filter: FilterFunction[F]): Pattern[T, F] = {
          + def or(filter: IterativeCondition[F]): Pattern[T, F] = {
          — End diff –

          Missing version accepting lambdas.
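The comment asks for an additional `or(...)` overload that takes a plain function. The underlying issue can be shown in Java, the library's other API language: `IterativeCondition` is an abstract class, and a Java lambda can only target a functional interface, so a lambda-friendly overload has to wrap the function in an anonymous subclass. A minimal sketch, where `Condition` (a simplified stand-in for `IterativeCondition`), `ConditionFunction`, and `PatternSketch` are all hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

public class LambdaOverloadSketch {

    // Simplified, hypothetical context.
    interface Context<T> {
        Iterable<T> getEventsForPattern(String name);
    }

    // Hypothetical stand-in for IterativeCondition: an abstract class, so lambdas cannot target it.
    abstract static class Condition<T> {
        abstract boolean filter(T value, Context<T> ctx);
    }

    // Hypothetical functional interface that lambdas can target.
    @FunctionalInterface
    interface ConditionFunction<T> {
        boolean apply(T value, Context<T> ctx);
    }

    // Hypothetical pattern holding alternatives combined with OR.
    static final class PatternSketch<T> {
        private final List<Condition<T>> alternatives = new ArrayList<>();

        // Overload taking the condition class directly.
        PatternSketch<T> or(Condition<T> condition) {
            alternatives.add(condition);
            return this;
        }

        // Lambda-friendly overload: wraps the function in an anonymous subclass.
        PatternSketch<T> or(ConditionFunction<T> function) {
            return or(new Condition<T>() {
                @Override
                boolean filter(T value, Context<T> ctx) {
                    return function.apply(value, ctx);
                }
            });
        }

        // An element is accepted if any alternative accepts it.
        boolean accepts(T value, Context<T> ctx) {
            for (Condition<T> c : alternatives) {
                if (c.filter(value, ctx)) {
                    return true;
                }
            }
            return false;
        }
    }

    // Builds a pattern that uses both overloads and evaluates it against an empty context.
    static boolean accepts(String value) {
        PatternSketch<String> pattern = new PatternSketch<String>()
                .or((v, ctx) -> v.startsWith("foo"))   // lambda overload
                .or(new Condition<String>() {           // class overload
                    @Override
                    boolean filter(String v, Context<String> ctx) {
                        return v.length() > 5;
                    }
                });
        Context<String> empty = name -> new ArrayList<>();
        return pattern.accepts(value, empty);
    }

    public static void main(String[] args) {
        System.out.println(accepts("foo"));       // true
        System.out.println(accepts("bar"));       // false
        System.out.println(accepts("barbazqux")); // true
    }
}
```

Because a lambda is never compatible with a class type, the two `or` overloads do not conflict: a lambda argument always resolves to the `ConditionFunction` version.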

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108355851

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java —
          @@ -573,6 +608,9 @@ private boolean checkFilterCondition(FilterFunction<T> condition, T event) throw
          computationState.getTimestamp(),
          computationState.getVersion());

          + // for a given computation state, we cannot have more than one matching patterns.
          + Preconditions.checkArgument(paths.size() <= 1);
          — End diff –

          How about checkState?

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108352961

          — Diff: docs/dev/libs/cep.md —
          @@ -124,13 +124,70 @@ val start : Pattern[Event, _] = Pattern.begin("start")
          </div>
          </div>

          -Each state must have an unique name to identify the matched events later on.
          +Each state must have a unique name to identify the matched events later on.
          Additionally, we can specify a filter condition for the event to be accepted as the start event via the `where` method.
          +These filtering conditions can be either an `IterativeCondition` or a `SimpleCondition`.
          +
          +*Iterative Conditions:* This type of conditions can iterate over the previously accepted elements in the pattern and
          +decide to accept a new element or not, based on some statistic over those elements.
          +
          +Below is the code for an iterative condition that accepts elements whose name start with "foo" and for which, the sum
          +of the prices of the previously accepted elements for a state named "middle", plus the price of the current event, do
          +not exceed the value of 5.0. Iterative condition can be very powerful, especially in combination with quantifiers, e.g.
          +`oneToMany` or `zeroToMany`.
          +
          +<div class="codetabs" markdown="1">
          +<div data-lang="java" markdown="1">
          +{% highlight java %}
          +start.where(new IterativeCondition<SubEvent>() {
          +    @Override
          +    public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
          +        if (!value.getName().startsWith("foo")) {
          +            return false;
          +        }
          +
          +        double sum = 0.0;
          +        for (Event event : ctx.getEventsForPattern("middle")) {
          +            sum += event.getPrice();
          +        }
          +        sum += value.getPrice();
          +        return Double.compare(sum, 5.0) < 0;
          +    }
          +});
          +{% endhighlight %}

          +</div>
          +
          +<div data-lang="scala" markdown="1">
          +{% highlight scala %}

          +start.where(
          — End diff –

          I was thinking of a more Scala-like way of writing it. Maybe:
          ```
          Pattern.begin[Event]("abc").where(
            (a, ctx) => {
              lazy val sum = ctx.getEventsForPattern("a").asScala.map(_.getPrice).sum
              a.getName.startsWith("foo") && sum + a.getPrice < 5.0
            }
          )
          ```

          Also, I would try to make the Scala API more convenient: a util.Iterable is not what I would expect. I am not sure, though, whether now is the right moment. We could postpone it until the API is redesigned.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108362971


          I agree that this is a lot more Scala-ish (apart from the `asScala` part). The only problem is that it no longer shows that we should avoid calling `getEventsForPattern` on every evaluation.
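The concern can be made concrete. The `IterativeCondition` Javadoc warns that each `getEventsForPattern(...)` call has to search the events stored by the NFA, so a condition should run its cheap, event-local checks first and touch the context only when they pass (in the Scala version, `&&` together with the `lazy val` gives the same short-circuiting, just less visibly). The sketch below keeps the acceptance rule of the documentation example and uses a hypothetical counting context to show that events failing the name check never trigger a lookup; `Event`, `Context`, and `CountingContext` are illustrative stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShortCircuitSketch {

    // Hypothetical event type mirroring the getName()/getPrice() accessors used above.
    static final class Event {
        private final String name;
        private final double price;

        Event(String name, double price) {
            this.name = name;
            this.price = price;
        }

        String getName() { return name; }
        double getPrice() { return price; }
    }

    interface Context<T> {
        Iterable<T> getEventsForPattern(String name);
    }

    // Hypothetical context that counts how often the (potentially costly) lookup runs.
    static final class CountingContext implements Context<Event> {
        final Map<String, List<Event>> accepted = new HashMap<>();
        int lookups = 0;

        @Override
        public Iterable<Event> getEventsForPattern(String name) {
            lookups++;
            return accepted.getOrDefault(name, new ArrayList<>());
        }
    }

    // Same acceptance rule as the documentation example: the name starts with "foo" and the
    // prices accepted so far for "middle", plus the current price, stay below 5.0.
    static boolean filter(Event value, Context<Event> ctx) {
        // Cheap, event-local check first: non-matching names never touch the context.
        if (!value.getName().startsWith("foo")) {
            return false;
        }
        // At most one lookup per evaluation.
        double sum = value.getPrice();
        for (Event e : ctx.getEventsForPattern("middle")) {
            sum += e.getPrice();
        }
        return Double.compare(sum, 5.0) < 0;
    }

    public static void main(String[] args) {
        CountingContext ctx = new CountingContext();
        List<Event> middle = new ArrayList<>();
        middle.add(new Event("foo1", 1.0));
        middle.add(new Event("foo2", 2.0));
        ctx.accepted.put("middle", middle);

        System.out.println(filter(new Event("bar", 0.5), ctx));  // false, no lookup
        System.out.println(filter(new Event("foo3", 1.5), ctx)); // true  (4.5 < 5.0)
        System.out.println(filter(new Event("foo4", 2.5), ctx)); // false (5.5 >= 5.0)
        System.out.println(ctx.lookups);                          // 2
    }
}
```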

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108363416


          I will address the `asScala` part in a separate PR that looks at the Scala API a bit more globally.

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108364477

          — Diff: docs/dev/libs/cep.md —
          @@ -124,13 +124,70 @@ val start : Pattern[Event, _] = Pattern.begin("start")
          +<div data-lang="scala" markdown="1">
          +{% highlight scala %}
          +start.where(
          — End diff –

          It is not that obvious, but the `lazy` keyword is there to avoid calling `getEventsForPattern` unless its result is actually needed. But I agree the Java-ish way is more straightforward.
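For readers less familiar with Scala: a `lazy val` is evaluated on first access and then cached, so declaring the previous events lazily means `getEventsForPattern` is never called when an earlier, cheaper check already rejects the event. The Java-ish way gets the same effect by simply calling the method after that check. The sketch below makes the deferral explicit with a hypothetical memoizing `Lazy` supplier; `LazyConditionDemo.middlePrices` is an illustrative stand-in for the context lookup and counts how often it runs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// A small memoizing Supplier that behaves roughly like a Scala `lazy val`:
// the initializer runs on the first get() and its result is cached afterwards.
final class Lazy<T> implements Supplier<T> {
    private Supplier<T> init;
    private T value;
    private boolean computed;

    Lazy(Supplier<T> init) {
        this.init = init;
    }

    @Override
    public synchronized T get() {
        if (!computed) {
            value = init.get();
            computed = true;
            init = null; // drop the initializer once it has run
        }
        return value;
    }
}

class LazyConditionDemo {
    static int lookups = 0;

    // Hypothetical stand-in for ctx.getEventsForPattern("middle"); counts its invocations.
    static List<Double> middlePrices() {
        lookups++;
        return Arrays.asList(1.0, 2.0);
    }

    static boolean accept(String name, double price) {
        // Declared up front, like `lazy val previous = ...` in Scala, but nothing is fetched yet.
        Lazy<List<Double>> previous = new Lazy<>(LazyConditionDemo::middlePrices);
        if (!name.startsWith("foo")) {
            return false; // cheap check failed: the lookup never runs
        }
        double sum = price;
        for (double p : previous.get()) {
            sum += p;
        }
        return Double.compare(sum, 5.0) < 0;
    }

    public static void main(String[] args) {
        System.out.println(accept("bar", 0.5) + " lookups=" + lookups); // false lookups=0
        System.out.println(accept("foo", 1.5) + " lookups=" + lookups); // true lookups=1
    }
}
```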

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108364748

          — Diff: docs/dev/libs/cep.md —
          @@ -124,13 +124,70 @@ val start : Pattern[Event, _] = Pattern.begin("start")
          +<div data-lang="scala" markdown="1">
          +{% highlight scala %}
          +start.where(
          — End diff –

          You are right! I am fixing it.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108365413

          — Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala —
          @@ -138,7 +139,7 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {

              * @param filter Or filter function
              * @return The same pattern operator where the new filter condition is set
              */
          -    def or(filter: FilterFunction[F]): Pattern[T, F] = {
          +    def or(filter: IterativeCondition[F]): Pattern[T, F] = {
          — End diff –

          Fixing this
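Assuming `or` combines the pattern's existing condition with the new one, widening its parameter from `FilterFunction[F]` to `IterativeCondition[F]` means the combined condition has to pass the context on to both operands. Below is a minimal sketch of such an OR-combinator. It uses a hypothetical `Cond` interface in which the context is reduced to a function from state name to accepted events; it is not how Flink implements `or`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for an iterative condition: the context is reduced to a function
// from a state name to the events already accepted for that state.
@FunctionalInterface
interface Cond<T> {
    boolean filter(T value, Function<String, List<T>> eventsForPattern);

    // OR-combination: both operands receive the same context, and evaluation short-circuits.
    static <T> Cond<T> or(Cond<T> left, Cond<T> right) {
        return (value, ctx) -> left.filter(value, ctx) || right.filter(value, ctx);
    }
}

class OrDemo {
    // Simple-style operand: ignores the context entirely.
    static final Cond<Integer> IS_EVEN = (v, ctx) -> v % 2 == 0;

    // Iterative-style operand: accepts v only if it exceeds every value accepted for "middle".
    static final Cond<Integer> ABOVE_MIDDLE = (v, ctx) -> {
        for (int e : ctx.apply("middle")) {
            if (v <= e) {
                return false;
            }
        }
        return true;
    };

    static final Function<String, List<Integer>> CTX =
            name -> name.equals("middle") ? Arrays.asList(3, 7) : Collections.<Integer>emptyList();

    public static void main(String[] args) {
        Cond<Integer> either = Cond.or(IS_EVEN, ABOVE_MIDDLE);
        System.out.println(either.filter(4, CTX)); // even -> true
        System.out.println(either.filter(9, CTX)); // 9 > 3 and 9 > 7 -> true
        System.out.println(either.filter(5, CTX)); // odd and 5 <= 7 -> false
    }
}
```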

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108366616

          — Diff: docs/dev/libs/cep.md —
          @@ -124,13 +124,70 @@ val start : Pattern[Event, _] = Pattern.begin("start")
          +<div data-lang="scala" markdown="1">
          +{% highlight scala %}
          +start.where(
          — End diff –

          BTW, why is `Iterable` not what you would expect? What do you think would be a better choice? It is also aligned with our windowing APIs, and I think it is expressive enough.

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108367951

          — Diff: docs/dev/libs/cep.md —
          @@ -124,13 +124,70 @@ val start : Pattern[Event, _] = Pattern.begin("start")
          +<div data-lang="scala" markdown="1">
          +{% highlight scala %}
          +start.where(
          — End diff –

          I just meant that I would expect the Scala version of `Iterable` in Scala code, not the Java version.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108368612

          — Diff: docs/dev/libs/cep.md —
          @@ -124,13 +124,70 @@ val start : Pattern[Event, _] = Pattern.begin("start")
          +<div data-lang="scala" markdown="1">
          +{% highlight scala %}
          +start.where(
          — End diff –

          I see! Then we are on the same page. For the Scala API, I was also thinking of creating a Scala version of `IterativeCondition` and then a wrapper that exposes it as a Java `IterativeCondition`. This way we can wrap everything in more Scala-flavored structures.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108369790

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java —
          @@ -553,9 +559,38 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches)

                   return outgoingEdges;
               }

          +    private boolean checkFilterCondition(ComputationState<T> computationState, IterativeCondition<T> condition, T event) throws Exception {
          +        return condition == null || condition.filter(event, computationState.getConditionContext());
          +    }
          +
          +    Map<String, List<T>> extractCurrentMatches(final ComputationState<T> computationState) {
          +        if (computationState.getPreviousState() == null) {
          +            return new HashMap<>();
          +        }
          +
          +        Collection<LinkedHashMultimap<String, T>> paths = stringSharedBuffer.extractPatterns(
          +                computationState.getPreviousState().getName(),
          +                computationState.getEvent(),
          +                computationState.getTimestamp(),
          +                computationState.getVersion());

          -    private boolean checkFilterCondition(FilterFunction<T> condition, T event) throws Exception {
          -        return condition == null || condition.filter(event);
          +        // for a given computation state, we cannot have more than one matching patterns.
          +        Preconditions.checkArgument(paths.size() <= 1);
          — End diff –

          Fixing this
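The hunk above guards against more than one extracted path per computation state before the matches are handed to a condition. The step that turns the single path into a `Map<String, List<T>>` is not visible in the diff, so the following is only a sketch under that assumption: `CurrentMatches.fromPaths` is hypothetical, and each path is modeled as an insertion-ordered `Map<String, List<T>>` instead of Guava's `LinkedHashMultimap`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CurrentMatches {
    private CurrentMatches() {}

    // Hypothetical sketch: each extracted path is modeled as an insertion-ordered map from
    // state name to the events accepted for it (a stand-in for Guava's LinkedHashMultimap).
    static <T> Map<String, List<T>> fromPaths(Collection<? extends Map<String, List<T>>> paths) {
        // Same invariant as Preconditions.checkArgument(paths.size() <= 1) in the diff.
        if (paths.size() > 1) {
            throw new IllegalArgumentException(
                    "Expected at most one matching path per computation state, got " + paths.size());
        }
        Map<String, List<T>> result = new HashMap<>();
        for (Map<String, List<T>> path : paths) {
            for (Map.Entry<String, List<T>> entry : path.entrySet()) {
                // Defensive copy so a condition cannot mutate the buffered path.
                result.put(entry.getKey(), new ArrayList<>(entry.getValue()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> path = new LinkedHashMap<>();
        path.put("start", Arrays.asList("a"));
        path.put("middle", Arrays.asList("b", "c"));
        System.out.println(fromPaths(Arrays.asList(path)).get("middle")); // [b, c]
        try {
            fromPaths(Arrays.asList(path, path));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```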

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108369832

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java —
          @@ -573,6 +608,9 @@ private boolean checkFilterCondition(FilterFunction<T> condition, T event) throw
          computationState.getTimestamp(),
          computationState.getVersion());

          + // for a given computation state, we cannot have more than one matching patterns.
          + Preconditions.checkArgument(paths.size() <= 1);
          — End diff –

          Fixing this

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3624

          Thanks for the review @dawidwys! I have integrated the comments. I will wait for Travis and then merge.

          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3624#discussion_r108372187

          — Diff: docs/dev/libs/cep.md —
          @@ -124,13 +124,70 @@ val start : Pattern[Event, _] = Pattern.begin("start")
          </div>
          </div>

          -Each state must have an unique name to identify the matched events later on.
          +Each state must have a unique name to identify the matched events later on.
          Additionally, we can specify a filter condition for the event to be accepted as the start event via the `where` method.
          +These filtering conditions can be either an `IterativeCondition` or a `SimpleCondition`.
          +
          +*Iterative Conditions:* This type of conditions can iterate over the previously accepted elements in the pattern and
          +decide to accept a new element or not, based on some statistic over those elements.
          +
          +Below is the code for an iterative condition that accepts elements whose name start with "foo" and for which, the sum
          +of the prices of the previously accepted elements for a state named "middle", plus the price of the current event, do
          +not exceed the value of 5.0. Iterative condition can be very powerful, especially in combination with quantifiers, e.g.
          +`oneToMany` or `zeroToMany`.
          +
          +<div class="codetabs" markdown="1">
          +<div data-lang="java" markdown="1">
          +{% highlight java %}
          +start.where(new IterativeCondition<SubEvent>() {
          +    @Override
          +    public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
          +        if (!value.getName().startsWith("foo")) {
          +            return false;
          +        }
          +
          +        double sum = 0.0;
          +        for (Event event : ctx.getEventsForPattern("middle")) {
          +            sum += event.getPrice();
          +        }
          +        sum += value.getPrice();
          +        return Double.compare(sum, 5.0) < 0;
          +    }
          +});
          +{% endhighlight %}
          +</div>
          +
          +<div data-lang="scala" markdown="1">
          +{% highlight scala %}
          +start.where(
          — End diff –

          The other option would be to have just a Scala wrapper around `Context` and use it in the methods accepting lambdas. Underneath, it would be used in `IterativeCondition`. I am not sure whether the methods accepting objects, e.g. `IterativeCondition`, would be used in Scala at all.

          E.g.
          ```
          // ScalaContext: the proposed Scala wrapper around Context
          def where(filterFun: (F, ScalaContext[F]) => Boolean): Pattern[T, F] = {
            val filter = new IterativeCondition[F] {
              val cleanFilter = cep.scala.cleanClosure(filterFun)
              override def filter(value: F, ctx: Context[F]): Boolean =
                cleanFilter(value, ScalaContext(ctx))
            }
            where(filter)
          }
          ```
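          To make the semantics of the iterative condition in the docs diff concrete, here is a minimal Java sketch that runs without Flink. `SubEvent`, `Context`, and `IterativeCondition` below are hypothetical stand-ins written for this illustration, not Flink's classes: the context is reduced to the single method the docs example calls, `getEventsForPattern`, and the `of` factory is an assumed helper that loosely mirrors the idea above of letting callers pass a lambda instead of writing an anonymous subclass.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical event type standing in for the docs' SubEvent.
class SubEvent {
    private final String name;
    private final double price;

    SubEvent(String name, double price) {
        this.name = name;
        this.price = price;
    }

    String getName() { return name; }

    double getPrice() { return price; }
}

// Hypothetical stand-in for Flink's condition context, reduced to the one
// method the docs example calls: the events already accepted for a state.
interface Context<T> {
    Iterable<T> getEventsForPattern(String name);
}

// Hypothetical stand-in for Flink's IterativeCondition (not the real class).
abstract class IterativeCondition<T> {
    abstract boolean filter(T value, Context<T> ctx);

    // Assumed helper: wraps a lambda so callers need not write an anonymous
    // subclass, loosely following the reviewer's wrapper idea.
    static <T> IterativeCondition<T> of(BiPredicate<T, Context<T>> fn) {
        return new IterativeCondition<T>() {
            @Override
            boolean filter(T value, Context<T> ctx) {
                return fn.test(value, ctx);
            }
        };
    }
}

class IterativeConditionSketch {
    // Same logic as the docs example: the name must start with "foo", and the
    // prices already accepted for "middle" plus the current price must stay
    // strictly below 5.0.
    static final IterativeCondition<SubEvent> FOO_UNDER_BUDGET =
        IterativeCondition.of((value, ctx) -> {
            if (!value.getName().startsWith("foo")) {
                return false;
            }
            double sum = 0.0;
            for (SubEvent event : ctx.getEventsForPattern("middle")) {
                sum += event.getPrice();
            }
            sum += value.getPrice();
            return Double.compare(sum, 5.0) < 0;
        });

    // Builds a context backed by a map from state name to accepted events.
    static Context<SubEvent> contextOf(Map<String, List<SubEvent>> accepted) {
        return name -> accepted.getOrDefault(name, Collections.<SubEvent>emptyList());
    }

    public static void main(String[] args) {
        Map<String, List<SubEvent>> accepted = new HashMap<>();
        accepted.put("middle", Arrays.asList(new SubEvent("foo1", 1.5), new SubEvent("foo2", 2.0)));
        Context<SubEvent> ctx = contextOf(accepted);

        System.out.println(FOO_UNDER_BUDGET.filter(new SubEvent("foo3", 1.0), ctx)); // true: 4.5 < 5.0
        System.out.println(FOO_UNDER_BUDGET.filter(new SubEvent("foo4", 1.5), ctx)); // false: 5.0 is not < 5.0
        System.out.println(FOO_UNDER_BUDGET.filter(new SubEvent("bar", 0.1), ctx));  // false: wrong prefix
    }
}
```

          With 1.5 and 2.0 already accepted for "middle", a "foo" event priced 1.0 is accepted (total 4.5), one priced 1.5 is rejected because a total of exactly 5.0 is not strictly below the bound, and a "bar" event fails the name check regardless of price.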

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3624

          kkl0u Kostas Kloudas added a comment -

          Merged with ad21a441434b9ac5886b664871553bf57885e984


            People

            • Assignee:
              kkl0u Kostas Kloudas
              Reporter:
              kkl0u Kostas Kloudas
            • Votes:
              0
              Watchers:
              2
