AfterProcessingTime-based timers are not fired when the input watermark does not advance, which prevents buffered elements from being emitted.
The reason seems to be that SparkTimerInternals#getTimersReadyToProcess() determines what triggers are ready to be processed based on the following condition:
However, if the timer domain is TimeDomain.PROCESSING_TIME, the position of the input watermark should have NO effect.
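To illustrate the point, here is a minimal sketch of a domain-aware eligibility check. It does not use the actual SparkTimerInternals code: SketchDomain, SketchTimer and SketchEligibility are hypothetical stand-ins invented for this example. The only idea it shows is that a processing-time timer is compared against the current processing time, never against the input watermark.

```java
import java.time.Instant;

// Hypothetical stand-in for the runner's time domain enum (illustration only).
enum SketchDomain { EVENT_TIME, PROCESSING_TIME }

// Hypothetical stand-in for a timer: a domain plus the instant it is set for.
final class SketchTimer {
    final SketchDomain domain;
    final Instant timestamp;

    SketchTimer(SketchDomain domain, Instant timestamp) {
        this.domain = domain;
        this.timestamp = timestamp;
    }
}

final class SketchEligibility {
    // Returns true when the timer's timestamp has been reached in its own domain.
    static boolean isReady(SketchTimer timer, Instant inputWatermark, Instant processingTime) {
        switch (timer.domain) {
            case EVENT_TIME:
                // Event-time timers depend on the input watermark.
                return !timer.timestamp.isAfter(inputWatermark);
            case PROCESSING_TIME:
                // Processing-time timers ignore the watermark entirely.
                return !timer.timestamp.isAfter(processingTime);
            default:
                throw new IllegalArgumentException("Unknown domain: " + timer.domain);
        }
    }
}
```

With a stalled watermark and an advancing clock, the processing-time timer in this sketch becomes ready while an event-time timer with the same timestamp does not.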
In addition, SparkTimerInternals#getTimersReadyToProcess() deletes timers once they are deemed eligible for processing, even though they will not necessarily fire.
This may not be the correct behavior for timers in general and for timers in the TimeDomain.PROCESSING_TIME in particular, since they should remain scheduled until the corresponding window expires and all state is cleared.
For instance, consider a timer that is found eligible for processing and is therefore deleted. If its shouldFire() then returns false, it is not fired and needs to be re-evaluated next time around, but it won't be, since it has already been deleted. The moral is that "eligible for processing" does not imply "should be deleted".
It may be better to avoid removing timers in SparkTimerInternals#getTimersReadyToProcess() and leave timer management up to ReduceFnRunner#clearAllState() which has more context to determine whether it's time for a given timer to be deleted.
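The suggestion above could look roughly like the following sketch. NonDestructiveTimerStore is a hypothetical class written for this example, not the actual SparkTimerInternals; timers are reduced to plain millisecond timestamps. The point is only that querying for eligible timers does not remove them, and that removal is a separate, explicit step (the role ReduceFnRunner#clearAllState() would play).

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical timer store: timers are modelled as millisecond timestamps.
final class NonDestructiveTimerStore {
    private final Set<Long> timers = new LinkedHashSet<>();

    void setTimer(long timestampMillis) {
        timers.add(timestampMillis);
    }

    // Returns the eligible timers WITHOUT deleting them, so a timer whose
    // trigger decides not to fire is seen again on the next call.
    List<Long> timersReadyToProcess(long nowMillis) {
        List<Long> ready = new ArrayList<>();
        for (long timestamp : timers) {
            if (timestamp <= nowMillis) {
                ready.add(timestamp);
            }
        }
        return ready;
    }

    // Explicit cleanup, called by whoever owns the window's lifecycle.
    void clearAllTimers() {
        timers.clear();
    }
}
```

In this sketch, a timer set for t=10 is returned by timersReadyToProcess(15) and again by timersReadyToProcess(20); it disappears only after clearAllTimers() is called.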