Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-10577

CEP's greedy() doesn't work

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.6.0
    • Fix Version/s: None
    • Component/s: Library / CEP
    • Labels:
      None

      Description

      I think greedy operator has some problem.

      Given the below java code:

       

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      
      DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
      Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "r"),
      Tuple3.of(new Integer(101), Timestamp.valueOf("2018-10-01 13:00:00").getTime(), "p"),
      Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:01").getTime(), "p"),
      Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:03").getTime(), "p"),
      Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:04").getTime(), "p"),
      Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:05").getTime(), "c"),
      Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:08").getTime(), "c"),
      Tuple3.of(new Integer(100), Timestamp.valueOf("2018-10-01 13:00:11").getTime(), "a")
      ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, Long, String>>(Time.seconds(2)) {
      private static final long serialVersionUID = 1L;
      @Override
      public long extractTimestamp(Tuple3<Integer, Long, String> element) {
      return element.f1; 
      }
      });
      AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipPastLastEvent();
      Pattern<Tuple3<Integer, Long, String>, ?> pattern = Pattern.<Tuple3<Integer, Long, String>>begin("start", strategy)
      .where(new SimpleCondition<Tuple3<Integer, Long, String>>() {
      private static final long serialVersionUID = 1L;
      @Override
      public boolean filter(Tuple3<Integer, Long, String> e) {
      return e.f2.equals("r") ? true : false;
      }
      }).followedBy("middle").where(new SimpleCondition<Tuple3<Integer, Long, String>>() {
      private static final long serialVersionUID = 1L;
      @Override
      public boolean filter(Tuple3<Integer, Long, String> e) throws Exception {
      return !e.f2.equals("r") ? true : false;
      }
      }).oneOrMore().greedy()
      .within(Time.seconds(10));
      CEP.pattern(input.keyBy(0), pattern)
      .select(new PatternSelectFunction<Tuple3<Integer, Long, String>, String>() {
      private static final long serialVersionUID = 1L;
      @Override
      public String select(Map<String, List<Tuple3<Integer, Long, String>>> pattern) {
      StringBuilder builder = new StringBuilder();
      List<Tuple3<Integer, Long, String>> start = pattern.get("start");
      List<Tuple3<Integer, Long, String>> middle = pattern.get("middle");
      for (Tuple3<Integer, Long, String> t : start) {
      builder.append(t.f0).append(",");
      }
      for (Tuple3<Integer, Long, String> t : middle) {
      builder.append(t.f0).append(",");
      }
      return builder.toString(); 
      }
      })
      .print(); 
      env.execute();

      I would like to see:100,100,100,100,100,100

      however it matches 100,100

      I have tried to use AfterMatchSkipStrategy.skipPastLastEvent() for skipping some partial matches,it also matches 100,100.

      Is there something important about greedy operator that i misunderstand?

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                Unassigned
                Reporter:
                simahoa zhanghao
              • Votes:
                0 Vote for this issue
                Watchers:
                3 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: