As per the discussion in FLINK-4341 by aljoscha and StephanEwen, we need a low watermark service in the JobManager to support transparent resharding / partition discovery for our Kafka and Kinesis consumers (and, more generally, for any future streaming connector whose external system may elastically scale up and down independently of the parallelism of the sources in Flink). The main idea is to let source subtasks that don't emit their own watermarks (because they currently have no data partitions to consume) emit the low watermark across all subtasks, instead of simply emitting a Long.MAX_VALUE watermark, which would rule out assigning them partitions in the future.
The proposed implementation, at a high level: a LowWatermarkCoordinator will be added to execution graphs and will periodically trigger only the source vertices with a RetrieveLowWatermark message. The tasks reply to the JobManager through the actor gateway (or a new interface after FLINK-4456 gets merged) with a ReplyLowWatermark message. Once the coordinator has collected all low watermarks for a particular source vertex and determined the aggregated low watermark for this round (counting only values that are larger than the aggregated low watermark of the last round), it sends a NotifyNewLowWatermark message to the source vertex's tasks.
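To make the per-round aggregation concrete, here is a minimal sketch of one way the coordinator could combine replies for a single source vertex. The class name LowWatermarkRound, the onReply method, and the use of subtask indices as keys are all hypothetical and not part of the proposal. It also assumes one particular reading of the filtering rule: replies that are not larger than the last round's aggregate are ignored, and if no reply qualifies, the previous aggregate is kept.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical helper: collects replies for one source vertex in one round. */
final class LowWatermarkRound {
    private final int expectedReplies;   // parallelism of the source vertex
    private final long lastAggregated;   // aggregated low watermark of the previous round
    private final Map<Integer, Long> replies = new HashMap<>();

    LowWatermarkRound(int expectedReplies, long lastAggregated) {
        this.expectedReplies = expectedReplies;
        this.lastAggregated = lastAggregated;
    }

    /**
     * Records one ReplyLowWatermark value. Returns the new aggregate once every
     * subtask has replied, and an empty result while replies are still missing.
     */
    OptionalLong onReply(int subtaskIndex, long currentLowWatermark) {
        replies.put(subtaskIndex, currentLowWatermark);
        if (replies.size() < expectedReplies) {
            return OptionalLong.empty();
        }
        long min = Long.MAX_VALUE;
        boolean anyQualified = false;
        for (long watermark : replies.values()) {
            // Assumption: only values above the last aggregate take part in the minimum.
            if (watermark > lastAggregated) {
                min = Math.min(min, watermark);
                anyQualified = true;
            }
        }
        return OptionalLong.of(anyQualified ? min : lastAggregated);
    }
}
```

With a previous aggregate of 100 and replies of 150, 90 and 120, this sketch ignores 90 and yields 120 as the value the coordinator would send in NotifyNewLowWatermark.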
The messages will only be relevant to tasks that implement an internal LowWatermarkCooperatingTask interface. For now, only SourceStreamTask should implement LowWatermarkCooperatingTask.
Source functions should implement a public LowWatermarkListener interface if they wish to get notified of the aggregated low watermarks across subtasks. Connectors like the Kinesis consumer can choose to emit this watermark if the subtask currently does not have any shards, so that downstream operators may still properly advance time windows (implementation for this is tracked as a separate issue).
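As an illustration of the listener side, the following sketch shows how a source subtask without shards might react to the aggregated low watermark. The method name onNewLowWatermark, the class ShardAwareSubtask, and the lastEmitted field that stands in for actually emitting a watermark are assumptions made for this example; the real LowWatermarkListener signature is still to be decided.

```java
/** Hypothetical shape of the proposed public listener interface. */
interface LowWatermarkListener {
    void onNewLowWatermark(long aggregatedLowWatermark);
}

/** Stand-in for a source subtask; not an actual Flink SourceFunction. */
final class ShardAwareSubtask implements LowWatermarkListener {
    private final boolean hasShards;
    // Stands in for ctx.emitWatermark(...): remembers the last value "emitted".
    private long lastEmitted = Long.MIN_VALUE;

    ShardAwareSubtask(boolean hasShards) {
        this.hasShards = hasShards;
    }

    @Override
    public void onNewLowWatermark(long aggregatedLowWatermark) {
        // A subtask that owns shards derives watermarks from its own data instead.
        if (hasShards) {
            return;
        }
        // Never let the emitted watermark move backwards.
        if (aggregatedLowWatermark > lastEmitted) {
            lastEmitted = aggregatedLowWatermark;
        }
    }

    long lastEmitted() {
        return lastEmitted;
    }
}
```

The point of the sketch is that an idle subtask follows the global low watermark rather than jumping to Long.MAX_VALUE, so it can still be handed shards later without violating watermark ordering.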
Overall, the service will include:
- New messages between JobManager <-> TaskManager:
  - RetrieveLowWatermark(jobId, jobVertexId, taskId, timestamp)
  - ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark)
  - NotifyNewLowWatermark(jobId, jobVertexId, taskId, newLowWatermark, timestamp)
- New internal task interface LowWatermarkCooperatingTask in flink-runtime
- New public interface LowWatermarkListener in flink-streaming-java
Might also need to extend SourceFunction.SourceContext to support retrieving the current low watermark of sources.
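For reference, the three messages could be modelled roughly as follows. This is only a sketch: the field names follow the signatures listed above, but the shared base class LowWatermarkMessage is hypothetical, and plain String ids are used as placeholders for whatever id types the runtime would actually use.

```java
/** Hypothetical common base carrying the ids shared by all three messages. */
abstract class LowWatermarkMessage {
    final String jobId;        // placeholder type, assumed for this sketch
    final String jobVertexId;  // placeholder type, assumed for this sketch
    final String taskId;       // placeholder type, assumed for this sketch

    LowWatermarkMessage(String jobId, String jobVertexId, String taskId) {
        this.jobId = jobId;
        this.jobVertexId = jobVertexId;
        this.taskId = taskId;
    }
}

/** JobManager -> task: asks for the task's current low watermark. */
final class RetrieveLowWatermark extends LowWatermarkMessage {
    final long timestamp;

    RetrieveLowWatermark(String jobId, String jobVertexId, String taskId, long timestamp) {
        super(jobId, jobVertexId, taskId);
        this.timestamp = timestamp;
    }
}

/** Task -> JobManager: reports the task's current low watermark. */
final class ReplyLowWatermark extends LowWatermarkMessage {
    final long currentLowWatermark;

    ReplyLowWatermark(String jobId, String jobVertexId, String taskId, long currentLowWatermark) {
        super(jobId, jobVertexId, taskId);
        this.currentLowWatermark = currentLowWatermark;
    }
}

/** JobManager -> task: announces the aggregated low watermark of the round. */
final class NotifyNewLowWatermark extends LowWatermarkMessage {
    final long newLowWatermark;
    final long timestamp;

    NotifyNewLowWatermark(String jobId, String jobVertexId, String taskId,
                          long newLowWatermark, long timestamp) {
        super(jobId, jobVertexId, taskId);
        this.newLowWatermark = newLowWatermark;
        this.timestamp = timestamp;
    }
}
```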
Any feedback for this is appreciated!