Flink / FLINK-8580

No easy way (or issues when trying?) to handle multiple YARN sessions and choose at runtime which one receives an HA streaming job





      I am using Flink 1.3.2 and I’m struggling to achieve something that should be simple.

      For isolation reasons, I want to start several long-lived YARN session containers (all as the same user) and choose at run time, when I start an HA streaming application, which container will run it.

      I start my YARN session with the command-line option: -Dyarn.properties-file.location=mydir

      The session is created and a .yarn-properties-$USER file is generated.
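      To illustrate why a single file per user makes choosing a session awkward, here is a small sketch of how the properties-file path is assumed to be resolved: use yarn.properties-file.location if it is set, otherwise the JVM's java.io.tmpdir, then append ".yarn-properties-" plus the user name. This lookup rule is an assumption about the 1.3 CLI, not something confirmed above; yarn_properties_path and the "/tmp" stand-in for java.io.tmpdir are hypothetical.

```python
import getpass
import os
from typing import Optional

# Assumed file-name prefix; the user name is appended to it.
PROPERTIES_FILE_PREFIX = ".yarn-properties-"


def yarn_properties_path(user: Optional[str] = None,
                         location: Optional[str] = None,
                         java_tmpdir: str = "/tmp") -> str:
    """Assumed lookup rule: <location or java.io.tmpdir>/.yarn-properties-<user>."""
    user = user or getpass.getuser()  # stand-in for the JVM's user.name
    base = location or java_tmpdir    # java_tmpdir stands in for java.io.tmpdir
    return os.path.join(base, PROPERTIES_FILE_PREFIX + user)
```

      Under this assumption, yarn_properties_path("alice", "mydir") gives mydir/.yarn-properties-alice on POSIX systems. Two sessions started by the same user with the same location therefore share one file, so only distinct locations keep sessions apart, and the submitting client must then be told which location to read.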

      I then tried the following ways to submit my job:


      CASE 1

      flink-conf.yaml: yarn.properties-file.location: mydir
      flink run options: none

      • Uses ZooKeeper and works, but I cannot choose the container because yarn.properties-file.location is set globally in flink-conf.yaml.


      CASE 2

      flink-conf.yaml: nothing
      flink run options: -yid applicationId

      • Does not use ZooKeeper; tries to connect to the YARN JobManager but fails with a “Job submission to the JobManager timed out” error.


      CASE 3

      flink-conf.yaml: nothing
      flink run options: -yid applicationId, plus -yD for each dynamic property found in the “dynamicPropertiesString” entry of the .yarn-properties-$USER file

      • Same result as case 2.


      CASE 4

      flink-conf.yaml: nothing
      flink run options: -yD yarn.properties-file.location=mydir

      • Tries to connect to a local (non-YARN) JobManager, and fails.


      CASE 5

      Even weirder:

      flink-conf.yaml: yarn.properties-file.location: mydir
      flink run options: -yD yarn.properties-file.location=mydir

      • Still tries to connect to a local (non-YARN) JobManager!
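      For reference, the case 3 option list can be assembled mechanically from the properties file. The sketch below assumes the file is a standard Java .properties file whose applicationID entry holds the YARN application id and whose dynamicPropertiesString entry joins the session's -D properties with "@@"; those key names and that separator are assumptions about the 1.3 file format, and parse_properties, read_properties and session_run_options are hypothetical helpers.

```python
# Assumed separator between entries inside dynamicPropertiesString.
DYNAMIC_PROPERTIES_SEPARATOR = "@@"
_ESCAPES = {"t": "\t", "n": "\n", "r": "\r", "f": "\f"}


def _split_key_value(line: str):
    """Split at the first '=' or ':' that is not escaped with a backslash."""
    i = 0
    while i < len(line):
        if line[i] == "\\":
            i += 2  # skip the escaped character
        elif line[i] in "=:":
            return line[:i].strip(), line[i + 1:].lstrip()
        else:
            i += 1
    return line.strip(), ""


def _unescape(text: str) -> str:
    """Undo simple backslash escapes such as '\\=' and '\\:' (no \\uXXXX support)."""
    out, i = [], 0
    while i < len(text):
        if text[i] == "\\" and i + 1 < len(text):
            out.append(_ESCAPES.get(text[i + 1], text[i + 1]))
            i += 2
        else:
            out.append(text[i])
            i += 1
    return "".join(out)


def parse_properties(text: str) -> dict:
    """Minimal Java-style .properties parser (comments, escapes; no line continuations)."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, value = _split_key_value(line)
        props[_unescape(key)] = _unescape(value)
    return props


def read_properties(path: str) -> dict:
    # Java writes .properties files in ISO-8859-1.
    with open(path, encoding="latin-1") as f:
        return parse_properties(f.read())


def session_run_options(props: dict) -> list:
    """Build the case 3 options: -yid <applicationID> plus one -yD per dynamic property."""
    options = ["-yid", props["applicationID"]]
    dynamic = props.get("dynamicPropertiesString", "")
    for prop in dynamic.split(DYNAMIC_PROPERTIES_SEPARATOR):
        if prop:
            options += ["-yD", prop]
    return options
```

      Appending session_run_options(read_properties(path)) to a flink run command reproduces the case 3 setup, which, as described, still ends in the same timeout as case 2.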


      Lacking any other solution, I wrote a shell script that copies the contents of FLINK_CONF_DIR into a temporary directory, modifies flink-conf.yaml there to set yarn.properties-file.location, and points FLINK_CONF_DIR at that temporary directory before running flink to submit the job.

      I am now able to select the container I want, but I think it should be made simpler…
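      The workaround described above can be sketched as follows. The original is a shell script; this is a Python rendering of the same steps, assuming the flink CLI is on the PATH and that flink-conf.yaml uses flat "key: value" lines. The helper names make_session_conf_dir and submit_to_session are hypothetical.

```python
import os
import re
import shutil
import subprocess
import tempfile
from pathlib import Path

# Matches an active "yarn.properties-file.location: ..." line; commented-out lines are kept.
_LOCATION_KEY = re.compile(r"^\s*yarn\.properties-file\.location\s*:")


def make_session_conf_dir(base_conf_dir: str, properties_dir: str) -> str:
    """Copy base_conf_dir into a new temporary directory and make the copied
    flink-conf.yaml point yarn.properties-file.location at properties_dir."""
    conf_dir = Path(tempfile.mkdtemp(prefix="flink-conf-")) / "conf"
    shutil.copytree(base_conf_dir, conf_dir)

    conf_file = conf_dir / "flink-conf.yaml"
    lines = conf_file.read_text().splitlines() if conf_file.exists() else []
    # Replace any existing setting rather than adding a second, conflicting one.
    lines = [line for line in lines if not _LOCATION_KEY.match(line)]
    lines.append(f"yarn.properties-file.location: {properties_dir}")
    conf_file.write_text("\n".join(lines) + "\n")
    return str(conf_dir)


def submit_to_session(base_conf_dir: str, properties_dir: str, flink_args: list) -> int:
    """Run the flink CLI with FLINK_CONF_DIR pointing at a per-session copy of the
    configuration, then delete the copy. Returns the CLI's exit code."""
    conf_dir = make_session_conf_dir(base_conf_dir, properties_dir)
    try:
        env = dict(os.environ, FLINK_CONF_DIR=conf_dir)
        return subprocess.run(["flink", *flink_args], env=env).returncode
    finally:
        # conf_dir is <tmp>/conf; remove the whole temporary parent directory.
        shutil.rmtree(os.path.dirname(conf_dir), ignore_errors=True)
```

      For example, submit_to_session("/opt/flink/conf", "/path/to/session-a", ["run", "my-streaming-job.jar"]) would submit the jar to whichever session wrote its properties file into /path/to/session-a (all three arguments are placeholders). Copying the whole configuration directory, rather than only flink-conf.yaml, keeps the other files the CLI reads from FLINK_CONF_DIR, such as its logging configuration, next to the modified file.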


      Log extracts:

      CASE 1:

      2018:02:01 15:43:20 - Waiting until all TaskManagers have connected
      2018:02:01 15:43:20 - Starting client actor system.
      2018:02:01 15:43:20 - Starting ZooKeeperLeaderRetrievalService.
      2018:02:01 15:43:20 - Trying to select the network interface and address to use by connecting to the leading JobManager.
      2018:02:01 15:43:20 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
      2018:02:01 15:43:21 - Retrieved new target address elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/
      15:43:21 - Stopping ZooKeeperLeaderRetrievalService.
      2018:02:01 15:43:21 - Slf4jLogger started
      2018:02:01 15:43:21 - Starting remoting
      2018:02:01 15:43:21 - Remoting started; listening on addresses :[akka.tcp://flink@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:36340]
      2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.
      2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.
      2018:02:01 15:43:21 - TaskManager status (2/1)
      2018:02:01 15:43:21 - All TaskManagers are connected
      2018:02:01 15:43:21 - Submitting job with JobID: f69197b0b80a76319a87bde10c1e3f77. Waiting for job completion.
      2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.
      2018:02:01 15:43:21 - Received SubmitJobAndWait(JobGraph(jobId: f69197b0b80a76319a87bde10c1e3f77)) but there is no connection to a JobManager yet.
      2018:02:01 15:43:21 - Received job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77).
      2018:02:01 15:43:21 - Disconnect from JobManager null.
      2018:02:01 15:43:21 - Connect to JobManager Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].
      2018:02:01 15:43:21 - Connected to JobManager at Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] with leader session id 388af5b8-5555-4923-8ee4-8a4b9bfbb0b9.
      2018:02:01 15:43:21 - Sending message to JobManager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager to submit job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77) and wait for progress
      2018:02:01 15:43:21 - Upload jar files to job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
      2018:02:01 15:43:21 - Blob client connecting to akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager
      2018:02:01 15:43:22 - Submit job to the job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
      2018:02:01 15:43:22 - Job f69197b0b80a76319a87bde10c1e3f77 was successfully submitted to the JobManager akka://flink/deadLetters.
      2018:02:01 15:43:22 - 02/01/2018 15:43:22   Job execution switched to status RUNNING.


      CASE 2:

      2018:02:01 15:48:43 - Waiting until all TaskManagers have connected
      2018:02:01 15:48:43 - Starting client actor system.
      2018:02:01 15:48:43 - Trying to select the network interface and address to use by connecting to the leading JobManager.
      2018:02:01 15:48:43 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
      2018:02:01 15:48:43 - Retrieved new target address elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/
      15:48:43 - Slf4jLogger started
      2018:02:01 15:48:43 - Starting remoting
      2018:02:01 15:48:43 - Remoting started; listening on addresses :[akka.tcp://flink@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:34140]
      2018:02:01 15:48:43 - TaskManager status (2/1)
      2018:02:01 15:48:43 - All TaskManagers are connected
      2018:02:01 15:48:43 - Submitting job with JobID: cd3e0e223c57d01d415fe7a6a308576c. Waiting for job completion.
      2018:02:01 15:48:43 - Received SubmitJobAndWait(JobGraph(jobId: cd3e0e223c57d01d415fe7a6a308576c)) but there is no connection to a JobManager yet.
      2018:02:01 15:48:43 - Received job SND-IMP-SIGNAST (cd3e0e223c57d01d415fe7a6a308576c).
      2018:02:01 15:48:43 - Disconnect from JobManager null.
      2018:02:01 15:48:43 - Connect to JobManager Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].
      2018:02:01 15:48:43 - Connected to JobManager at Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] with leader session id 00000000-0000-0000-0000-000000000000.
      2018:02:01 15:48:43 - Sending message to JobManager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager to submit job SND-IMP-SIGNAST (cd3e0e223c57d01d415fe7a6a308576c) and wait for progress
      2018:02:01 15:48:43 - Upload jar files to job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
      2018:02:01 15:48:43 - Blob client connecting to akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager
      2018:02:01 15:48:45 - Submit job to the job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
      2018:02:01 15:49:45 - Terminate JobClientActor.
      2018:02:01 15:49:45 - Disconnect from JobManager Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].



      Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
          at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
          at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
          at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
          at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
          at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
          at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
          at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
          at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
          at akka.actor.ActorCell.invoke(ActorCell.scala:487)
          at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
          at akka.dispatch.Mailbox.run(Mailbox.scala:220)
          at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
          at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
          at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
          at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
          at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


      CASE 3,4

      2018:02:01 15:35:14 - Starting client actor system.
      2018:02:01 15:35:14 - Trying to select the network interface and address to use by connecting to the leading JobManager.
      2018:02:01 15:35:14 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
      2018:02:01 15:35:14 - Retrieved new target address localhost/
      15:35:15 - Trying to connect to address localhost/
      15:35:15 - Failed to connect from address 'elara-edge-u2-n01/': Connexion refusée (Connection refused)
      2018:02:01 15:35:15 - Failed to connect from address '/': Connexion refusée (Connection refused)
      2018:02:01 15:35:15 - Failed to connect from address '/': Connexion refusée (Connection refused)
      2018:02:01 15:35:15 - Failed to connect from address '/': Connexion refusée (Connection refused)
      2018:02:01 15:35:15 - Failed to connect from address '/fe80:0:0:0:20c:29ff:fe8f:3fdd%ens192': Le réseau n'est pas accessible (connect failed)
      2018:02:01 15:35:15 - Failed to connect from address '/': Connexion refusée (Connection refused)
      2018:02:01 15:35:15 - Failed to connect from address '/': Connexion refusée (Connection refused)
      2018:02:01 15:35:15 - Failed to connect from address '/': Connexion refusée (Connection refused)
      2018:02:01 15:35:15 - Failed to connect from address '/': Connexion refusée (Connection refused)
      2018:02:01 15:35:15 - Failed to connect from address '/fe80:0:0:0:20c:29ff:fe8f:3fdd%ens192': Le réseau n'est pas accessible (connect failed)
      2018:02:01 15:35:15 - Failed to connect from address '/': Connexion refusée (Connection refused)
      2018:02:01 15:35:15 - Failed to connect from address '/': Connexion refusée (Connection refused)






      Assignee: Unassigned
      Reporter: Arnaud Linz (ArnaudL)

