Details
Type: Improvement
Status: Resolved
Priority: Major
Resolution: Fixed
Version: 1.16.1
Description
When using watermark alignment, I ran into two problems:
1. I initially believed that setting maxAllowedWatermarkDrift to a negative number might be a way to delay consumption of the source, so I tried it. The upstream data flow then hung indefinitely.
Root cause:
long maxAllowedWatermark = globalCombinedWatermark.getTimestamp() + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
If maxAllowedWatermarkDrift is negative, then in SourceOperator maxAllowedWatermark < lastEmittedWatermark always holds, so the SourceReader is blocked indefinitely and cannot recover.
I'm not sure whether this is a supported feature of watermark alignment. If it is not, I think parameter validation should be added so that an exception is thrown on the client side when the value is negative.
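The arithmetic behind the hang can be reproduced in isolation. The following is only a sketch, not Flink code: the class DriftDemo and its method names are hypothetical, and it assumes (as described above) that a reader is blocked whenever its last emitted watermark is greater than maxAllowedWatermark. It also simplifies to a single source, so the global combined watermark equals that source's own watermark.

```java
public class DriftDemo {
    // Mirrors the expression quoted above: global combined watermark plus the configured drift.
    static long maxAllowedWatermark(long globalCombinedWatermark, long maxAllowedWatermarkDrift) {
        return globalCombinedWatermark + maxAllowedWatermarkDrift;
    }

    // Assumption: the reader is paused while its last emitted watermark exceeds the allowed maximum.
    static boolean isBlocked(long lastEmittedWatermark, long maxAllowedWatermark) {
        return lastEmittedWatermark > maxAllowedWatermark;
    }

    public static void main(String[] args) {
        long lastEmitted = 10_000L;
        // Simplification: one source, so the global combined watermark is its own watermark.
        long global = lastEmitted;

        // Positive drift: 10_000 > 11_000 is false, the reader keeps running.
        System.out.println(isBlocked(lastEmitted, maxAllowedWatermark(global, 1_000L)));
        // Negative drift: 10_000 > 9_000 is true, the reader is blocked.
        System.out.println(isBlocked(lastEmitted, maxAllowedWatermark(global, -1_000L)));
    }
}
```

Under these assumptions a blocked reader emits nothing further, so the global watermark cannot advance and the condition never clears, which matches the indefinite hang reported here.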
2. The updateInterval parameter also lacks validation. If I set it to 0, an exception is thrown on the job manager side when the job starts. The check that fails is not in Flink: the JDK class java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and throws the exception.
java.lang.IllegalArgumentException: null
    at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565) ~[?:1.8.0_351]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:191) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:59) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:42) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_351]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_351]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_351]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_351]
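The JDK behavior at the top of this stack trace can be confirmed without Flink. The sketch below uses only java.util.concurrent; the class name ZeroPeriodDemo and the helper rejectsPeriod are hypothetical and exist purely for illustration.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ZeroPeriodDemo {
    // Returns true if scheduleAtFixedRate refuses the given period with IllegalArgumentException.
    static boolean rejectsPeriod(long periodMillis) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            executor.scheduleAtFixedRate(() -> {}, 0L, periodMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        } finally {
            // Stop the pool so the periodic task does not keep the JVM alive.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsPeriod(0L));     // a period of 0 is rejected by the JDK
        System.out.println(rejectsPeriod(1_000L)); // a positive period is accepted
    }
}
```

Because the rejection happens inside the executor, the exception carries no message (hence "IllegalArgumentException: null") and gives the user no hint that updateInterval is the offending setting.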
Therefore, I believe it's necessary to validate these two parameters to ensure that exceptions are thrown on the client side to alert the user.
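One possible shape for such a check is sketched below. This is not the actual Flink fix: AlignmentParamsValidator is a hypothetical standalone class, it uses java.time.Duration for both parameters as an assumption, and it allows a drift of exactly zero because only negative values are reported as a problem above.

```java
import java.time.Duration;

public class AlignmentParamsValidator {
    // Hypothetical client-side check for the two parameters discussed in this issue.
    static void validate(Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        if (maxAllowedWatermarkDrift.isNegative()) {
            throw new IllegalArgumentException(
                    "maxAllowedWatermarkDrift must not be negative, got " + maxAllowedWatermarkDrift);
        }
        if (updateInterval.isZero() || updateInterval.isNegative()) {
            throw new IllegalArgumentException(
                    "updateInterval must be positive, got " + updateInterval);
        }
    }

    // Small helper for demonstration: reports whether a combination passes validation.
    static boolean isValid(Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        try {
            validate(maxAllowedWatermarkDrift, updateInterval);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid(Duration.ofSeconds(20), Duration.ofSeconds(1)));  // accepted
        System.out.println(isValid(Duration.ofSeconds(-20), Duration.ofSeconds(1))); // negative drift rejected
        System.out.println(isValid(Duration.ofSeconds(20), Duration.ZERO));          // zero interval rejected
    }
}
```

Running a check like this when the watermark strategy is built would surface both mistakes at job submission, with a message naming the parameter, instead of a hang or an anonymous exception at runtime.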