Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Not A Problem
Affects Version/s: 1.17.1
Fix Version/s: None
Component/s: None
Description
I am observing a problem where, after recovery from a checkpoint, the Kafka source watermarks start to diverge and no longer honor the watermark alignment setting I have applied.
I have a Kafka source which reads a topic with 32 partitions. I am applying the following watermark strategy:
new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage](msg => msg.value.getTimestamp).withWatermarkAlignment("alignment-sources-group", time.Duration.ofMillis(sourceWatermarkAlignmentBlocks))
This works well until my job needs to recover from a checkpoint. Once the recovery takes place, alignment no longer happens. This is best illustrated by the watermark metrics for various operators in the image:
You can see how the watermarks disperse after the recovery. While debugging the problem, I noticed that before the failure there were calls to SourceCoordinator::announceCombinedWatermark(). After the recovery, no calls reach that method, so the value of watermarkAlignmentParams.getMaxAllowedWatermarkDrift() is never read.
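To make the symptom concrete, here is a minimal sketch of what a periodic alignment step conceptually does. It is a simplified model written for illustration, not Flink's actual implementation: the object AlignmentSketch and its methods are hypothetical names, and it assumes that the combined watermark is the minimum watermark reported by the readers in the group, and that a reader is paused once its watermark exceeds that minimum plus the allowed drift.

```scala
// Simplified, hypothetical model of watermark alignment (not Flink code).
object AlignmentSketch {

  // Assumption: the combined watermark of an alignment group is the
  // smallest watermark reported by any reader. Requires a non-empty map.
  def combinedWatermark(reported: Map[Int, Long]): Long =
    reported.values.min

  // The highest watermark any reader may reach before it has to wait.
  def maxAllowedWatermark(reported: Map[Int, Long], maxAllowedDriftMs: Long): Long =
    combinedWatermark(reported) + maxAllowedDriftMs

  // Readers whose watermark is ahead of the limit and should pause.
  def readersToPause(reported: Map[Int, Long], maxAllowedDriftMs: Long): Set[Int] = {
    val limit = maxAllowedWatermark(reported, maxAllowedDriftMs)
    reported.collect { case (reader, watermark) if watermark > limit => reader }.toSet
  }
}
```

For example, with reported watermarks Map(0 -> 1000L, 1 -> 1400L, 2 -> 9000L) and a drift of 500 ms, the limit is 1500 and only reader 2 is asked to pause. If this step is never executed after recovery, no reader is ever paused, which would match the diverging watermarks described above.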
I can manually fix the problem if I stop the job, clear all state from ZooKeeper, and then manually start Flink, providing the last checkpoint with the '--fromSavepoint' flag. This causes the SourceCoordinator to be constructed properly and the watermark drift to be checked. After this manual recovery, the watermarks again converge to within the allowed drift, as seen in the metrics:
Let me know if I can help by providing any more information.