Flink / FLINK-3703

Add sequence matching semantics to discard matched events

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Implemented
    • Affects Version/s: 1.0.0, 1.1.0
    • Fix Version/s: 1.3.0
    • Component/s: CEP
    • Labels:
      None

      Description

      There is no easy way to control whether events can be part of multiple matching sequences or not. Currently, the default is that an event can participate in multiple matching sequences. For example, given the pattern Pattern.<Event>begin("a").followedBy("b") and the input event stream Event("A"), Event("B"), Event("C"), the following matching sequences are generated: [Event("A"), Event("B")], [Event("A"), Event("C")] and [Event("B"), Event("C")].

      It would be useful to allow the user to define where the matching algorithm should continue after a matching sequence has been found. Possible option values could be

      • from first - continue keeping all events for future matches (that is the current behaviour)
      • after first - continue after the first element (remove first matching event and continue with the second event)
      • after last - continue after the last element (effectively discarding all elements of the matching sequence)
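To make the three options concrete, here is a small self-contained sketch (plain Java, independent of Flink's CEP classes; `MatchingBehaviour` and `matchPairs` are illustrative names, not part of any Flink API) that enumerates two-event matches of "a followedBy b" over the stream A, B, C under each semantics:

```java
import java.util.ArrayList;
import java.util.List;

public class SkipSemanticsDemo {

    enum MatchingBehaviour { FROM_FIRST, AFTER_FIRST, AFTER_LAST }

    // Enumerate matches of a two-step pattern "a followedBy b" over the events,
    // where any earlier event may be followed by any later one.
    static List<String> matchPairs(List<String> events, MatchingBehaviour behaviour) {
        List<String> matches = new ArrayList<>();
        if (behaviour == MatchingBehaviour.FROM_FIRST) {
            // every event may participate in several matches (current behaviour)
            for (int i = 0; i < events.size(); i++) {
                for (int j = i + 1; j < events.size(); j++) {
                    matches.add(events.get(i) + "," + events.get(j));
                }
            }
        } else {
            int i = 0;
            while (i + 1 < events.size()) {
                matches.add(events.get(i) + "," + events.get(i + 1));
                // AFTER_FIRST: resume with the event after the first matched one;
                // AFTER_LAST: resume after the last matched event.
                i += (behaviour == MatchingBehaviour.AFTER_FIRST) ? 1 : 2;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = List.of("A", "B", "C");
        for (MatchingBehaviour b : MatchingBehaviour.values()) {
            System.out.println(b + ": " + matchPairs(events, b));
        }
    }
}
```

On A, B, C this yields all three pairs for "from first", the pairs (A,B) and (B,C) for "after first", and only (A,B) for "after last", matching the option descriptions above.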

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk closed the pull request at:

          https://github.com/apache/flink/pull/2367

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2367

          @kl0u Sure.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2367

          Hi @mushketyk .

          I am trying to clean up a bit the open PRs for the CEP library and the related JIRAs.
          The issue that this PR covers was already implemented as part of multiple other features that I also mention in the related JIRA.
          Could you close this PR?

          kkl0u Kostas Kloudas added a comment -

          This was covered by other issues including the Quantifier implementation, the skip-till-next and some additional issues for continuity within looping patterns.

          githubbot ASF GitHub Bot added a comment -

          Github user LordFB commented on the issue:

          https://github.com/apache/flink/pull/2367

          Hi @mushketyk,

          Too bad; with this missing, it is something of a deal-breaker for Flink in my use case.

          Yeah, it'd be great if Till could take some action on this and the related PRs.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2367

          Hi @LordFB

          I don't think there is currently a way to do it in Flink CEP. I am still waiting for Till's review of this PR, but he seems to be really busy with other work.

          Maybe together we will be able to convince him to spend some time on reviewing this and similar CEP PRs.

          githubbot ASF GitHub Bot added a comment -

          Github user LordFB commented on the issue:

          https://github.com/apache/flink/pull/2367

          Hi @mushketyk and @tillrohrmann,

          are there any updates on this Pull Request? Or is there already a way to change the matching behaviour in FlinkCEP?

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2367

          Hi @tillrohrmann
          I've updated the PR according to your review. I had to add reverse edges to buffer entries and changed the interface of the SharedBuffer a bit to move code from it to the NFA class. I hope I didn't break the encapsulation too much, so I would like to hear your opinion on this.
          I also added more tests and user documentation in this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2367

          @tillrohrmann Thank you for the very detailed review. I do appreciate it.
          I agree with your suggestions and I'll try to improve the code and write documentation in the next couple of days.
          At the very least I know that tests are correct, so I can start changing the implementation.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/2367

          Thanks for your contribution, and apologies for the late review @mushketyk. I think your implementation goes in the right direction, and I really like the testing.

          I think what we can and should still improve is to pull the `MatchingBehaviour` out of the `SharedBuffer` and the `State`. The `NFA` should be the only component responsible for the matching behaviour. I think we can achieve this if we extend the `SharedBuffer` a little bit.

          So what should roughly happen is that the `NFA` removes computation states which are no longer valid. Furthermore, we have to clean up the `SharedBuffer` and remove `SharedBufferEntries` which depend on other `SharedBufferEntries` that are no longer valid (e.g. due to `MatchingBehaviour.afterLast`).

          So when you have a `SharedBufferEntry` representing a certain state which is part of the currently matched sequence, and `MatchingBehaviour.afterLast` applies, then you have to find all `SharedBufferEntries` which depend on this state (its children) and remove them, continuing until you reach the youngest descendant. Currently we can only find the predecessors for a given `SharedBufferEntry`, not the children. I think that would have to be added in order to implement the different `MatchingBehaviours`.
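The predecessor-to-children inversion described above can be sketched in isolation. The following is a minimal illustration, with plain strings standing in for `SharedBufferEntry` objects; none of these names are Flink's actual API:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DescendantCleanupDemo {

    // Given predecessor edges (entry -> its predecessors, as the shared buffer
    // stores them), find every descendant of an invalidated entry by inverting
    // the edges and walking down to the youngest descendant.
    static Set<String> descendantsToRemove(Map<String, List<String>> predecessors, String invalidated) {
        // invert predecessor edges to obtain child edges
        Map<String, List<String>> children = new HashMap<>();
        for (Map.Entry<String, List<String>> e : predecessors.entrySet()) {
            for (String pred : e.getValue()) {
                children.computeIfAbsent(pred, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        // breadth-first walk from the invalidated entry collects all descendants
        Set<String> toRemove = new LinkedHashSet<>();
        Deque<String> queue = new ArrayDeque<>();
        queue.add(invalidated);
        while (!queue.isEmpty()) {
            String current = queue.poll();
            for (String child : children.getOrDefault(current, List.of())) {
                if (toRemove.add(child)) {
                    queue.add(child);
                }
            }
        }
        return toRemove;
    }
}
```

For example, with predecessor edges b→a, c→b, d→a, invalidating "a" marks b, c and d for removal, down to the youngest descendant c.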

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2367#discussion_r75496232

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java —
          @@ -129,6 +141,23 @@ public Time getWindowTime() {
          }

          /**
          + * Set a matching behaviour that defines if the same event can be used in multiple matching sequences.
          + *
          + * @param matchingBehaviour New matching behaviour
          + * @return The same pattern operator with the specified matching behaviour
          + */
          + public Pattern<T, F> matchingBehaviour(MatchingBehaviour matchingBehaviour) {
          +
          + Pattern<T, F> pattern = this;
          + while (pattern != null) {
          + pattern.matchingBehaviour = matchingBehaviour;
          + pattern = (Pattern<T, F>) pattern.getPrevious();
          + }

          +
          + return this;
          — End diff –

          Looking at the necessary implementation to wire the matching behaviour through all `Pattern` instances, I wonder whether the `MatchingBehaviour` should rather be given to the `CEP.pattern` call. There it could be given to the generated `NFA` which is responsible for making sure that the behaviour is respected. The API would then look like:

          `CEP.pattern(input, pattern, matchingBehaviour)`

          What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2367#discussion_r75495772

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java —
          @@ -38,10 +40,16 @@
          private final String name;
          private final StateType stateType;
          private final Collection<StateTransition<T>> stateTransitions;
          + private final MatchingBehaviour matchingBehaviour;
          — End diff –

          The `State` shouldn't have to know about the `MatchingBehaviour`; states should be agnostic to the matching behaviour.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2367#discussion_r75495601

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java —
          @@ -202,7 +203,8 @@ public void prune(long pruningTimestamp) {
          final K key,
          final V value,
          final long timestamp,

          - final DeweyNumber version) {
          + final DeweyNumber version,
          + final MatchingBehaviour matchingBehaviour) {
          — End diff –

          I think the `SharedBuffer` should know nothing about the `MatchingBehaviour`. The `MatchingBehaviour` is something specific to the `NFA` and, thus, the `NFA` should be responsible for implementing it. This will give a better separation of concerns.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2367#discussion_r75495021

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java —
          @@ -300,17 +311,21 @@ public int hashCode()
           previousEvent,
           previousTimestamp,
           oldVersion);
          + validPreviousState = true;
           }
          - // a new computation state is referring to the shared entry
          - sharedBuffer.lock(newState, event, timestamp);
          + if (validPreviousState) {
          + // a new computation state is referring to the shared entry
          + sharedBuffer.lock(newState, event, timestamp);
          +
          + resultingComputationStates.add(new ComputationState<T>(
          + newState,
          + event,
          + timestamp,
          + newComputationStateVersion,
          + startTimestamp));
          — End diff –

          Ideally we would filter out all computation states which refer to pruned computations. Then we would not have to make this case distinction here.
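The pruning approach suggested here can be sketched independently of Flink's classes. In this hypothetical illustration, a computation state is represented simply by the id of the shared-buffer entry it refers to; the names are stand-ins, not Flink's real NFA API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class PruneComputationStatesDemo {

    // Remove every computation state that refers to a pruned shared-buffer
    // entry before the matching loop runs, so the loop itself needs no
    // case distinction for invalid states.
    static List<String> pruneInvalidStates(List<String> stateEntryIds, Set<String> prunedEntries) {
        List<String> remaining = new ArrayList<>(stateEntryIds);
        remaining.removeIf(prunedEntries::contains);
        return remaining;
    }
}
```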

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2367#discussion_r75494363

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java —
          @@ -133,9 +134,13 @@ public void addState(final State<T> state) {
          final int numberComputationStates = computationStates.size();
          final Collection<Map<String, T>> result = new ArrayList<>();
          final Collection<Tuple2<Map<String, T>, Long>> timeoutResult = new ArrayList<>();
          + boolean skipOtherStates = false;

          // iterate over all current computations
          for (int i = 0; i < numberComputationStates; i++) {
          + if (skipOtherStates) {
          + continue;
          + }

          — End diff –

          I think you cannot simply skip the current set of computation states, because then you would retain those states which have a next condition and would otherwise be filtered out in the `MatchingBehaviour.AFTER_LAST` case.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2367#discussion_r75491739

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java —
          @@ -243,8 +252,7 @@ public int hashCode() {
          // check all state transitions for each state
          for (StateTransition<T> stateTransition: stateTransitions) {
          try {

          - if (stateTransition.getCondition() == null || stateTransition.getCondition().filter(event)) {
          - // filter condition is true
          + if (stateTransition.isConditionTrue(event)) {
          — End diff –

          Good refactoring

          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2367#discussion_r74837056

          — Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java —
          @@ -26,6 +26,7 @@
          import akka.pattern.Patterns;
          import akka.util.Timeout;

          +import com.google.common.base.Joiner;
          — End diff –

          Good point. Will update.

          githubbot ASF GitHub Bot added a comment -

          Github user smarthi commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2367#discussion_r74711517

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java —
          @@ -256,16 +267,75 @@ public void prune(long pruningTimestamp) {
          edge.getTarget(),
          edge.getVersion(),
          copy));
          + if (matchingBehaviour == MatchingBehaviour.AFTER_LAST) {
          + cleanUp.add(edge.getTarget());
          + }

          }
          }
          }
          }
          }
          }

          + // Remove shared buffer entries to maintain correct matching behaviour
          + doCleanUp(new Predicate<K, V>() {
          +
          + @Override
          + public boolean toRemove(SharedBufferEntry<K, V> entry) {
          + return cleanUp.contains(entry);
          + }

          + });
          + // Remove all entries that are dependent on the current event
          + if (matchingBehaviour == MatchingBehaviour.AFTER_LAST) {
          + doCleanUp(new Predicate<K, V>() {
          +
          + @Override
          + public boolean toRemove(SharedBufferEntry<K, V> entry) {
          + if (entry == null) {
          + return false;
          + }

          + return entry.getValueTime().value == value
          + && entry.getValueTime().timestamp == timestamp;
          + }
          + });
          + }
          +
          return result;
          }

          + private void doCleanUp(Predicate<K, V> predicate) {
          + ArrayList<SharedBufferEntry<K, V>> toRemove = new ArrayList<>();
          + for (SharedBufferPage<K, V> page : this.pages.values()) {
          + for (SharedBufferEntry<K, V> entry : page.getEntries()) {
          + if (entry.getReferenceCounter() <= 1) {
          + doRecursiveCleanup(entry, predicate, toRemove);
          + }

          + }
          + }
          +
          + for (SharedBufferEntry<K, V> startNode: toRemove) {
          + release(startNode.page.getKey(), startNode.getValueTime().value, startNode.getValueTime().getTimestamp());
          + remove(startNode.page.getKey(), startNode.getValueTime().value, startNode.getValueTime().getTimestamp());
          + }

          + }
          +
          + private boolean doRecursiveCleanup(SharedBufferEntry<K, V> startNode, Predicate<K, V> cleanUp, ArrayList<SharedBufferEntry<K, V>> toRemove) {
          — End diff –

          Replace `ArrayList` with `List` in the arguments, unless we need it to be `ArrayList` explicitly.

          githubbot ASF GitHub Bot added a comment -

          Github user smarthi commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2367#discussion_r74711491

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java —
          @@ -256,16 +267,75 @@ public void prune(long pruningTimestamp) {
          edge.getTarget(),
          edge.getVersion(),
          copy));
          + if (matchingBehaviour == MatchingBehaviour.AFTER_LAST) {
          + cleanUp.add(edge.getTarget());
          + }

          }
          }
          }
          }
          }
          }

          + // Remove shared buffer entries to maintain correct matching behaviour
          + doCleanUp(new Predicate<K, V>() {
          +
          + @Override
          + public boolean toRemove(SharedBufferEntry<K, V> entry) {
          + return cleanUp.contains(entry);
          + }

          + });
          + // Remove all entries that are dependent on the current event
          + if (matchingBehaviour == MatchingBehaviour.AFTER_LAST) {
          + doCleanUp(new Predicate<K, V>() {
          +
          + @Override
          + public boolean toRemove(SharedBufferEntry<K, V> entry) {
          + if (entry == null) {
          + return false;
          + }

          + return entry.getValueTime().value == value
          + && entry.getValueTime().timestamp == timestamp;
          + }
          + });
          + }
          +
          return result;
          }

          + private void doCleanUp(Predicate<K, V> predicate) {
          + ArrayList<SharedBufferEntry<K, V>> toRemove = new ArrayList<>();
          — End diff –

          Any reason for using a concrete class (`ArrayList`) on the left-hand side here? Can it be
          `List<SharedBufferEntry<K, V>> toRemove = new ArrayList<>();`

          githubbot ASF GitHub Bot added a comment -

          Github user smarthi commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2367#discussion_r74711450

          — Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java —
          @@ -26,6 +26,7 @@
          import akka.pattern.Patterns;
          import akka.util.Timeout;

          +import com.google.common.base.Joiner;
          — End diff –

          Avoid using Guava API calls. You could use Commons Lang `StringUtils.join()` instead.

          githubbot ASF GitHub Bot added a comment -

          GitHub user mushketyk opened a pull request:

          https://github.com/apache/flink/pull/2367

          FLINK-3703[cep] Add sequence matching semantics to discard matched events

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [x] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [x] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          This PR adds sequence matching semantics for matched events, so a user can specify whether a particular event can be used in multiple matching sequences.
          I didn't add user documentation yet, but I'll add it once someone from the Flink community has checked whether this PR implements the matching semantics correctly.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/mushketyk/flink sequence-matching-semantics

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2367.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2367


          commit 67a0502ec8586caae94992eeb29125151fc392d7
          Author: Ivan Mushketyk <ivan.mushketik@gmail.com>
          Date: 2016-08-12T22:46:50Z

          FLINK-3703 Minor refactoring

          commit 010abdb4dd8649ded338caaaeca857357585173b
          Author: Ivan Mushketyk <ivan.mushketik@gmail.com>
          Date: 2016-08-13T20:42:55Z

          FLINK-3703[cep] Add sequence matching semantics to discard matched events


          till.rohrmann Till Rohrmann added a comment -

          Hi Ivan Mushketyk,

          Yes, you're right with your description of the matching behaviours. The list isn't necessarily exhaustive. If you can think of other interesting matching behaviours, feel free to add them as well.

          The matching semantics should be assigned to the pattern definition and then applied to each sequence individually. For "after first" this would mean that you remove all elements of the matched sequence from the shared buffer and all other elements which have the first element as a predecessor (also transitively). I hope this makes things a bit clearer

          ivan.mushketyk Ivan Mushketyk added a comment -

          Hi Till Rohrmann,

          Thank you for your reply.

          Just to make sure that I understand your idea.
          By default, the pattern mentioned in the issue description will generate the matching sequences: A, B; A, C; B, C.

          Do I understand correctly that:
          "after first" will generate the sequences A, B and B, C, but won't generate A, C, since event A can participate as the first element only once
          "after last" will generate only the sequence A, B, since every event can participate in a match only once

          Also, do I understand correctly that the matching semantics should be defined at the sequence level and not at the level of a particular pattern?
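
          The three behaviours on the A, B, C example can be illustrated with a small, self-contained simulation. This is plain Java, independent of the CEP library; the `Behaviour` enum and the pruning logic are illustrative assumptions, not the actual Flink implementation:

          ```java
          import java.util.ArrayList;
          import java.util.List;

          public class MatchingBehaviourDemo {
              enum Behaviour { FROM_FIRST, AFTER_FIRST, AFTER_LAST }

              // Enumerate matches of begin("a").followedBy("b") over a linear event
              // list, i.e. all ordered pairs, pruned according to the chosen behaviour.
              static List<String> matches(List<String> events, Behaviour b) {
                  List<String> result = new ArrayList<>();
                  int resumeFrom = 0; // first index still allowed to take part in a match
                  for (int i = 0; i < events.size(); i++) {
                      if (i < resumeFrom) {
                          continue; // event already consumed by an earlier match
                      }
                      for (int j = i + 1; j < events.size(); j++) {
                          result.add(events.get(i) + "," + events.get(j));
                          if (b == Behaviour.AFTER_FIRST) {
                              resumeFrom = i + 1; // discard only the first matched event
                              break;
                          }
                          if (b == Behaviour.AFTER_LAST) {
                              resumeFrom = j + 1; // discard the whole matched sequence
                              break;
                          }
                      }
                  }
                  return result;
              }

              public static void main(String[] args) {
                  List<String> events = List.of("A", "B", "C");
                  System.out.println(matches(events, Behaviour.FROM_FIRST));  // [A,B, A,C, B,C]
                  System.out.println(matches(events, Behaviour.AFTER_FIRST)); // [A,B, B,C]
                  System.out.println(matches(events, Behaviour.AFTER_LAST));  // [A,B]
              }
          }
          ```

          The output matches the sequences listed above: "from first" keeps all three pairs, "after first" drops A, C, and "after last" keeps only A, B.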

          till.rohrmann Till Rohrmann added a comment -

          Hi Ivan Mushketyk,

          I think this feature should be part of the pattern definition. Maybe we could add it like:

          Pattern.<Event>begin("a").followedBy("b").matchingBehaviour(MatchingBehaviour.AfterLast)
          
          ivan.mushketyk Ivan Mushketyk added a comment -

          Hi,

          Do you have an idea of what a code example for this feature would look like?


            People

            • Assignee:
              ivan.mushketyk Ivan Mushketyk
              Reporter:
              till.rohrmann Till Rohrmann
            • Votes:
              1 Vote for this issue
              Watchers:
              9 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development