  1. Flink
  2. FLINK-13383

Problem in a custom trigger


Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Not A Problem
    • Affects Version/s: 1.8.0
    • Fix Version/s: None
    • Component/s: API / DataStream
    • Labels: None
    • Environment: The development environment is IntelliJ IDEA; the Flink version is 1.8.

    Description

      Using a tumbling time window, I created a trigger that combines time-based and count-based firing. In the onElement method, ctx.getCurrentWatermark() (called on the TriggerContext parameter) returns negative values.
      Screenshots are in the attachments.

      The code is shown below:

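      import org.apache.flink.streaming.api.windowing.triggers.Trigger;
      import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
      import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

      public class CountTrigger extends Trigger<Object, TimeWindow> {

          private static final long serialVersionUID = 1L;

          // Plain instance field, not Flink keyed state: it is shared by every key
          // and window that this trigger instance sees.
          private int count = 0;
          private final int threshold;

          private CountTrigger(int threshold) {
              this.threshold = threshold;
          }

          @Override
          public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
              // This is the value that is reported as negative.
              long watermark = ctx.getCurrentWatermark();
              // Register an event-time timer for the end of the window.
              ctx.registerEventTimeTimer(window.maxTimestamp());
              if (count > threshold) {
                  count = 0;
                  return TriggerResult.FIRE;
              } else {
                  count++;
              }
              System.out.println("onElement: " + element);
              return TriggerResult.CONTINUE;
          }

          @Override
          public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
              // The event-time timer registered in onElement never fires the window.
              return TriggerResult.CONTINUE;
          }

          @Override
          public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
              return TriggerResult.FIRE;
          }

          @Override
          public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
              // Delete the event-time timer registered in onElement
              // (the original deleted a processing-time timer that was never registered).
              ctx.deleteEventTimeTimer(window.maxTimestamp());
          }

          @Override
          public String toString() {
              return "CountTrigger";
          }

          public static CountTrigger of(int threshold) {
              return new CountTrigger(threshold);
          }
      }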
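
      Separately from the watermark question, the counting logic in onElement compares before it increments. The following plain-Java sketch (no Flink dependency; the class and method names are hypothetical, and it assumes every element passes through one shared counter) replays that logic and compares it with a count-first variant. It is only meant to illustrate on which elements the trigger would fire, in case the intent was to fire once per threshold elements.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the reported job: replays the counter logic in isolation.
public class CountThresholdSketch {

    // Mirrors the reported logic: compare first, increment only when not firing.
    // Returns the 1-based positions of the elements on which FIRE would be returned.
    static List<Integer> firingPositionsAsReported(int threshold, int elements) {
        List<Integer> fired = new ArrayList<>();
        int count = 0;
        for (int i = 1; i <= elements; i++) {
            if (count > threshold) {
                count = 0;
                fired.add(i);
            } else {
                count++;
            }
        }
        return fired;
    }

    // Assumed alternative: count the element first, then fire once the threshold is reached.
    static List<Integer> firingPositionsCountFirst(int threshold, int elements) {
        List<Integer> fired = new ArrayList<>();
        int count = 0;
        for (int i = 1; i <= elements; i++) {
            count++;
            if (count >= threshold) {
                count = 0;
                fired.add(i);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println("as reported: " + firingPositionsAsReported(3, 10));
        System.out.println("count first: " + firingPositionsCountFirst(3, 10));
    }
}
```

      With a threshold of 3, the reported logic fires on every fifth element, while the count-first variant fires on every third. Which of the two is wanted depends on the intended semantics of the threshold.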
      
      
      public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
          // stateDesc and interval are fields of the enclosing trigger class (not shown here).
          ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
          // Use the current watermark instead of the element timestamp; this is the value reported as negative.
          timestamp = ctx.getCurrentWatermark();

          System.out.println(element + "  " + timestamp + "  " + window.maxTimestamp() + "  " + fireTimestamp.get());
          if (fireTimestamp.get() == null) {
              // Round the timestamp to a multiple of interval and schedule the next firing.
              long start = timestamp - (timestamp % interval);
              long nextFireTimestamp = start + interval;

              ctx.registerEventTimeTimer(nextFireTimestamp);
              fireTimestamp.add(nextFireTimestamp);
          }

          return TriggerResult.CONTINUE;
      }
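
      A possible explanation for the negative values, consistent with the "Not A Problem" resolution: Flink reports Long.MIN_VALUE as the current watermark until the first watermark reaches the operator, which is also what a job sees if it runs in processing time or has no timestamp/watermark assigner. The plain-Java sketch below (no Flink dependency; the class name, the constant, and the hasWatermark guard are hypothetical) feeds that value through the same alignment arithmetic as the snippet above to show that the resulting timer timestamp stays hugely negative.

```java
// Hypothetical helper, not part of the reported job: reproduces the alignment arithmetic only.
public class WatermarkAlignmentSketch {

    // Assumption: the watermark value seen before any watermark has arrived.
    static final long NO_WATERMARK_YET = Long.MIN_VALUE;

    // Same arithmetic as in onElement: round to a multiple of interval, then step forward.
    static long nextFireTimestamp(long timestamp, long interval) {
        long start = timestamp - (timestamp % interval);
        return start + interval;
    }

    // Hypothetical guard: only use the watermark once it has advanced.
    static boolean hasWatermark(long watermark) {
        return watermark != NO_WATERMARK_YET;
    }

    public static void main(String[] args) {
        long interval = 1000L;

        // Watermark not yet advanced: the computed timer is still close to Long.MIN_VALUE.
        System.out.println(nextFireTimestamp(NO_WATERMARK_YET, interval));

        // An ordinary epoch-millisecond timestamp is aligned to the next full second.
        System.out.println(nextFireTimestamp(1563874956123L, interval));

        System.out.println(hasWatermark(NO_WATERMARK_YET));
    }
}
```

      If event-time behaviour is intended, the job would need to assign timestamps and watermarks upstream of the window; otherwise the element timestamp or the processing time may be a better basis for the interval than the watermark.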
      

      Attachments

        1. WechatIMG2.png
          91 kB
          jinguishi
        2. WX20190723-174236.png
          105 kB
          jinguishi

          People

            Assignee: Unassigned
            Reporter: shijingui jinguishi
            Votes: 0
            Watchers: 2