FLINK-27456

mistake and confusion with CEP example in docs


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.14.4
    • None
    • None

    Description

      https://nightlies.apache.org/flink/flink-docs-master/docs/libs/cep/#contiguity-within-looping-patterns

      In the section of the docs on contiguity within looping patterns, what it says about strict contiguity for the given example is either incorrect or very confusing (or both). It doesn't help that the example code doesn't precisely match the scenario described in the text.

      To study this, I implemented the example from the text and found that it produces no output for strict contiguity (as I expected), which contradicts what the text says.

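      import org.apache.flink.cep.CEP;
      import org.apache.flink.cep.PatternSelectFunction;
      import org.apache.flink.cep.PatternStream;
      import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
      import org.apache.flink.cep.pattern.Pattern;
      import org.apache.flink.cep.pattern.conditions.SimpleCondition;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

      import java.util.List;
      import java.util.Map;

      public class StreamingJob {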
      
          public static void main(String[] args) throws Exception {
      
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              DataStream<String> events = env.fromElements("a", "b1", "d1", "b2", "d2", "b3", "c");
      
              AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();
              Pattern<String, String> pattern =
                      Pattern.<String>begin("a", skipStrategy)
                              .where(
                                      new SimpleCondition<String>() {
                                          @Override
                                          public boolean filter(String element) throws Exception {
                                              return element.startsWith("a");
                                          }
                                      })
                              .next("b+")
                              .where(
                                      new SimpleCondition<String>() {
                                          @Override
                                          public boolean filter(String element) throws Exception {
                                              return element.startsWith("b");
                                          }
                                      })
                              .oneOrMore().consecutive()
                              .next("c")
                              .where(
                                      new SimpleCondition<String>() {
                                          @Override
                                          public boolean filter(String element) throws Exception {
                                              return element.startsWith("c");
                                          }
                                      });
      
              PatternStream<String> patternStream = CEP.pattern(events, pattern).inProcessingTime();
              patternStream.select(new SelectSegment()).addSink(new PrintSinkFunction<>(true));
              env.execute();
          }
      
          public static class SelectSegment implements PatternSelectFunction<String, String> {
              public String select(Map<String, List<String>> pattern) {
                  return String.join("", pattern.get("a"))
                          + String.join("", pattern.get("b+"))
                          + String.join("", pattern.get("c"));
              }
          }
      }
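For comparison, here is a minimal plain-Java sketch of what I would expect strict contiguity to mean for this pattern: an "a" event immediately followed by one or more "b" events, immediately followed by a "c" event, with nothing in between. It does not use Flink or its NFA. The class name StrictContiguitySketch and the method findStrictMatches are hypothetical names made up for this illustration, and the skip-past-last-event behaviour is only approximated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StrictContiguitySketch {

    // Hand-rolled check (not Flink's implementation): finds "a", then one or more
    // "b" events, then "c", where every event directly follows the previous one.
    static List<String> findStrictMatches(List<String> events) {
        List<String> matches = new ArrayList<>();
        int i = 0;
        while (i < events.size()) {
            if (!events.get(i).startsWith("a")) {
                i++;
                continue;
            }
            StringBuilder match = new StringBuilder(events.get(i));
            int j = i + 1;
            // Consume the unbroken run of "b" events right after "a".
            while (j < events.size() && events.get(j).startsWith("b")) {
                match.append(events.get(j));
                j++;
            }
            boolean sawAtLeastOneB = j > i + 1;
            if (sawAtLeastOneB && j < events.size() && events.get(j).startsWith("c")) {
                match.append(events.get(j));
                matches.add(match.toString());
                i = j + 1; // rough stand-in for skipPastLastEvent
            } else {
                i++; // any other event breaks the strict sequence
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Input from the job above: "d1" directly follows "b1", so nothing matches.
        System.out.println(findStrictMatches(
                Arrays.asList("a", "b1", "d1", "b2", "d2", "b3", "c"))); // prints []
        // Without the interleaved "d" events there is exactly one match.
        System.out.println(findStrictMatches(
                Arrays.asList("a", "b1", "b2", "c"))); // prints [ab1b2c]
    }
}
```

Under this reading, the input used in the job cannot match, because "d1" arrives between "b1" and the next "b" event. That is consistent with the empty output I observed from the Flink job.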
       

       

          People

            Assignee: Unassigned
            Reporter: David Anderson (danderson)
            Votes: 0
            Watchers: 3
