Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-35088

watermark alignment maxAllowedWatermarkDrift and updateInterval param need check

    XMLWordPrintableJSON

Details

    Description

      When I use watermark alignment,

      1.I found that setting maxAllowedWatermarkDrift to a negative number initially led me to believe it could support delaying the consumption of the source, so I tried it. Then, the upstream data flow would hang indefinitely.

      Root cause:

      long maxAllowedWatermark = globalCombinedWatermark.getTimestamp()                 + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();  

      If maxAllowedWatermarkDrift is negative, SourceOperator: maxAllowedWatermark < lastEmittedWatermark, then the SourceReader will be blocked indefinitely and cannot recover.

      I'm not sure if this is a supported feature of watermark alignment. If it's not, I think an additional parameter validation should be implemented to throw an exception on the client side if the value is negative.

      2.The updateInterval parameter also lacks validation. If I set it to 0, the task will throw an exception when starting the job manager. The JDK class java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and throws the exception.

      java.lang.IllegalArgumentException: null
      	at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565) ~[?:1.8.0_351]
      	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:191) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:59) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:42) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
      	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_351]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_351]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_351]
      	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_351]

      Therefore, I believe it's necessary to validate these two parameters to ensure that exceptions are thrown on the client side to alert the user.

      Attachments

        Issue Links

          Activity

            People

              elon elon_X
              elon elon_X
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: