Flink / FLINK-29314

AskTimeoutException for more than two partitions with FileSource API on S3

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.13.5
    • Fix Version/s: None
    • Component/s: FileSystems
    • Labels: None

    Description

      When using the FileSource.forRecordStreamFormat API with the builder option monitorContinuously(Duration.ofMillis(10000)) to read Avro files from S3, we get

      akka.pattern.AskTimeoutException: Ask timed out on Actor[akka://flink/user/rpc/dispatcher_2#-1751288098] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply. 

      error when there are more than two partitions in an S3 bucket.
      More context:

      For a given S3 URI: s3://-export-staging/data-export/dataexport.prod-10.S3.integration.dd33/event_type=users.messages.email.Send/
      there are two partitions of Avro files, split by date:

      date=2022-09-12-10/
      date=2022-09-12-11/ 

      Reading the Avro files works fine. If another partition is added under the URI while the job is running, that also works fine. However, when I stop the job and re-run it, it fails with the exception below:

      Exception in thread "main" org.apache.flink.util.FlinkException: Failed to execute job 'data-export'.
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
          at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
          at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801)
          at com.lightricks.sigma.topologies.EntryPoint$.delayedEndpoint$com$lightricks$sigma$topologies$EntryPoint$1(EntryPoint.scala:75)
          at com.lightricks.sigma.topologies.EntryPoint$delayedInit$body.apply(EntryPoint.scala:31)
          at scala.Function0.apply$mcV$sp(Function0.scala:39)
          at scala.Function0.apply$mcV$sp$(Function0.scala:39)
          at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
          at scala.App.$anonfun$main$1$adapted(App.scala:80)
          at scala.collection.immutable.List.foreach(List.scala:392)
          at scala.App.main(App.scala:80)
          at scala.App.main$(App.scala:78)
          at com.lightricks.sigma.topologies.EntryPoint$.main(EntryPoint.scala:31)
          at com.lightricks.sigma.topologies.EntryPoint.main(EntryPoint.scala)
      Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
          at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
          at org.apache.flink.client.program.PerJobMiniClusterFactory.lambda$submitJob$2(PerJobMiniClusterFactory.java:83)
          at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
          at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
          at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
          at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
          at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
          at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
          at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
          at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
      Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
          at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
          at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
          at org.apache.flink.client.program.PerJobMiniClusterFactory.lambda$null$0(PerJobMiniClusterFactory.java:89)
          at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:144)
          ... 9 more
      Caused by: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
          at com.sun.proxy.$Proxy13.requestJobStatus(Unknown Source)
          at org.apache.flink.runtime.minicluster.MiniCluster.lambda$getJobStatus$6(MiniCluster.java:704)
          at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
          at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
          at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
          at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:751)
          at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:703)
          at org.apache.flink.client.program.PerJobMiniClusterFactory.lambda$null$0(PerJobMiniClusterFactory.java:86)
          ... 10 more
      Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/rpc/dispatcher_2#-1751288098]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
          at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
          at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
          at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
          at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:874)
          at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:113)
          at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107)
          at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:872)
          at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
          at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
          at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
          at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
          at java.lang.Thread.run(Thread.java:748) 

      The odd part is that the job fails with the Akka ask timeout only once the number of partitions (directories) increases from 2 to 3.
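
      The error text notes that a typical reason for `AskTimeoutException` is that the recipient did not send a reply. As a general illustration of that mechanism only, the sketch below uses a single-threaded executor as a stand-in for a component that handles one message at a time: a slow first message delays the reply to a later status request past the ask timeout. This is not Flink's or Akka's code and not a confirmed diagnosis of this issue; the class name, method names, and durations are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AskTimeoutSketch {

    /**
     * Queues a slow "initialize" message and then a "status" request on a
     * single-threaded mailbox. Returns true if the status reply arrives
     * within askTimeoutMillis, false if the wait times out.
     */
    public static boolean statusRepliesInTime(long initMillis, long askTimeoutMillis) {
        // One thread stands in for a recipient that processes one message at a time.
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        try {
            // Hypothetical slow message (for example, blocking work during start-up).
            mailbox.submit(() -> sleepQuietly(initMillis));
            // The status request is queued behind the slow message.
            CompletableFuture<String> reply =
                    CompletableFuture.supplyAsync(() -> "RUNNING", mailbox);
            try {
                reply.get(askTimeoutMillis, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                // No reply within the timeout: the analogue of an ask timing out.
                return false;
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            // Interrupts the sleeping task and discards anything still queued.
            mailbox.shutdownNow();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Short initialization: the reply fits inside the timeout.
        System.out.println(statusRepliesInTime(50, 2000));
        // Long initialization: the caller gives up before the reply is produced.
        System.out.println(statusRepliesInTime(2000, 100));
    }
}
```

      In this sketch the recipient is not broken; it is merely busy, and the caller's timeout expires first. Whether something similar happens here during job initialization is an open question for this report.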

          People

            Assignee: Unassigned
            Reporter: Satyasheel (mlgruby)