Details
Improvement
Status: Closed
Major
Resolution: Duplicate
1.3.0
None
None
OSX 10.11.4
2.8 GHz Intel Core i7
16 GB 1600 MHz DDR3
Description
I'm experiencing poor performance when using sliding time windows. Here is a simple example that performs poorly for me:
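import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class FlinkPerfTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        // Streaming 10,000 events per second (10 keys x 1,000 events each)
        see.addSource(new SourceFunction<TestObject>() {

            transient ScheduledExecutorService executor;

            @Override
            public synchronized void run(final SourceContext<TestObject> ctx) throws Exception {
                executor = Executors.newSingleThreadScheduledExecutor();
                executor.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        for (int k = 0; k < 10; k++) {
                            for (int i = 0; i < 1000; i++) {
                                TestObject obj = new TestObject();
                                obj.setKey(k);
                                ctx.collect(obj);
                            }
                        }
                    }
                }, 0, 1, TimeUnit.SECONDS);
                // Block until cancel() is called
                this.wait();
            }

            @Override
            public synchronized void cancel() {
                executor.shutdown();
                this.notify();
            }
        }).keyBy("key")
        .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(1)))
        .apply(new WindowFunction<TestObject, String, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple key, TimeWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
                // Count the elements in the window and emit "<key>: <count>"
                int count = 0;
                for (Object obj : input) {
                    count++;
                }
                out.collect(key.getField(0) + ": " + count);
            }
        })
        .print();

        see.execute();
    }

    public static class TestObject {
        private Integer key;

        public Integer getKey() {
            return key;
        }

        public void setKey(Integer key) {
            this.key = key;
        }
    }
}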
When running this, Flink periodically pauses for long periods of time. I would expect a steady stream of output at 1 second intervals. For comparison, you can switch to a count window of similar size, which performs just fine:
.countWindow(600000, 1000)
.apply(new WindowFunction<FlinkPerfTest.TestObject, String, Tuple, GlobalWindow>() {
    @Override
    public void apply(Tuple key, GlobalWindow window, Iterable<TestObject> input, Collector<String> out) throws Exception {
        int count = 0;
        for (Object obj : input) {
            count++;
        }
        out.collect(key.getField(0) + ": " + count);
    }
})
I would expect the sliding time window to perform similarly to a count window. Instead, the sliding time window uses significantly more CPU and memory than the count window, whereas I would expect resource consumption to be similar.
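For context on the comparison above, here is a rough sketch of how many overlapping windows a single element falls into with the sliding configuration from the example (10 minute size, 1 second slide). It is plain Java with no Flink dependency and only mirrors the usual sliding-window assignment arithmetic with a zero offset; the class and method names are made up for illustration. A count window, by contrast, keeps each element in a single global window per key, so this fan-out may account for part of the difference in CPU and memory, independently of any locking.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: illustrates sliding-window fan-out.
class SlidingWindowFanout {

    /** Returns the start timestamps (ms) of every sliding window that contains timestampMs. */
    static List<Long> assignWindowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= timestamp, aligned to the slide (offset 0 assumed).
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 10 * 60 * 1000L;   // Time.minutes(10)
        long slideMs = 1000L;            // Time.seconds(1)
        long eventsPerSecond = 10_000L;  // rate produced by the source in the example

        int windowsPerElement =
                assignWindowStarts(System.currentTimeMillis(), sizeMs, slideMs).size();

        System.out.println("windows per element: " + windowsPerElement);
        System.out.println("window insertions per second: "
                + windowsPerElement * eventsPerSecond);
    }
}
```

With these parameters every element lands in 600 windows, so 10,000 events per second translate into roughly 6,000,000 window insertions per second.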
A possible cause could be that SystemProcessingTimeService.TriggerTask synchronizes on the checkpointLock, which acts like a global lock. There should be a lock per key or, preferably, a lock-free solution.
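To make the "lock per key" suggestion concrete, here is a minimal sketch in plain Java. It is not Flink code and does not reflect how Flink's timer service is implemented; `PerKeyLocks` and `withLock` are hypothetical names. It only shows the general idea that work for different keys can proceed concurrently while work for the same key stays serialized. Whether such a scheme would be compatible with the consistency guarantees the checkpointLock provides is not addressed here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical illustration of per-key locking; not taken from Flink.
class PerKeyLocks<K> {

    // One lock per key, created lazily on first use.
    private final ConcurrentHashMap<K, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Runs the action while holding only the lock that belongs to the given key. */
    <T> T withLock(K key, Supplier<T> action) {
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock();
        }
    }

    /** Number of keys that currently have a lock allocated. */
    int size() {
        return locks.size();
    }
}
```

A timer firing for key 3 would call something like `locks.withLock(3, () -> fireWindow(3))`, where `fireWindow` stands in for whatever per-key work needs to run, leaving timers for other keys free to run in parallel.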