Details
-
Bug
-
Status: Closed
-
Blocker
-
Resolution: Not A Problem
-
1.8.0
-
None
-
None
-
The development environment is idea, the flink version is 1.8
Description
Using a Tumbling time window, I created a time-based and counter trigger. The parameters in the onElement method TriggerContext, ctx.getCurrentWatermark(), get negative values,
There are screenshots in the attachment。
code show as below
public class CountTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private CountTrigger(int count) { this.threshold = count; } private int count = 0; private int threshold; @Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) { long watermark = ctx.getCurrentWatermark(); ctx.registerEventTimeTimer(window.maxTimestamp()); if (count > threshold) { count = 0; return TriggerResult.FIRE; } else { count++; } System.out.println("onElement: " + element); return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception { return TriggerResult.FIRE; } @Override public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception { ctx.deleteProcessingTimeTimer(window.maxTimestamp()); } @Override public String toString() { return "CountTrigger"; } public static <W extends Window> CountTrigger of(int threshold) { return new CountTrigger(threshold); } }
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); timestamp = ctx.getCurrentWatermark(); long end = window.maxTimestamp(); System.out.println(element + " " + timestamp + " " + window.maxTimestamp() + " " + fireTimestamp.get()); if (fireTimestamp.get() == null) { long start = timestamp - (timestamp % interval); long nextFireTimestamp = start + interval; ctx.registerEventTimeTimer(nextFireTimestamp); fireTimestamp.add(nextFireTimestamp); return TriggerResult.CONTINUE; } return TriggerResult.CONTINUE; }