Flink / FLINK-6990

Poor performance with Sliding Time Windows

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.3.0
    • Fix Version/s: None
    • Component/s: API / DataStream
    • Labels:
      None
    • Environment:

      OSX 10.11.4
      2.8 GHz Intel Core i7
      16 GB 1600 MHz DDR3

      Description

      I'm experiencing poor performance when using sliding time windows. Here is a simple example that performs poorly for me:

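      import org.apache.flink.api.java.tuple.Tuple;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.source.SourceFunction;
      import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
      import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;
      import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
      import org.apache.flink.util.Collector;

      import java.util.concurrent.Executors;
      import java.util.concurrent.ScheduledExecutorService;
      import java.util.concurrent.TimeUnit;

      public class FlinkPerfTest {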
      
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
              // Stream 10,000 events per second: 10 keys x 1,000 events each
              see.addSource(new SourceFunction<TestObject>() {
      
                  transient ScheduledExecutorService executor;
      
                  @Override
                  public synchronized void run(final SourceContext<TestObject> ctx) throws Exception {
                      executor = Executors.newSingleThreadScheduledExecutor();
                      executor.scheduleAtFixedRate(new Runnable() {
      
                          @Override
                          public void run() {
                              for (int k = 0; k < 10; k++) {
                                  for (int i = 0; i < 1000; i++) {
                                      TestObject obj = new TestObject();
                                      obj.setKey(k);
                                      ctx.collect(obj);
                                  }
                              }
                          }
                      }, 0, 1, TimeUnit.SECONDS);
                      this.wait();
                  }
      
                  @Override
                  public synchronized void cancel() {
                      executor.shutdown();
                      this.notify();
                  }
              }).keyBy("key")
              .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1)))
              .apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {
      
                  @Override
                  public void apply(Tuple key, TimeWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
                      int count = 0;
                      for (Object obj : input) {
                          count++;
                      }
                      out.collect(key.getField(0) + ": " + count);
                  }
              })
              .print();
              see.execute();
          }
      
          public static class TestObject {
              private Integer key;
      
              public Integer getKey() {
                  return key;
              }
      
              public void setKey(Integer key) {
                  this.key = key;
              }
      
          }
      
      }
      

      When I run this, Flink periodically pauses for long periods. I would expect a steady stream of output at 1-second intervals. For comparison, you can switch to a count window of similar size, which performs just fine:

      .countWindow(600000, 1000)
      .apply(new WindowFunction<FlinkPerfTest.TestObject, String, Tuple, GlobalWindow>() {

          @Override
          public void apply(Tuple key, GlobalWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
              int count = 0;
              for (Object obj : input) {
                  count++;
              }
              out.collect(key.getField(0) + ": " + count);
          }
      })
      

      I would expect the sliding time window to perform similarly to the count window, with similar resource consumption. Instead, the sliding time window uses significantly more CPU and memory than the count window.
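
      A rough back-of-the-envelope sketch of where the extra CPU and memory may come from. It assumes that a sliding window assigner places each element into size / slide overlapping windows and that, with a plain WindowFunction, each of those windows buffers the element separately, whereas countWindow(size, slide) keeps a single buffer per key. The class and method names below are hypothetical and not part of Flink.

```java
public class SlidingWindowCost {

    /** Number of overlapping sliding windows that one element falls into. */
    public static long windowsPerElement(long sizeMillis, long slideMillis) {
        return sizeMillis / slideMillis;
    }

    /**
     * Buffer appends per second, under the assumption that every window
     * covering an element stores that element separately.
     */
    public static long stateWritesPerSecond(long eventsPerSecond, long sizeMillis, long slideMillis) {
        return eventsPerSecond * windowsPerElement(sizeMillis, slideMillis);
    }

    public static void main(String[] args) {
        long sizeMillis = 10 * 60 * 1000L;  // Time.minutes(10)
        long slideMillis = 1000L;           // Time.seconds(1)
        long eventsPerSecond = 10 * 1000L;  // 10 keys x 1,000 events each

        System.out.println("windows per element: "
                + windowsPerElement(sizeMillis, slideMillis));
        System.out.println("buffer appends per second: "
                + stateWritesPerSecond(eventsPerSecond, sizeMillis, slideMillis));
    }
}
```

      Under these assumptions, the example job performs 600 buffer appends per incoming event, which would go some way toward explaining the gap to the count window even before any locking is considered.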

      A possible cause is that SystemProcessingTimeService.TriggerTask synchronizes on the checkpointLock, which effectively acts as a global lock. A lock per key, or preferably a lock-free solution, would avoid this.
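
      The suspected effect can be illustrated without Flink. The following is a minimal sketch, not Flink's actual code: a hypothetical "timer" thread holds one shared lock while it does slow work (standing in for firing a large window), and an "emitter" that needs the same lock to hand over a single element is stalled for nearly the whole time. All names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;

public class GlobalLockContention {

    /**
     * Returns how long (in ms) an emitter waits for a shared lock
     * while a timer thread holds that lock for holdMillis.
     */
    public static long emitterWaitMillis(final long holdMillis) {
        final Object sharedLock = new Object();            // stand-in for a global lock
        final CountDownLatch lockHeld = new CountDownLatch(1);

        Thread timer = new Thread(() -> {
            synchronized (sharedLock) {
                lockHeld.countDown();                      // signal: lock is now held
                try {
                    Thread.sleep(holdMillis);              // stand-in for slow window firing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        timer.start();

        try {
            lockHeld.await();                              // wait until the timer owns the lock
            long start = System.nanoTime();
            synchronized (sharedLock) {
                // emitting one element would happen here
            }
            long waitedMillis = (System.nanoTime() - start) / 1_000_000;
            timer.join();
            return waitedMillis;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while measuring", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("emitter waited about " + emitterWaitMillis(200) + " ms");
    }
}
```

      If trigger processing and element emission do share one lock in this way, every key would pause whenever any one window is being evaluated, which would be consistent with the periodic stalls described above.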

            People

            • Assignee:
              Unassigned
            • Reporter:
              Brice Bingman
            • Votes:
              0
            • Watchers:
              8