FLINK-6772

Incorrect ordering of matched state events in Flink CEP

    Details

      Description

      I've stumbled across an unexpected ordering of the matched state events.

      Pattern:

      Pattern<String, ?> pattern = Pattern
          .<String>begin("start")
              .where(new IterativeCondition<String>() {
                  @Override
                  public boolean filter(String s, Context<String> context) throws Exception {
                      return s.startsWith("a-");
                  }
              }).times(4).allowCombinations()
          .followedByAny("end")
              .where(new IterativeCondition<String>() {
                  @Override
                  public boolean filter(String s, Context<String> context) throws Exception {
                      return s.startsWith("b-");
                  }
              }).times(3).consecutive();
      

      Input event sequence:
      a-1, a-2, a-3, a-4, b-1, b-2, b-3

      On b-3 a matched pattern would be triggered.

      Now, in the Map<String, List<IN>> map passed via select in PatternSelectFunction, the list for the "end" state is:
      b-3, b-1, b-2.

      Based on the timestamp of the events (simply using processing time), the correct order should be b-1, b-2, b-3.

          Activity

          dawidwys Dawid Wysakowicz added a comment (edited)

          Hmm, I could not reproduce this issue.

          I tried the following test:

          @Test
          public void test() throws Exception {
          
          	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          	env.setParallelism(2);
          
          	DataStream<String> input = env.fromElements(
          		"a-1", "a-2", "a-3", "a-4", "b-1", "b-2", "b-3"
          	);
          
          	Pattern<String, ?> pattern = Pattern
          		.<String>begin("start")
          		.where(new SimpleCondition<String>() {
          			@Override
          			public boolean filter(String s) throws Exception {
          				return s.startsWith("a-");
          			}
          		}).times(4).allowCombinations()
          		.followedByAny("end")
          		.where(new SimpleCondition<String>() {
          			public boolean filter(String s) throws Exception {
          				return s.startsWith("b-");
          			}
          		}).times(3).consecutive();
          
          	CEP.pattern(input, pattern).select(new PatternSelectFunction<String, String>() {
          		@Override
          		public String select(Map<String, List<String>> pattern) throws Exception {
          			return pattern.toString();
          		}
          	}).print();
          
          	env.execute();
          }
          

          And the results are as follows:

          1> {start=[a-1, a-2, a-3, a-4], end=[b-1, b-2, b-3]}
          

          I also ran the code in a new, clean project, and the results were the same.

          tzulitai Tzu-Li (Gordon) Tai added a comment (edited)

          Hmm, I did another check on this (using the code you posted), and had a very odd observation:

          When the followedByAny state is named "end", the output is correct: [b-1, b-2, b-3].
          But when I rename it to, for example, "middle" or "m" (just a few data points), the resulting order is incorrect.
          I changed only the second state's name and nothing else.

          Any clue what might be happening here? I know it sounds a bit bizarre :/
          The tests were done on fresh runs, not restored from a checkpoint.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          So, for example,

          public class FlinkCEPTest {
          
          	public static void main(String[] args) throws Exception {
          
          		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          		env.setParallelism(2);
          
          		DataStream<String> input = env.fromElements(
          				"a-1", "a-2", "a-3", "a-4", "b-1", "b-2", "b-3"
          		);
          
          		Pattern<String, ?> pattern = Pattern
          				.<String>begin("start")
          				.where(new SimpleCondition<String>() {
          					public boolean filter(String s) throws Exception {
          						return s.startsWith("a-");
          					}
          				}).times(4).allowCombinations()
          				.followedByAny("randomStateName")
          				.where(new SimpleCondition<String>() {
          					public boolean filter(String s) throws Exception {
          						return s.startsWith("b-");
          					}
          				}).times(3).consecutive();
          
          		CEP.pattern(input, pattern).select(new PatternSelectFunction<String, String>() {
          			public String select(Map<String, List<String>> pattern) throws Exception {
          				return pattern.toString();
          			}
          		}).print();
          
          		env.execute();
          	}
          }
          

          The result is:

          1> {randomStateName=[b-3, b-1, b-2], start=[a-1, a-2, a-3, a-4]}
          
          dawidwys Dawid Wysakowicz added a comment -

          OK, I've tracked down the issue. It is caused by the internalStateName introduced in FLINK-6604; because of it, the order in SharedBuffer::extractPatterns is not preserved. Kostas Kloudas, is the internal unique name necessary?
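
A minimal illustration of how this kind of reordering can arise. This is not Flink's actual SharedBuffer code. It assumes, hypothetically, that a state declared with times(3) is expanded into several internal states with unique names (the `:0` and `:1` suffixes below are invented), that matched events are collected per internal name in a hash-based map, and that they are then merged under the user-facing name. Under those assumptions the merged order follows the map's iteration order, which can change with the state name, while sorting by timestamp (the approach named in the title of the later fix) does not depend on naming. `Timestamped`, `sample`, and both merge methods are hypothetical helpers.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderingSketch {

    /** Hypothetical event wrapper: a value plus the timestamp it arrived with. */
    static final class Timestamped {
        final String value;
        final long timestamp;

        Timestamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Merges events in whatever order the map iterates; arrival order is not guaranteed. */
    static List<String> mergeInIterationOrder(Map<String, Timestamped> byInternalName) {
        List<String> merged = new ArrayList<>();
        for (Timestamped event : byInternalName.values()) {
            merged.add(event.value);
        }
        return merged;
    }

    /** Merges events sorted by timestamp, so the result does not depend on key names. */
    static List<String> mergeByTimestamp(Map<String, Timestamped> byInternalName) {
        List<Timestamped> events = new ArrayList<>(byInternalName.values());
        events.sort(Comparator.comparingLong((Timestamped e) -> e.timestamp));
        List<String> merged = new ArrayList<>();
        for (Timestamped event : events) {
            merged.add(event.value);
        }
        return merged;
    }

    /** Builds three events keyed by invented internal names derived from stateName. */
    static Map<String, Timestamped> sample(String stateName) {
        Map<String, Timestamped> events = new HashMap<>();
        events.put(stateName + ":0", new Timestamped("b-1", 1L));
        events.put(stateName + ":1", new Timestamped("b-2", 2L));
        events.put(stateName, new Timestamped("b-3", 3L));
        return events;
    }

    public static void main(String[] args) {
        for (String name : new String[] {"end", "randomStateName"}) {
            Map<String, Timestamped> events = sample(name);
            System.out.println(name + " iteration order: " + mergeInIterationOrder(events));
            System.out.println(name + " timestamp order: " + mergeByTimestamp(events));
        }
    }
}
```

The iteration-order line may or may not differ between the two names on a given JVM; the timestamp-order line is [b-1, b-2, b-3] in both cases.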

          rmetzger Robert Metzger added a comment -

          I assume we can fix this in Flink 1.3.1 and it won't affect the 1.3.0 release?

          kkl0u Kostas Kloudas added a comment -

          I agree with Robert Metzger that this can wait until 1.3.1.
          Dawid Wysakowicz, the unique names were introduced for serialization/deserialization. I will have a look to see whether this can be bypassed.

          githubbot ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/4084

          FLINK-6772 [cep] Fix ordering (by timestamp) of matched events.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink cep-fix-ordering

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4084.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4084


          commit b3f38aa64addd73b20f6ab51f98f95017cb7e348
          Author: kkloudas <kkloudas@gmail.com>
          Date: 2017-05-31T15:48:34Z

          FLINK-6772 [cep] Fix ordering (by timestamp) of matched events.


          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on the issue:

          https://github.com/apache/flink/pull/4084

          LGTM

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/4084

          Thanks @dawidwys! I will let Travis have another go at the rebased version and then merge.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4084

          kkl0u Kostas Kloudas added a comment -

          Merged at 5d3506e

          yuzhihong@gmail.com Ted Yu added a comment -
          +               for (String key: path.keySet()) {
          +                       List<T> events = path.get(key);
          

          Instead of calling keySet(), entrySet() should be used. This would avoid the path.get() call.
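
A small sketch of the suggested change. The rest of the loop from the patch is not shown here, so `collectEvents` and its body are hypothetical; only the iteration style is the point. Iterating over entrySet() yields the key and value together, so no second lookup is needed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EntrySetSketch {

    /** Flattens all event lists in the map, visiting each entry exactly once. */
    static <T> List<T> collectEvents(Map<String, List<T>> path) {
        List<T> all = new ArrayList<>();
        for (Map.Entry<String, List<T>> entry : path.entrySet()) {
            // The value comes with the entry; no path.get(key) call is required.
            List<T> events = entry.getValue();
            all.addAll(events);
        }
        return all;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, which makes the printed result predictable.
        Map<String, List<String>> path = new LinkedHashMap<>();
        path.put("start", Arrays.asList("a-1", "a-2", "a-3", "a-4"));
        path.put("end", Arrays.asList("b-1", "b-2", "b-3"));
        System.out.println(collectEvents(path)); // [a-1, a-2, a-3, a-4, b-1, b-2, b-3]
    }
}
```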


            People

            • Assignee: kkl0u Kostas Kloudas
            • Reporter: tzulitai Tzu-Li (Gordon) Tai