Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-29348

DPP (dynamic partition pruning) cannot work with the adaptive batch scheduler

    XML Word Printable JSON

Details

    Description

      When running TPC-DS with both DPP (dynamic partition pruning) and the adaptive batch scheduler enabled, q14a.sql fails with the following exception:

      2022-09-20 10:34:18,244 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job q14a.sql (6d4355bdde514be083b9762e286626d2) switched from state FAILING to FAILED.
      org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:102) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:299) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:125) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:1031) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:588) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at sun.reflect.GeneratedMethodAccessor69.invoke(Unknown Source) ~[?:?]
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_332]
      	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_332]
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_2b75f75b-9d98-44d4-b364-927fcb095b21.jar:1.16-SNAPSHOT]
      	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_332]
      	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_332]
      	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_332]
      	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_332]
      Caused by: java.lang.IllegalStateException: Dynamic filtering data listener is missing: b9e97be4-bde0-4718-bfb1-e13d490517f1
      	at org.apache.flink.table.runtime.operators.dynamicfiltering.DynamicFilteringDataCollectorOperatorCoordinator.handleEventFromOperator(DynamicFilteringDataCollectorOperatorCoordinator.java:98) ~[flink-table-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$handleEventFromOperator$0(RecreateOnResetOperatorCoordinator.java:84) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:315) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.handleEventFromOperator(RecreateOnResetOperatorCoordinator.java:82) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:218) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:121) ~[flink-dist-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	... 31 more

      Attachments

        Issue Links

          Activity

            People

              pltbkd Gen Luo
              wanglijie#1 wanglijie#1
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: