Details
-
Bug
-
Status: Closed
-
Minor
-
Resolution: Fixed
-
1.9.0
-
None
-
None
Description
the pattern is like
Pattern.begin("start").where(records == "a")
.notNext("notNext").where(records == "b")
.withIn(5milliseconds).
If there is only one event "a" in 5 milliseconds. I think this “a” should be output as the correct result of the match next time in advanceTime.
But in the actual operation of CEP. This “a” will be treated as matching timeout data
// code placeholder @Test public void testNoNextWithWindow() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // (Event, timestamp) DataStream<Event> input = env.fromElements( Tuple2.of(new Event(1, "start", 1.0), 5L), // last element for high final watermark Tuple2.of(new Event(5, "final", 5.0), 100L) ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() { @Override public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) { return element.f1; } @Override public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) { return new Watermark(lastElement.f1 - 5); } }).map(new MapFunction<Tuple2<Event, Long>, Event>() { @Override public Event map(Tuple2<Event, Long> value) throws Exception { return value.f0; } }); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } }).notNext("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } }).within(Time.milliseconds(5L)); DataStream<String> result = CEP.pattern(input, pattern).select( new PatternSelectFunction<Event, String>() { @Override public String select(Map<String, List<Event>> pattern) { StringBuilder builder = new StringBuilder(); builder.append(pattern.get("start").get(0).getId()); return builder.toString(); } } ); List<String> resultList = new ArrayList<>(); DataStreamUtils.collect(result).forEachRemaining(resultList::add); resultList.sort(String::compareTo); assertEquals(Arrays.asList("1"), resultList); }