Description
When running a WindowableTask whose window() call takes a long time, task.window.ms does not reflect how often window() is actually called.
If task.window.ms is set to 250ms and window() takes 100ms, you would expect window() to be invoked 4 times per second. In fact, it is invoked only about 3 times per second, because each cycle takes 350ms:
100ms (window) + 250ms (process) + 100ms (window) + 250ms (process) + 100ms (window) + 250ms (process) = 1050ms
The reason for this is that we reset the window timer AFTER window() returns:
if (isWindowableTask && windowMs >= 0 && lastWindowMs + windowMs < clock()) {
  task.asInstanceOf[WindowableTask].window(collector, coordinator)
  lastWindowMs = clock()
}
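I think we just need to move the lastWindowMs = clock() assignment above the window() call, so that the timer measures the time between the starts of successive window() calls rather than the time since the last call finished.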
We should apply the same fix to the task.commit.ms timer as well.
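
To illustrate the difference between the two orderings, here is a minimal simulation sketch. It is not the actual run loop: the simulated clock, the object name WindowTimerSketch, the function countWindowCalls, and the parameters resetBeforeWindow, windowCostMs, processCostMs and durationMs are all hypothetical names made up for this example. It assumes each processed message costs a fixed 10ms and each window() call costs a fixed 100ms.

```scala
object WindowTimerSketch {
  /**
   * Counts how many simulated window() calls start within durationMs.
   * All names and costs here are assumptions for illustration only.
   *
   * @param resetBeforeWindow true = proposed ordering (reset timer, then window());
   *                          false = current ordering (window(), then reset timer)
   */
  def countWindowCalls(
      resetBeforeWindow: Boolean,
      windowMs: Long = 250L,
      windowCostMs: Long = 100L,
      processCostMs: Long = 10L,
      durationMs: Long = 10000L): Int = {
    var now = 0L          // simulated clock, stands in for clock()
    var lastWindowMs = 0L
    var calls = 0

    while (now < durationMs) {
      // Simulate processing one message.
      now += processCostMs

      // Same condition as in the snippet above, minus the isWindowableTask check.
      if (windowMs >= 0 && lastWindowMs + windowMs < now) {
        if (resetBeforeWindow) lastWindowMs = now   // proposed: reset first
        now += windowCostMs                         // simulate the window() call
        calls += 1
        if (!resetBeforeWindow) lastWindowMs = now  // current: reset afterwards
      }
    }
    calls
  }

  def main(args: Array[String]): Unit = {
    val current = countWindowCalls(resetBeforeWindow = false)
    val fixed = countWindowCalls(resetBeforeWindow = true)
    println(s"window() calls in 10s, reset after:  $current")
    println(s"window() calls in 10s, reset before: $fixed")
  }
}
```

With the timer reset before window(), the time spent inside window() counts toward the next interval, so the simulated call rate comes out noticeably higher than with the reset afterwards. The same comparison should apply to the commit timer if it follows the same pattern.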