Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-3318

Add support for quantifiers to CEP's pattern API

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.0.0
    • Fix Version/s: 1.3.0
    • Component/s: CEP
    • Labels:
      None

      Description

      It would be a good addition to extend the pattern API to support quantifiers known from regular expressions (e.g. Kleene star, ?, +, or count bounds). This would considerably enrich the set of supported patterns.

      Implementing the count bounds could be done by unrolling the pattern state. In order to support the Kleene star operator, the NFACompiler has to be extended to insert epsilon-transition between a Kleene start state and the succeeding pattern state. In order to support ?, one could insert two paths from the preceding state, one which accepts the event and another which directly goes into the next pattern state.

        Issue Links

          Activity

          Hide
          rt2357 Robert Thorman added a comment - - edited

          So would the idea here be something like the following?

          Pattern.begin("start").followedBy("next?").followedBy("third

          {1,3}").followedBy("end*");

          I'm not sure if the capture group () delimiters should be applied to make it more obvious that the Kleene start notation applies to the terms, like this:

          Pattern.begin("start").followedBy("(next)?").followedBy("(third){1,3}

          ").followedBy("(end)(*");

          I wouldn't mind helping with this.

          Show
          rt2357 Robert Thorman added a comment - - edited So would the idea here be something like the following? Pattern.begin("start").followedBy("next?").followedBy("third {1,3}").followedBy("end*"); I'm not sure if the capture group () delimiters should be applied to make it more obvious that the Kleene start notation applies to the terms, like this: Pattern.begin("start").followedBy("(next)?").followedBy("(third){1,3} ").followedBy("(end)(*"); I wouldn't mind helping with this.
          Hide
          ivan.mushketyk Ivan Mushketyk added a comment - - edited

          Hi,

          Do you mean something like this:

          Pattern.begin("start")
            .oneOrMore(where(...).subtype(SubEvent.class)) // + operator
            .followedBy("next").optional(where(...).subtype(...)) // ? operator
            .next("end").count(1, 3).(where(...)) // count bounds operator
          

          If no one is working on this and you like the idea, I can give it a try.

          Show
          ivan.mushketyk Ivan Mushketyk added a comment - - edited Hi, Do you mean something like this: Pattern.begin( "start" ) .oneOrMore(where(...).subtype(SubEvent.class)) // + operator .followedBy( "next" ).optional(where(...).subtype(...)) // ? operator .next( "end" ).count(1, 3).(where(...)) // count bounds operator If no one is working on this and you like the idea, I can give it a try.
          Hide
          till.rohrmann Till Rohrmann added a comment -

          Hi Ivan Mushketyk,

          yes I think we need some API calls which let you specify for a node how often it can appear. I think you're proposal looks good, but I would not let the quantifier calls take as a parameter the filter conditions.

          Pattern.begin("start").optional().followedBy("next").where(...).times(1, 3).next("end").subtype(...).every()
          
          Show
          till.rohrmann Till Rohrmann added a comment - Hi Ivan Mushketyk , yes I think we need some API calls which let you specify for a node how often it can appear. I think you're proposal looks good, but I would not let the quantifier calls take as a parameter the filter conditions. Pattern.begin( "start" ).optional().followedBy( "next" ).where(...).times(1, 3).next( "end" ).subtype(...).every()
          Hide
          ivan.mushketyk Ivan Mushketyk added a comment -

          Hi Till Rohrmann,

          Thank you for your comment. I like your idea better. Since no one seems to work on this, I would like to help with this.
          I can start with the + operator and create a PR for it so we can decide if I go in the right direction with this and then add support for other operators in later PRs.

          Show
          ivan.mushketyk Ivan Mushketyk added a comment - Hi Till Rohrmann , Thank you for your comment. I like your idea better. Since no one seems to work on this, I would like to help with this. I can start with the + operator and create a PR for it so we can decide if I go in the right direction with this and then add support for other operators in later PRs.
          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user mushketyk opened a pull request:

          https://github.com/apache/flink/pull/2339

          FLINK-3318 Implement one or many and optional quantifiers

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [x] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [x] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [x] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/mushketyk/flink cep-operators

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2339.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2339


          commit 17400369ec79ffd1d153973faa8259550a607a9d
          Author: Ivan Mushketyk <ivan.mushketik@gmail.com>
          Date: 2016-08-05T20:05:39Z

          FLINK-3318 Implement one or many and optional quantifiers

          commit cdd8196cf6b32c287ced3627c5207d4c479a85a8
          Author: Ivan Mushketyk <ivan.mushketik@gmail.com>
          Date: 2016-08-06T17:38:14Z

          FLINK-3318 Add scala support for pattern quantifiers

          commit 08fc02befd74bc861a86c5191c91ebb30e865f56
          Author: Ivan Mushketyk <ivan.mushketik@gmail.com>
          Date: 2016-08-06T17:41:48Z

          FLINK-3318 Minor refactoring


          Show
          githubbot ASF GitHub Bot added a comment - GitHub user mushketyk opened a pull request: https://github.com/apache/flink/pull/2339 FLINK-3318 Implement one or many and optional quantifiers Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide] ( http://flink.apache.org/how-to-contribute.html ). In addition to going through the list, please provide a meaningful description of your changes. [x] General The pull request references the related JIRA issue (" [FLINK-XXX] Jira title text") The pull request addresses only one issue Each commit in the PR has a meaningful commit message (including the JIRA id) [x] Documentation Documentation has been added for new functionality Old documentation affected by the pull request has been updated JavaDoc for public methods has been added [x] Tests & Build Functionality added by the pull request is covered by tests `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/mushketyk/flink cep-operators Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2339.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2339 commit 17400369ec79ffd1d153973faa8259550a607a9d Author: Ivan Mushketyk <ivan.mushketik@gmail.com> Date: 2016-08-05T20:05:39Z FLINK-3318 Implement one or many and optional quantifiers commit cdd8196cf6b32c287ced3627c5207d4c479a85a8 Author: Ivan Mushketyk <ivan.mushketik@gmail.com> Date: 2016-08-06T17:38:14Z FLINK-3318 Add scala support for pattern quantifiers commit 08fc02befd74bc861a86c5191c91ebb30e865f56 Author: Ivan Mushketyk <ivan.mushketik@gmail.com> Date: 2016-08-06T17:41:48Z FLINK-3318 Minor refactoring
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2339

          So far I've only implemented one or many and optional quantifiers. I would like to implement other quantifiers, but I would like to make sure that I am on the right track before I proceed any further.

          I will update user documentation when all quantifiers are implemented.

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2339 So far I've only implemented one or many and optional quantifiers. I would like to implement other quantifiers, but I would like to make sure that I am on the right track before I proceed any further. I will update user documentation when all quantifiers are implemented.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2339

          I realized that there is still some work to do here. I'll reopen this pull request a bit later.

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2339 I realized that there is still some work to do here. I'll reopen this pull request a bit later.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk closed the pull request at:

          https://github.com/apache/flink/pull/2339

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk closed the pull request at: https://github.com/apache/flink/pull/2339
          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user mushketyk opened a pull request:

          https://github.com/apache/flink/pull/2361

          FLINK-3318[cep] Add support for quantifiers to CEP's pattern API

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [x] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [x] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [x] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/mushketyk/flink cep-operators

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2361.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2361


          commit 96c077a4c1c2c1ccba678ec863775402554d6dcf
          Author: Ivan Mushketyk <ivan.mushketik@gmail.com>
          Date: 2016-08-05T20:05:39Z

          FLINK-3318[cep] Add support for quantifiers to CEP's pattern API

          commit 425fa3d94d15ea6b1396e7bf7a901f7318f107b0
          Author: Ivan Mushketyk <ivan.mushketik@gmail.com>
          Date: 2016-08-11T21:10:43Z

          FLINK-3318[cep] Add documentation about pattern quantifiers


          Show
          githubbot ASF GitHub Bot added a comment - GitHub user mushketyk opened a pull request: https://github.com/apache/flink/pull/2361 FLINK-3318 [cep] Add support for quantifiers to CEP's pattern API Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide] ( http://flink.apache.org/how-to-contribute.html ). In addition to going through the list, please provide a meaningful description of your changes. [x] General The pull request references the related JIRA issue (" [FLINK-XXX] Jira title text") The pull request addresses only one issue Each commit in the PR has a meaningful commit message (including the JIRA id) [x] Documentation Documentation has been added for new functionality Old documentation affected by the pull request has been updated JavaDoc for public methods has been added [x] Tests & Build Functionality added by the pull request is covered by tests `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/mushketyk/flink cep-operators Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2361.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2361 commit 96c077a4c1c2c1ccba678ec863775402554d6dcf Author: Ivan Mushketyk <ivan.mushketik@gmail.com> Date: 2016-08-05T20:05:39Z FLINK-3318 [cep] Add support for quantifiers to CEP's pattern API commit 425fa3d94d15ea6b1396e7bf7a901f7318f107b0 Author: Ivan Mushketyk <ivan.mushketik@gmail.com> Date: 2016-08-11T21:10:43Z FLINK-3318 [cep] Add documentation about pattern quantifiers
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk closed the pull request at:

          https://github.com/apache/flink/pull/2361

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk closed the pull request at: https://github.com/apache/flink/pull/2361
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2361

          I've added support for zeroToMany, oneToMany, optional and count quantifiers in CEP patterns.
          I had to change logic of NFACompiler quite a bit to accommodate new

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2361 I've added support for zeroToMany, oneToMany, optional and count quantifiers in CEP patterns. I had to change logic of NFACompiler quite a bit to accommodate new
          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user mushketyk reopened a pull request:

          https://github.com/apache/flink/pull/2361

          FLINK-3318[cep] Add support for quantifiers to CEP's pattern API

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [x] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [x] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [x] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/mushketyk/flink cep-operators

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2361.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2361


          commit 96c077a4c1c2c1ccba678ec863775402554d6dcf
          Author: Ivan Mushketyk <ivan.mushketik@gmail.com>
          Date: 2016-08-05T20:05:39Z

          FLINK-3318[cep] Add support for quantifiers to CEP's pattern API

          commit 425fa3d94d15ea6b1396e7bf7a901f7318f107b0
          Author: Ivan Mushketyk <ivan.mushketik@gmail.com>
          Date: 2016-08-11T21:10:43Z

          FLINK-3318[cep] Add documentation about pattern quantifiers


          Show
          githubbot ASF GitHub Bot added a comment - GitHub user mushketyk reopened a pull request: https://github.com/apache/flink/pull/2361 FLINK-3318 [cep] Add support for quantifiers to CEP's pattern API Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide] ( http://flink.apache.org/how-to-contribute.html ). In addition to going through the list, please provide a meaningful description of your changes. [x] General The pull request references the related JIRA issue (" [FLINK-XXX] Jira title text") The pull request addresses only one issue Each commit in the PR has a meaningful commit message (including the JIRA id) [x] Documentation Documentation has been added for new functionality Old documentation affected by the pull request has been updated JavaDoc for public methods has been added [x] Tests & Build Functionality added by the pull request is covered by tests `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/mushketyk/flink cep-operators Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2361.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2361 commit 96c077a4c1c2c1ccba678ec863775402554d6dcf Author: Ivan Mushketyk <ivan.mushketik@gmail.com> Date: 2016-08-05T20:05:39Z FLINK-3318 [cep] Add support for quantifiers to CEP's pattern API commit 425fa3d94d15ea6b1396e7bf7a901f7318f107b0 Author: Ivan Mushketyk <ivan.mushketik@gmail.com> Date: 2016-08-11T21:10:43Z FLINK-3318 [cep] Add documentation about pattern quantifiers
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2361

          @tillrohrmann , could you please take a look at this PR?

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2361 @tillrohrmann , could you please take a look at this PR?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2361

          Could someone please take a look at this PR? It has been here without a review for more than 2 weeks.

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2361 Could someone please take a look at this PR? It has been here without a review for more than 2 weeks.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tttMelody commented on the issue:

          https://github.com/apache/flink/pull/2361

          it's an important feature,esper and siddhi all support this.hope to relase as soon as possible

          Show
          githubbot ASF GitHub Bot added a comment - Github user tttMelody commented on the issue: https://github.com/apache/flink/pull/2361 it's an important feature,esper and siddhi all support this.hope to relase as soon as possible
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user chermenin commented on the issue:

          https://github.com/apache/flink/pull/2361

          @mushketyk It seems needed to rebase this PR.

          Show
          githubbot ASF GitHub Bot added a comment - Github user chermenin commented on the issue: https://github.com/apache/flink/pull/2361 @mushketyk It seems needed to rebase this PR.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user chermenin commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2361#discussion_r97264252

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java —
          @@ -43,7 +43,14 @@ public State(final String name, final StateType stateType)

          { this.name = name; this.stateType = stateType; - stateTransitions = new ArrayList<StateTransition<T>>(); + stateTransitions = new ArrayList<>(); + }

          +
          + public State(String name, StateType stateType, Collection<StateTransition<T>> stateTransitions) {
          — End diff –

          It seems that this constructor is never used. What is that for?

          Show
          githubbot ASF GitHub Bot added a comment - Github user chermenin commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97264252 — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java — @@ -43,7 +43,14 @@ public State(final String name, final StateType stateType) { this.name = name; this.stateType = stateType; - stateTransitions = new ArrayList<StateTransition<T>>(); + stateTransitions = new ArrayList<>(); + } + + public State(String name, StateType stateType, Collection<StateTransition<T>> stateTransitions) { — End diff – It seems that this constructor is never used. What is that for?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2361#discussion_r97790385

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java —
          @@ -130,26 +114,166 @@
          }

          // add the beginning state

          • final State<T> beginningState;
            + State<T> beginningState = states.get(BEGINNING_STATE_NAME);;
            + addTransitions(beginningState, -1, patterns, states);
            + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
            + }
            + }
          • if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - }

            else

            { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - }

            + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
            + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
            + State<T> succeedingState = states.get(succeedingPattern.getName());

          • beginningState.addStateTransition(new StateTransition<T>(
            + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + }

            else {
            + currentState.addStateTransition(new StateTransition<T>(
            StateTransitionAction.TAKE,

          • currentState,
          • (FilterFunction<T>) currentPattern.getFilterFunction()
            + succeedingState,
            + (FilterFunction<T>) succeedingPattern.getFilterFunction()
            ));
          • return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
            + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + }

            + if (isPatternOptional(succeedingPattern))

            { + addOptionalTransitions(currentState, patternPos, patterns, states); + }

            + }
            + }
            +
            + private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states)

            Unknown macro: { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos); + State<T> optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction<T>) optionalPattern.getFilterFunction())); } }

          /**
          + * Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in:
          + *
          + * ---- ----- ------
          + * |State+>|State#1+>|State#2+
          + * -+ ----- ---+
          + */
          + private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
          + ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
          + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
          + State<T> succeedingState = states.get(succeedingPattern.getName());
          + Pattern<T, ?> currentPattern = patterns.get(patternPos);
          +
          + State<T> currentRepeatingState = null;
          + State<T> nextRepeatingState = currentState;
          + for (int i = 1; i < currentPattern.getMaxCount(); i++) {
          + currentRepeatingState = nextRepeatingState;
          + nextRepeatingState = new State<>(
          + currentState.getName() + "#" + i,
          + State.StateType.Normal);
          + states.put(nextRepeatingState.getName(), nextRepeatingState);
          + currentRepeatingState.addStateTransition(new StateTransition<T>(
          + StateTransitionAction.TAKE,
          + nextRepeatingState,
          + (FilterFunction<T>) currentPattern.getFilterFunction()));
          +
          + // Add a transition around optional pattern.
          + // count(2,3) will result in:
          + // ---- ----- ----- ---
          + // |State+>|State#1+>|State#2+>|Next|
          + // -+ ----- --+ --+
          + // | ^
          + // --------------------
          + if (i >= currentPattern.getMinCount())

          { + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + }

          + }
          + nextRepeatingState.addStateTransition(new StateTransition<T>(
          + StateTransitionAction.TAKE,
          + succeedingState,
          + (FilterFunction<T>) succeedingPattern.getFilterFunction()));
          + }
          +
          + private static <T> boolean shouldRepeatPattern(int patternPos, ArrayList<Pattern<T, ?>> patterns) {
          + if (patternPos == -1)

          { + return false; + }

          +
          + Pattern<T, ?> pattern = patterns.get(patternPos);
          + return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1;
          + }
          +
          + private static <T> void addTransitionToSelf(Pattern<T, ?> succeedingPattern, State<T> succeedingState)

          { + succeedingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + }

          +
          + private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> succeedingPattern)

          { + return succeedingPattern.getQuantifier() == Quantifier.ZERO_OR_MANY + || succeedingPattern.getQuantifier() == Quantifier.ONE_OR_MANY; + }

          +
          + private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, ?>> patterns, int startPos) {
          + int pos = startPos;
          + for (; pos < patterns.size(); pos++) {
          + Pattern<T, ?> pattern = patterns.get(pos);
          + if (!isPatternOptional(pattern))

          { + return pos; + }

          + }
          +
          + return pos;
          + }
          +
          + private static <T> Map<String, State<T>> createStatesFrom(ArrayList<Pattern<T, ?>> patterns) {
          + Map<String, State<T>> states = new HashMap<>();
          +
          + boolean foundNonOptionalPattern = false;
          + for (int i = patterns.size() - 1; i >= 0; i--)

          { + Pattern<T, ?> pattern = patterns.get(i); + State.StateType stateType = foundNonOptionalPattern ? State.StateType.Normal + : State.StateType.Final; + State<T> newState = new State<>(pattern.getName(), stateType); + foundNonOptionalPattern |= !isPatternOptional(pattern); + states.put(newState.getName(), newState); + }

          +
          + State<T> beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
          + states.put(BEGINNING_STATE_NAME, beginningState);
          + return states;
          + }
          +
          + private static <T> boolean isPatternOptional(Pattern<T, ?> pattern)

          { + return pattern.getQuantifier() == Quantifier.ZERO_OR_MANY + || pattern.getQuantifier() == Quantifier.OPTIONAL + || pattern.getMinCount() == 0; + }

          +
          + private static <T> ArrayList<Pattern<T, ?>> createPatternsList(Pattern<T, ?> pattern) {
          — End diff –

          The return type can become a `List` instead of `ArrayList`.

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97790385 — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java — @@ -130,26 +114,166 @@ } // add the beginning state final State<T> beginningState; + State<T> beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1); + State<T> succeedingState = states.get(succeedingPattern.getName()); beginningState.addStateTransition(new StateTransition<T>( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition<T>( StateTransitionAction.TAKE, currentState, (FilterFunction<T>) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction() )); return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) Unknown macro: { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos); + State<T> optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction<T>) optionalPattern.getFilterFunction())); } } /** + * Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in: + * + * ---- ----- ------ + * |State+ >|State#1+ >|State#2+ + * - + ----- ---+ + */ + private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos, + ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1); + State<T> succeedingState = states.get(succeedingPattern.getName()); + Pattern<T, ?> currentPattern = patterns.get(patternPos); + + State<T> currentRepeatingState = null; + State<T> nextRepeatingState = currentState; + for (int i = 1; i < currentPattern.getMaxCount(); i++) { + currentRepeatingState = nextRepeatingState; + nextRepeatingState = new State<>( + currentState.getName() + "#" + i, + State.StateType.Normal); + states.put(nextRepeatingState.getName(), nextRepeatingState); + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + nextRepeatingState, + (FilterFunction<T>) currentPattern.getFilterFunction())); + + // Add a transition around optional pattern. + // count(2,3) will result in: + // ---- ----- ----- --- + // |State+ >|State#1+ >|State#2+ >|Next| + // - + ----- -- + --+ + // | ^ + // -------------------- + if (i >= currentPattern.getMinCount()) { + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + } + nextRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + + private static <T> boolean shouldRepeatPattern(int patternPos, ArrayList<Pattern<T, ?>> patterns) { + if (patternPos == -1) { + return false; + } + + Pattern<T, ?> pattern = patterns.get(patternPos); + return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1; + } + + private static <T> void addTransitionToSelf(Pattern<T, ?> succeedingPattern, State<T> succeedingState) { + succeedingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + + private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> succeedingPattern) { + return succeedingPattern.getQuantifier() == Quantifier.ZERO_OR_MANY + || succeedingPattern.getQuantifier() == Quantifier.ONE_OR_MANY; + } + + private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, ?>> patterns, int startPos) { + int pos = startPos; + for (; pos < patterns.size(); pos++) { + Pattern<T, ?> pattern = patterns.get(pos); + if (!isPatternOptional(pattern)) { + return pos; + } + } + + return pos; + } + + private static <T> Map<String, State<T>> createStatesFrom(ArrayList<Pattern<T, ?>> patterns) { + Map<String, State<T>> states = new HashMap<>(); + + boolean foundNonOptionalPattern = false; + for (int i = patterns.size() - 1; i >= 0; i--) { + Pattern<T, ?> pattern = patterns.get(i); + State.StateType stateType = foundNonOptionalPattern ? State.StateType.Normal + : State.StateType.Final; + State<T> newState = new State<>(pattern.getName(), stateType); + foundNonOptionalPattern |= !isPatternOptional(pattern); + states.put(newState.getName(), newState); + } + + State<T> beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); + states.put(BEGINNING_STATE_NAME, beginningState); + return states; + } + + private static <T> boolean isPatternOptional(Pattern<T, ?> pattern) { + return pattern.getQuantifier() == Quantifier.ZERO_OR_MANY + || pattern.getQuantifier() == Quantifier.OPTIONAL + || pattern.getMinCount() == 0; + } + + private static <T> ArrayList<Pattern<T, ?>> createPatternsList(Pattern<T, ?> pattern) { — End diff – The return type can become a `List` instead of `ArrayList`.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2361#discussion_r97790105

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java —
          @@ -130,26 +114,166 @@
          }

          // add the beginning state

          • final State<T> beginningState;
            + State<T> beginningState = states.get(BEGINNING_STATE_NAME);;
            + addTransitions(beginningState, -1, patterns, states);
            + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
            + }
            + }
          • if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - }

            else

            { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - }

            + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
            + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
            + State<T> succeedingState = states.get(succeedingPattern.getName());

          • beginningState.addStateTransition(new StateTransition<T>(
            + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + }

            else {
            + currentState.addStateTransition(new StateTransition<T>(
            StateTransitionAction.TAKE,

          • currentState,
          • (FilterFunction<T>) currentPattern.getFilterFunction()
            + succeedingState,
            + (FilterFunction<T>) succeedingPattern.getFilterFunction()
            ));
          • return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
            + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + }

            + if (isPatternOptional(succeedingPattern))

            { + addOptionalTransitions(currentState, patternPos, patterns, states); + }

            + }
            + }
            +
            + private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states)

            Unknown macro: { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos); + State<T> optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction<T>) optionalPattern.getFilterFunction())); } }

          /**
          + * Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in:
          + *
          + * ---- ----- ------
          + * |State+>|State#1+>|State#2+
          + * -+ ----- ---+
          + */
          + private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
          + ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
          + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
          + State<T> succeedingState = states.get(succeedingPattern.getName());
          + Pattern<T, ?> currentPattern = patterns.get(patternPos);
          +
          + State<T> currentRepeatingState = null;
          + State<T> nextRepeatingState = currentState;
          + for (int i = 1; i < currentPattern.getMaxCount(); i++) {
          + currentRepeatingState = nextRepeatingState;
          + nextRepeatingState = new State<>(
          + currentState.getName() + "#" + i,
          + State.StateType.Normal);
          + states.put(nextRepeatingState.getName(), nextRepeatingState);
          + currentRepeatingState.addStateTransition(new StateTransition<T>(
          + StateTransitionAction.TAKE,
          + nextRepeatingState,
          + (FilterFunction<T>) currentPattern.getFilterFunction()));
          +
          + // Add a transition around optional pattern.
          + // count(2,3) will result in:
          + // ---- ----- ----- ---
          + // |State+>|State#1+>|State#2+>|Next|
          + // -+ ----- --+ --+
          + // | ^
          + // --------------------
          + if (i >= currentPattern.getMinCount())

          { + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + }

          + }
          + nextRepeatingState.addStateTransition(new StateTransition<T>(
          + StateTransitionAction.TAKE,
          + succeedingState,
          + (FilterFunction<T>) succeedingPattern.getFilterFunction()));
          + }
          +
          + private static <T> boolean shouldRepeatPattern(int patternPos, ArrayList<Pattern<T, ?>> patterns) {
          + if (patternPos == -1)

          { + return false; + }

          +
          + Pattern<T, ?> pattern = patterns.get(patternPos);
          + return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1;
          + }
          +
          + private static <T> void addTransitionToSelf(Pattern<T, ?> succeedingPattern, State<T> succeedingState)

          { + succeedingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + }

          +
          + private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> succeedingPattern)

          { + return succeedingPattern.getQuantifier() == Quantifier.ZERO_OR_MANY + || succeedingPattern.getQuantifier() == Quantifier.ONE_OR_MANY; + }

          +
          + private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, ?>> patterns, int startPos) {
          — End diff –

          The `patterns` can become a `List` instead of `ArrayList`.

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97790105 — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java — @@ -130,26 +114,166 @@ } // add the beginning state final State<T> beginningState; + State<T> beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1); + State<T> succeedingState = states.get(succeedingPattern.getName()); beginningState.addStateTransition(new StateTransition<T>( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition<T>( StateTransitionAction.TAKE, currentState, (FilterFunction<T>) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction() )); return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) Unknown macro: { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos); + State<T> optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction<T>) optionalPattern.getFilterFunction())); } } /** + * Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in: + * + * ---- ----- ------ + * |State+ >|State#1+ >|State#2+ + * - + ----- ---+ + */ + private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos, + ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1); + State<T> succeedingState = states.get(succeedingPattern.getName()); + Pattern<T, ?> currentPattern = patterns.get(patternPos); + + State<T> currentRepeatingState = null; + State<T> nextRepeatingState = currentState; + for (int i = 1; i < currentPattern.getMaxCount(); i++) { + currentRepeatingState = nextRepeatingState; + nextRepeatingState = new State<>( + currentState.getName() + "#" + i, + State.StateType.Normal); + states.put(nextRepeatingState.getName(), nextRepeatingState); + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + nextRepeatingState, + (FilterFunction<T>) currentPattern.getFilterFunction())); + + // Add a transition around optional pattern. + // count(2,3) will result in: + // ---- ----- ----- --- + // |State+ >|State#1+ >|State#2+ >|Next| + // - + ----- -- + --+ + // | ^ + // -------------------- + if (i >= currentPattern.getMinCount()) { + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + } + nextRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + + private static <T> boolean shouldRepeatPattern(int patternPos, ArrayList<Pattern<T, ?>> patterns) { + if (patternPos == -1) { + return false; + } + + Pattern<T, ?> pattern = patterns.get(patternPos); + return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1; + } + + private static <T> void addTransitionToSelf(Pattern<T, ?> succeedingPattern, State<T> succeedingState) { + succeedingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + + private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> succeedingPattern) { + return succeedingPattern.getQuantifier() == Quantifier.ZERO_OR_MANY + || succeedingPattern.getQuantifier() == Quantifier.ONE_OR_MANY; + } + + private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, ?>> patterns, int startPos) { — End diff – The `patterns` can become a `List` instead of `ArrayList`.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2361#discussion_r97789988

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java —
          @@ -130,26 +114,166 @@
          }

          // add the beginning state

          • final State<T> beginningState;
            + State<T> beginningState = states.get(BEGINNING_STATE_NAME);;
            + addTransitions(beginningState, -1, patterns, states);
            + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
            + }
            + }
          • if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - }

            else

            { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - }

            + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
            + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
            + State<T> succeedingState = states.get(succeedingPattern.getName());

          • beginningState.addStateTransition(new StateTransition<T>(
            + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + }

            else {
            + currentState.addStateTransition(new StateTransition<T>(
            StateTransitionAction.TAKE,

          • currentState,
          • (FilterFunction<T>) currentPattern.getFilterFunction()
            + succeedingState,
            + (FilterFunction<T>) succeedingPattern.getFilterFunction()
            ));
          • return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
            + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + }

            + if (isPatternOptional(succeedingPattern))

            { + addOptionalTransitions(currentState, patternPos, patterns, states); + }

            + }
            + }
            +
            + private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {

              • End diff –

          The `patterns` can become a `List` instead of `ArrayList`.

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97789988 — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java — @@ -130,26 +114,166 @@ } // add the beginning state final State<T> beginningState; + State<T> beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1); + State<T> succeedingState = states.get(succeedingPattern.getName()); beginningState.addStateTransition(new StateTransition<T>( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition<T>( StateTransitionAction.TAKE, currentState, (FilterFunction<T>) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction() )); return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { End diff – The `patterns` can become a `List` instead of `ArrayList`.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2361#discussion_r97790026

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java —
          @@ -130,26 +114,166 @@
          }

          // add the beginning state

          • final State<T> beginningState;
            + State<T> beginningState = states.get(BEGINNING_STATE_NAME);;
            + addTransitions(beginningState, -1, patterns, states);
            + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
            + }
            + }
          • if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - }

            else

            { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - }

            + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
            + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
            + State<T> succeedingState = states.get(succeedingPattern.getName());

          • beginningState.addStateTransition(new StateTransition<T>(
            + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + }

            else {
            + currentState.addStateTransition(new StateTransition<T>(
            StateTransitionAction.TAKE,

          • currentState,
          • (FilterFunction<T>) currentPattern.getFilterFunction()
            + succeedingState,
            + (FilterFunction<T>) succeedingPattern.getFilterFunction()
            ));
          • return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
            + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + }

            + if (isPatternOptional(succeedingPattern))

            { + addOptionalTransitions(currentState, patternPos, patterns, states); + }

            + }
            + }
            +
            + private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states)

            Unknown macro: { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos); + State<T> optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction<T>) optionalPattern.getFilterFunction())); } }

          /**
          + * Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in:
          + *
          + * ---- ----- ------
          + * |State+>|State#1+>|State#2+
          + * -+ ----- ---+
          + */
          + private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
          + ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
          — End diff –

          The `patterns` can become a `List` instead of `ArrayList`.

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97790026 — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java — @@ -130,26 +114,166 @@ } // add the beginning state final State<T> beginningState; + State<T> beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1); + State<T> succeedingState = states.get(succeedingPattern.getName()); beginningState.addStateTransition(new StateTransition<T>( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition<T>( StateTransitionAction.TAKE, currentState, (FilterFunction<T>) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction() )); return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) Unknown macro: { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos); + State<T> optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction<T>) optionalPattern.getFilterFunction())); } } /** + * Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in: + * + * ---- ----- ------ + * |State+ >|State#1+ >|State#2+ + * - + ----- ---+ + */ + private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos, + ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { — End diff – The `patterns` can become a `List` instead of `ArrayList`.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2361#discussion_r97790271

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java —
          @@ -130,26 +114,166 @@
          }

          // add the beginning state

          • final State<T> beginningState;
            + State<T> beginningState = states.get(BEGINNING_STATE_NAME);;
            + addTransitions(beginningState, -1, patterns, states);
            + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
            + }
            + }
          • if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - }

            else

            { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - }

            + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
            + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
            + State<T> succeedingState = states.get(succeedingPattern.getName());

          • beginningState.addStateTransition(new StateTransition<T>(
            + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + }

            else {
            + currentState.addStateTransition(new StateTransition<T>(
            StateTransitionAction.TAKE,

          • currentState,
          • (FilterFunction<T>) currentPattern.getFilterFunction()
            + succeedingState,
            + (FilterFunction<T>) succeedingPattern.getFilterFunction()
            ));
          • return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
            + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + }

            + if (isPatternOptional(succeedingPattern))

            { + addOptionalTransitions(currentState, patternPos, patterns, states); + }

            + }
            + }
            +
            + private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states)

            Unknown macro: { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos); + State<T> optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction<T>) optionalPattern.getFilterFunction())); } }

          /**
          + * Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in:
          + *
          + * ---- ----- ------
          + * |State+>|State#1+>|State#2+
          + * -+ ----- ---+
          + */
          + private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
          + ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
          + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
          + State<T> succeedingState = states.get(succeedingPattern.getName());
          + Pattern<T, ?> currentPattern = patterns.get(patternPos);
          +
          + State<T> currentRepeatingState = null;
          + State<T> nextRepeatingState = currentState;
          + for (int i = 1; i < currentPattern.getMaxCount(); i++) {
          + currentRepeatingState = nextRepeatingState;
          + nextRepeatingState = new State<>(
          + currentState.getName() + "#" + i,
          + State.StateType.Normal);
          + states.put(nextRepeatingState.getName(), nextRepeatingState);
          + currentRepeatingState.addStateTransition(new StateTransition<T>(
          + StateTransitionAction.TAKE,
          + nextRepeatingState,
          + (FilterFunction<T>) currentPattern.getFilterFunction()));
          +
          + // Add a transition around optional pattern.
          + // count(2,3) will result in:
          + // ---- ----- ----- ---
          + // |State+>|State#1+>|State#2+>|Next|
          + // -+ ----- --+ --+
          + // | ^
          + // --------------------
          + if (i >= currentPattern.getMinCount())

          { + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + }

          + }
          + nextRepeatingState.addStateTransition(new StateTransition<T>(
          + StateTransitionAction.TAKE,
          + succeedingState,
          + (FilterFunction<T>) succeedingPattern.getFilterFunction()));
          + }
          +
          + private static <T> boolean shouldRepeatPattern(int patternPos, ArrayList<Pattern<T, ?>> patterns) {
          + if (patternPos == -1)

          { + return false; + }

          +
          + Pattern<T, ?> pattern = patterns.get(patternPos);
          + return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1;
          + }
          +
          + private static <T> void addTransitionToSelf(Pattern<T, ?> succeedingPattern, State<T> succeedingState)

          { + succeedingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + }

          +
          + private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> succeedingPattern)

          { + return succeedingPattern.getQuantifier() == Quantifier.ZERO_OR_MANY + || succeedingPattern.getQuantifier() == Quantifier.ONE_OR_MANY; + }

          +
          + private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, ?>> patterns, int startPos) {
          + int pos = startPos;
          + for (; pos < patterns.size(); pos++) {
          + Pattern<T, ?> pattern = patterns.get(pos);
          + if (!isPatternOptional(pattern))

          { + return pos; + }

          + }
          +
          + return pos;
          + }
          +
          + private static <T> Map<String, State<T>> createStatesFrom(ArrayList<Pattern<T, ?>> patterns) {
          — End diff –

          The `patterns` can become a `List` instead of `ArrayList`.

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97790271 — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java — @@ -130,26 +114,166 @@ } // add the beginning state final State<T> beginningState; + State<T> beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1); + State<T> succeedingState = states.get(succeedingPattern.getName()); beginningState.addStateTransition(new StateTransition<T>( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition<T>( StateTransitionAction.TAKE, currentState, (FilterFunction<T>) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction() )); return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) Unknown macro: { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos); + State<T> optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction<T>) optionalPattern.getFilterFunction())); } } /** + * Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in: + * + * ---- ----- ------ + * |State+ >|State#1+ >|State#2+ + * - + ----- ---+ + */ + private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos, + ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1); + State<T> succeedingState = states.get(succeedingPattern.getName()); + Pattern<T, ?> currentPattern = patterns.get(patternPos); + + State<T> currentRepeatingState = null; + State<T> nextRepeatingState = currentState; + for (int i = 1; i < currentPattern.getMaxCount(); i++) { + currentRepeatingState = nextRepeatingState; + nextRepeatingState = new State<>( + currentState.getName() + "#" + i, + State.StateType.Normal); + states.put(nextRepeatingState.getName(), nextRepeatingState); + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + nextRepeatingState, + (FilterFunction<T>) currentPattern.getFilterFunction())); + + // Add a transition around optional pattern. + // count(2,3) will result in: + // ---- ----- ----- --- + // |State+ >|State#1+ >|State#2+ >|Next| + // - + ----- -- + --+ + // | ^ + // -------------------- + if (i >= currentPattern.getMinCount()) { + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + } + nextRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + + private static <T> boolean shouldRepeatPattern(int patternPos, ArrayList<Pattern<T, ?>> patterns) { + if (patternPos == -1) { + return false; + } + + Pattern<T, ?> pattern = patterns.get(patternPos); + return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1; + } + + private static <T> void addTransitionToSelf(Pattern<T, ?> succeedingPattern, State<T> succeedingState) { + succeedingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + + private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> succeedingPattern) { + return succeedingPattern.getQuantifier() == Quantifier.ZERO_OR_MANY + || succeedingPattern.getQuantifier() == Quantifier.ONE_OR_MANY; + } + + private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, ?>> patterns, int startPos) { + int pos = startPos; + for (; pos < patterns.size(); pos++) { + Pattern<T, ?> pattern = patterns.get(pos); + if (!isPatternOptional(pattern)) { + return pos; + } + } + + return pos; + } + + private static <T> Map<String, State<T>> createStatesFrom(ArrayList<Pattern<T, ?>> patterns) { — End diff – The `patterns` can become a `List` instead of `ArrayList`.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2361#discussion_r97789351

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java —
          @@ -43,7 +43,14 @@ public State(final String name, final StateType stateType)

          { this.name = name; this.stateType = stateType; - stateTransitions = new ArrayList<StateTransition<T>>(); + stateTransitions = new ArrayList<>(); + }

          +
          + public State(String name, StateType stateType, Collection<StateTransition<T>> stateTransitions) {
          — End diff –

          I agree with @chermenin and this class can be reverted to its previous state. In general, PRs should have the smallest diff possible in order to be easier to review.

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97789351 — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java — @@ -43,7 +43,14 @@ public State(final String name, final StateType stateType) { this.name = name; this.stateType = stateType; - stateTransitions = new ArrayList<StateTransition<T>>(); + stateTransitions = new ArrayList<>(); + } + + public State(String name, StateType stateType, Collection<StateTransition<T>> stateTransitions) { — End diff – I agree with @chermenin and this class can be reverted to its previous state. In general, PRs should have the smallest diff possible in order to be easier to review.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2361#discussion_r97789775

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java —
          @@ -80,25 +82,17 @@
          // return a factory for empty NFAs
          return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
          } else {
          + ArrayList<Pattern<T, ?>> patterns = createPatternsList(pattern);
          — End diff –

          The `patterns` can become a `List` instead of `ArrayList`. It is good to have the most generic type possible as argument or return type, as implementations may change.

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97789775 — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java — @@ -80,25 +82,17 @@ // return a factory for empty NFAs return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling); } else { + ArrayList<Pattern<T, ?>> patterns = createPatternsList(pattern); — End diff – The `patterns` can become a `List` instead of `ArrayList`. It is good to have the most generic type possible as argument or return type, as implementations may change.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2361#discussion_r97790228

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java —
          @@ -130,26 +114,166 @@
          }

          // add the beginning state

          • final State<T> beginningState;
            + State<T> beginningState = states.get(BEGINNING_STATE_NAME);;
            + addTransitions(beginningState, -1, patterns, states);
            + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
            + }
            + }
          • if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - }

            else

            { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - }

            + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
            + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
            + State<T> succeedingState = states.get(succeedingPattern.getName());

          • beginningState.addStateTransition(new StateTransition<T>(
            + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + }

            else {
            + currentState.addStateTransition(new StateTransition<T>(
            StateTransitionAction.TAKE,

          • currentState,
          • (FilterFunction<T>) currentPattern.getFilterFunction()
            + succeedingState,
            + (FilterFunction<T>) succeedingPattern.getFilterFunction()
            ));
          • return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
            + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + }

            + if (isPatternOptional(succeedingPattern))

            { + addOptionalTransitions(currentState, patternPos, patterns, states); + }

            + }
            + }
            +
            + private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states)

            Unknown macro: { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos); + State<T> optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction<T>) optionalPattern.getFilterFunction())); } }

          /**
          + * Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in:
          + *
          + * ---- ----- ------
          + * |State+>|State#1+>|State#2+
          + * -+ ----- ---+
          + */
          + private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
          + ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
          + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
          + State<T> succeedingState = states.get(succeedingPattern.getName());
          + Pattern<T, ?> currentPattern = patterns.get(patternPos);
          +
          + State<T> currentRepeatingState = null;
          + State<T> nextRepeatingState = currentState;
          + for (int i = 1; i < currentPattern.getMaxCount(); i++) {
          + currentRepeatingState = nextRepeatingState;
          + nextRepeatingState = new State<>(
          + currentState.getName() + "#" + i,
          + State.StateType.Normal);
          + states.put(nextRepeatingState.getName(), nextRepeatingState);
          + currentRepeatingState.addStateTransition(new StateTransition<T>(
          + StateTransitionAction.TAKE,
          + nextRepeatingState,
          + (FilterFunction<T>) currentPattern.getFilterFunction()));
          +
          + // Add a transition around optional pattern.
          + // count(2,3) will result in:
          + // ---- ----- ----- ---
          + // |State+>|State#1+>|State#2+>|Next|
          + // -+ ----- --+ --+
          + // | ^
          + // --------------------
          + if (i >= currentPattern.getMinCount())

          { + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + }

          + }
          + nextRepeatingState.addStateTransition(new StateTransition<T>(
          + StateTransitionAction.TAKE,
          + succeedingState,
          + (FilterFunction<T>) succeedingPattern.getFilterFunction()));
          + }
          +
          + private static <T> boolean shouldRepeatPattern(int patternPos, ArrayList<Pattern<T, ?>> patterns) {
          + if (patternPos == -1)

          { + return false; + }

          +
          + Pattern<T, ?> pattern = patterns.get(patternPos);
          + return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1;
          + }
          +
          + private static <T> void addTransitionToSelf(Pattern<T, ?> succeedingPattern, State<T> succeedingState)

          { + succeedingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + }

          +
          + private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> succeedingPattern)

          { + return succeedingPattern.getQuantifier() == Quantifier.ZERO_OR_MANY + || succeedingPattern.getQuantifier() == Quantifier.ONE_OR_MANY; + }

          +
          + private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, ?>> patterns, int startPos) {
          + int pos = startPos;
          + for (; pos < patterns.size(); pos++) {
          — End diff –

          This can become a `while-loop`

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97790228 — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java — @@ -130,26 +114,166 @@ } // add the beginning state final State<T> beginningState; + State<T> beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1); + State<T> succeedingState = states.get(succeedingPattern.getName()); beginningState.addStateTransition(new StateTransition<T>( + if (shouldRepeatPattern(patternPos, patterns)) { + expandRepeatingPattern(currentState, patternPos, patterns, states); + } else { + currentState.addStateTransition(new StateTransition<T>( StateTransitionAction.TAKE, currentState, (FilterFunction<T>) currentPattern.getFilterFunction() + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction() )); return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + if (shouldAddSelfTransition(succeedingPattern)) { + addTransitionToSelf(succeedingPattern, succeedingState); + } + if (isPatternOptional(succeedingPattern)) { + addOptionalTransitions(currentState, patternPos, patterns, states); + } + } + } + + private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) Unknown macro: { + int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1); + + for (int optionalPatternPos = patternPos + 2; + optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size()); + optionalPatternPos++) { + + Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos); + State<T> optionalState = states.get(optionalPattern.getName()); + currentState.addStateTransition(new StateTransition<>( + StateTransitionAction.TAKE, + optionalState, + (FilterFunction<T>) optionalPattern.getFilterFunction())); } } /** + * Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in: + * + * ---- ----- ------ + * |State+ >|State#1+ >|State#2+ + * - + ----- ---+ + */ + private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos, + ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { + Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1); + State<T> succeedingState = states.get(succeedingPattern.getName()); + Pattern<T, ?> currentPattern = patterns.get(patternPos); + + State<T> currentRepeatingState = null; + State<T> nextRepeatingState = currentState; + for (int i = 1; i < currentPattern.getMaxCount(); i++) { + currentRepeatingState = nextRepeatingState; + nextRepeatingState = new State<>( + currentState.getName() + "#" + i, + State.StateType.Normal); + states.put(nextRepeatingState.getName(), nextRepeatingState); + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + nextRepeatingState, + (FilterFunction<T>) currentPattern.getFilterFunction())); + + // Add a transition around optional pattern. + // count(2,3) will result in: + // ---- ----- ----- --- + // |State+ >|State#1+ >|State#2+ >|Next| + // - + ----- -- + --+ + // | ^ + // -------------------- + if (i >= currentPattern.getMinCount()) { + currentRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + } + nextRepeatingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + + private static <T> boolean shouldRepeatPattern(int patternPos, ArrayList<Pattern<T, ?>> patterns) { + if (patternPos == -1) { + return false; + } + + Pattern<T, ?> pattern = patterns.get(patternPos); + return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1; + } + + private static <T> void addTransitionToSelf(Pattern<T, ?> succeedingPattern, State<T> succeedingState) { + succeedingState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + } + + private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> succeedingPattern) { + return succeedingPattern.getQuantifier() == Quantifier.ZERO_OR_MANY + || succeedingPattern.getQuantifier() == Quantifier.ONE_OR_MANY; + } + + private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, ?>> patterns, int startPos) { + int pos = startPos; + for (; pos < patterns.size(); pos++) { — End diff – This can become a `while-loop`
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2361#discussion_r97789900

          — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java —
          @@ -130,26 +114,166 @@
          }

          // add the beginning state

          • final State<T> beginningState;
            + State<T> beginningState = states.get(BEGINNING_STATE_NAME);;
            + addTransitions(beginningState, -1, patterns, states);
            + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
            + }
            + }
          • if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - }

            else

            { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - }

            + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {

              • End diff –

          The `patterns` can become a `List` instead of `ArrayList`.

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2361#discussion_r97789900 — Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java — @@ -130,26 +114,166 @@ } // add the beginning state final State<T> beginningState; + State<T> beginningState = states.get(BEGINNING_STATE_NAME);; + addTransitions(beginningState, -1, patterns, states); + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling); + } + } if (states.containsKey(BEGINNING_STATE_NAME)) { - beginningState = states.get(BEGINNING_STATE_NAME); - } else { - beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); - states.put(BEGINNING_STATE_NAME, beginningState); - } + private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) { End diff – The `patterns` can become a `List` instead of `ArrayList`.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2361

          Hi @kl0u,

          Sorry for misunderstanding, I still don't understand the problem
          Let's select a test and go through it. For example in the test `testOneOrMoreCEPPattern` from `CEPComplexPatternsITCase ` there is a pattern that has the `oneOrMany` and the `end` statements:

          ```java
          .next("middle").oneOrMany().where(
          new FilterFunction<Event>() {

          @Override
          public boolean filter(Event value) throws Exception

          { return value.getName().equals("middle"); }

          }
          )
          .next("end").where(new FilterFunction<Event>() {

          @Override
          public boolean filter(Event value) throws Exception

          { return value.getName().equals("end"); }
          });
          ```

          Do you say that if I remove this part:

          ```java
          .next("end").where(new FilterFunction<Event>() {

          @Override
          public boolean filter(Event value) throws Exception { return value.getName().equals("end"); }

          });
          ```

          the pattern won't work as expected?
          Is this your point?

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2361 Hi @kl0u, Sorry for misunderstanding, I still don't understand the problem Let's select a test and go through it. For example in the test `testOneOrMoreCEPPattern` from `CEPComplexPatternsITCase ` there is a pattern that has the `oneOrMany` and the `end` statements: ```java .next("middle").oneOrMany().where( new FilterFunction<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } } ) .next("end").where(new FilterFunction<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("end"); } }); ``` Do you say that if I remove this part: ```java .next("end").where(new FilterFunction<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("end"); } }); ``` the pattern won't work as expected? Is this your point?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2361

          Hi @mushketyk ! Yes. In this case we would expect to have everything apart from the "end" event in the result, right?

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2361 Hi @mushketyk ! Yes. In this case we would expect to have everything apart from the "end" event in the result, right?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2361

          Hi @kl0u,

          Yes you are right. It should work fine without the "end" event and if it does not work it is a bug.
          I'll take a look at this in the next few days, rebase the PR and address your other comments.

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2361 Hi @kl0u, Yes you are right. It should work fine without the "end" event and if it does not work it is a bug. I'll take a look at this in the next few days, rebase the PR and address your other comments.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2361

          Hi @mushketyk ! Thanks a lot!

          The problem is in the `NFACompiler` and more specifically the way the pattern is compiled. I have not come up with a concrete solution yet but I would say that:

          i) there should be another `StateType` as well, sth like `PotentialFinal` (or a better name) and in the case that the final pattern is optional, then it gets assigned this `StateType`. This also propagates backwards in the pattern graph in the case that the final pattern is optional (0 to sth times).

          ii) the `addTransitions()` should also be extended to account for quantifiers at the end and at the start of the pattern when "unrolling" or expanding the original pattern, e.g. the `expandRepeatingPattern()` should not always assign `State.StateType.Normal` to the unrolled patterns as they may be at the end or the start.

          iii) the `NFA` should also be modified to account for the new `PotentialFinal` state. In this case, when we see such a pattern, a flag should be set, and for example in the one-to-many case the pattern should be matched when the next non-matching element comes.

          Again these are some initial thoughts. We can discuss more on the design if you want, as this seems to be a bigger change than expected

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2361 Hi @mushketyk ! Thanks a lot! The problem is in the `NFACompiler` and more specifically the way the pattern is compiled. I have not come up with a concrete solution yet but I would say that: i) there should be another `StateType` as well, sth like `PotentialFinal` (or a better name) and in the case that the final pattern is optional, then it gets assigned this `StateType`. This also propagates backwards in the pattern graph in the case that the final pattern is optional (0 to sth times). ii) the `addTransitions()` should also be extended to account for quantifiers at the end and at the start of the pattern when "unrolling" or expanding the original pattern, e.g. the `expandRepeatingPattern()` should not always assign `State.StateType.Normal` to the unrolled patterns as they may be at the end or the start. iii) the `NFA` should also be modified to account for the new `PotentialFinal` state. In this case, when we see such a pattern, a flag should be set, and for example in the one-to-many case the pattern should be matched when the next non-matching element comes. Again these are some initial thoughts. We can discuss more on the design if you want, as this seems to be a bigger change than expected
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2361

          Hi @kl0u

          Thank you for your suggestions.

          I don't have a concrete solution yet, but I can try to fill the gaps in testing and see if I can come up with a better solution.

          Speaking about testing. Can you think of any other test cases that I might need to cover?

          i) Seems like a good idea. I'll add more testing and see if it can help
          ii) Good point. I'll work on this.
          iii) I am not sure why NFA must be changed? I think we can build NFA using current state types by adding necessary transitions around optional states. What do you think?

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2361 Hi @kl0u Thank you for your suggestions. I don't have a concrete solution yet, but I can try to fill the gaps in testing and see if I can come up with a better solution. Speaking about testing. Can you think of any other test cases that I might need to cover? i) Seems like a good idea. I'll add more testing and see if it can help ii) Good point. I'll work on this. iii) I am not sure why NFA must be changed? I think we can build NFA using current state types by adding necessary transitions around optional states. What do you think?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2361

          Well for iii), the idea behind the new type of state is that the `NFA` will see one `PotentiallyFinal` element, but it will continue receiving, until the first non-eligible element. So if the pattern says sth like `'a'.oneOrMany`, the `a` will be `PotentiallyFinal` with a self-loop. So the `NFA` will have to set a flag, e.g. `canTerminate` to `true`. Then another `a`, still valid, so the `NFA` will accept it, and then a `b`. In this case, the `NFA` should declare the previous as a matching pattern, emit it, and then continue with the `b`. The current `NFA` would not recognize the `PotentiallyFinal` state.

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2361 Well for iii), the idea behind the new type of state is that the `NFA` will see one `PotentiallyFinal` element, but it will continue receiving, until the first non-eligible element. So if the pattern says sth like `'a'.oneOrMany`, the `a` will be `PotentiallyFinal` with a self-loop. So the `NFA` will have to set a flag, e.g. `canTerminate` to `true`. Then another `a`, still valid, so the `NFA` will accept it, and then a `b`. In this case, the `NFA` should declare the previous as a matching pattern, emit it, and then continue with the `b`. The current `NFA` would not recognize the `PotentiallyFinal` state.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2361

          Let's see if I understand correctly the test case you are describing. Do you mean that a pattern like:

          ```java
          begin().oneOrMany().where("a")
          .next().optional().where("b")
          ```

          won't work using current `NFA`. But as a result of this change if we have a sequence "aaab", the NFA should emit two matching patterns: "aaa" and "aaab"?

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2361 Let's see if I understand correctly the test case you are describing. Do you mean that a pattern like: ```java begin().oneOrMany().where("a") .next().optional().where("b") ``` won't work using current `NFA`. But as a result of this change if we have a sequence "aaab", the NFA should emit two matching patterns: "aaa" and "aaab"?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2361

          I was talking about a pattern like `begin().oneOrMany().where("a")`. So that you just expect as many consecutive `a`'s as possible.

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2361 I was talking about a pattern like `begin().oneOrMany().where("a")`. So that you just expect as many consecutive `a`'s as possible.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2361

          Got it Let me check how it works with the current code and I'll come back with a proposed solution.

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2361 Got it Let me check how it works with the current code and I'll come back with a proposed solution.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2361

          Sounds good! Also you can think on the API changes I propose. I think they simplify the user-facing commands and remove some long and not so elegant if-loops that check the type of quantifier.

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2361 Sounds good! Also you can think on the API changes I propose. I think they simplify the user-facing commands and remove some long and not so elegant if-loops that check the type of quantifier.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2361

          Hi @mushketyk are you still working on this issue?
          In not, @dawidwys would also like to work on this issue.

          Please let me know what you think.

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2361 Hi @mushketyk are you still working on this issue? In not, @dawidwys would also like to work on this issue. Please let me know what you think.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2361

          Hi @kl0u

          Sorry for the long delay. I don't have any free time to allocate to contribute to Flink, so I don't mind if @dawidwys works on this.

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2361 Hi @kl0u Sorry for the long delay. I don't have any free time to allocate to contribute to Flink, so I don't mind if @dawidwys works on this.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2361

          Hi @mushketyk ! Thanks a lot for the reply!

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2361 Hi @mushketyk ! Thanks a lot for the reply!
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2361

          Hi @mushketyk could you close this PR?

          Show
          githubbot ASF GitHub Bot added a comment - Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2361 Hi @mushketyk could you close this PR?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk commented on the issue:

          https://github.com/apache/flink/pull/2361

          Closed.

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2361 Closed.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user mushketyk closed the pull request at:

          https://github.com/apache/flink/pull/2361

          Show
          githubbot ASF GitHub Bot added a comment - Github user mushketyk closed the pull request at: https://github.com/apache/flink/pull/2361

            People

            • Assignee:
              dawidwys Dawid Wysakowicz
              Reporter:
              till.rohrmann Till Rohrmann
            • Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development