Flink / FLINK-22888

Match results may be wrong when notNext is the last part of a pattern with a time window


Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.9.0
    • Fix Version/s: None
    • Component/s: Library / CEP
    • Labels: None

    Description

      The pattern looks like this:
      Pattern.begin("start").where(records == "a")
                  .notNext("notNext").where(records == "b")
                  .within(5 milliseconds)

      If only one event "a" arrives within the 5 milliseconds, I think this "a" should be output as a correct match the next time advanceTime runs.

      In practice, however, CEP treats this "a" as a timed-out partial match.
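
The expected behavior described above can be illustrated with a small toy model. This is only a sketch of the semantics the reporter expects, not Flink's NFA implementation: the class `NotNextWindowSketch`, the `Ev` type, and the `expectedMatches` method are hypothetical names introduced for illustration, and the rule that an event is inside the window when its timestamp is strictly less than `start + window` is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of "start, notNext(middle), within(window)". Hypothetical; not Flink code. */
class NotNextWindowSketch {

    /** Minimal stand-in for an event: id, name, and event-time timestamp in ms. */
    static final class Ev {
        final int id;
        final String name;
        final long ts;

        Ev(int id, String name, long ts) {
            this.id = id;
            this.name = name;
            this.ts = ts;
        }
    }

    /**
     * Returns the ids of "start" events that should be emitted as matches under
     * the expected semantics. Events must be sorted by timestamp.
     */
    static List<String> expectedMatches(List<Ev> events, long windowMs, long watermark) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            Ev start = events.get(i);
            if (!start.name.equals("start")) {
                continue;
            }
            long deadline = start.ts + windowMs; // assumed: window is [ts, ts + windowMs)
            Ev next = i + 1 < events.size() ? events.get(i + 1) : null;
            boolean nextInWindow = next != null && next.ts < deadline;
            if (nextInWindow) {
                // The directly following event decides: "middle" rejects, anything else matches.
                if (!next.name.equals("middle")) {
                    matches.add(String.valueOf(start.id));
                }
            } else if (watermark >= deadline) {
                // Window closed without a following event: expected to be a match, not a timeout.
                matches.add(String.valueOf(start.id));
            }
            // Otherwise the window is still open and the outcome is undecided.
        }
        return matches;
    }

    public static void main(String[] args) {
        // Same shape as the reproduction: "start" at t=5, "final" at t=100, 5 ms window.
        List<Ev> events = Arrays.asList(new Ev(1, "start", 5L), new Ev(5, "final", 100L));
        System.out.println(expectedMatches(events, 5L, 95L));
    }
}
```

In this model the lone "start" event is reported once the watermark passes the end of its window, which is the result the reporter expects from CEP instead of a timeout.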

      // Reproduction: a lone "start" event with no "middle" event inside the 5 ms window
      @Test
      public void testNoNextWithWindow() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      
         // (Event, timestamp)
         DataStream<Event> input = env.fromElements(
            Tuple2.of(new Event(1, "start", 1.0), 5L),
      
            // last element for high final watermark
            Tuple2.of(new Event(5, "final", 5.0), 100L)
         ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
      
            @Override
            public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
               return element.f1;
            }
      
            @Override
            public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
               return new Watermark(lastElement.f1 - 5);
            }
      
         }).map(new MapFunction<Tuple2<Event, Long>, Event>() {
      
            @Override
            public Event map(Tuple2<Event, Long> value) throws Exception {
               return value.f0;
            }
         });
      
         Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
               return value.getName().equals("start");
            }
         }).notNext("middle").where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
               return value.getName().equals("middle");
            }
         }).within(Time.milliseconds(5L));
      
         DataStream<String> result = CEP.pattern(input, pattern).select(
            new PatternSelectFunction<Event, String>() {
               @Override
               public String select(Map<String, List<Event>> pattern) {
                  StringBuilder builder = new StringBuilder();
                  builder.append(pattern.get("start").get(0).getId());
                  return builder.toString();
               }
            }
         );
      
         List<String> resultList = new ArrayList<>();
      
         DataStreamUtils.collect(result).forEachRemaining(resultList::add);
      
         resultList.sort(String::compareTo);
      
         // Expected: the lone "start" event (id 1) is emitted as a match, not as a timeout
         assertEquals(Arrays.asList("1"), resultList);
      }
      

       

          People

            Assignee: Nicholas Jiang (nicholasjiang)
            Reporter: Yue Ma (mayuehappy)