Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-8320

Flink cluster does not work on Java 9

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Closed
    • Major
    • Resolution: Fixed
    • 1.4.0
    • 1.4.1, 1.5.0
    • None
    • flink-1.4.0, mac os x, 10.13.1

    Description

      Recently got a new macbook and figured it was a good time to install java 9 and try it out. I didn't realize that Java 9 was such a breaking update (eg: https://blog.codefx.org/java/java-9-migration-guide/) and took the Flink documentation at face value and assumed that Java 7+ or higher would be fine.

      Here's is what happens after starting a local cluster and attempting to run the sample WordCount program under Java 9:

      flink-1.4.0 $ export JAVA_HOME=$(/usr/libexec/java_home -v 9)
      
      cru@lappy:flink-1.4.0 $ java -version
      java version "9.0.1"
      Java(TM) SE Runtime Environment (build 9.0.1+11)
      Java HotSpot(TM) 64-Bit Server VM (build 9.0.1+11, mixed mode)
      
      cru@lappy:flink-1.4.0 $ bin/start-cluster.sh
      Starting cluster.
      Starting jobmanager daemon on host lappy.local.
      Starting taskmanager daemon on host lappy.local.
      
      cru@lappy:flink-1.4.0 $ bin/flink run examples/streaming/WordCount.jar
      Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
      Using address localhost:6123 to connect to JobManager.
      JobManager web interface address http://localhost:8081
      Starting execution of program
      Executing WordCount example with default input data set.
      Use --input to specify file input.
      Printing result to stdout. Use --output to specify output path.
      Submitting job with JobID: ee054ffeb4784848143b76b7d51d99c1. Waiting for job completion.
      
      ------------------------------------------------------------
       The program finished with the following exception:
      
      org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
      	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
      	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
      	at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
      	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
      	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
      	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
      	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
      	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
      	at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
      	at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
      	at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
      	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
      	at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
      	at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
      	... 18 more
      Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
      	at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:219)
      	at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:104)
      	at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:71)
      	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
      	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      WARNING: An illegal reflective access operation has occurred
      WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/Users/cru/proj/flink/flink-1.4.0/lib/flink-dist_2.11-1.4.0.jar) to method java.nio.DirectByteBuffer.cleaner()
      WARNING: Please consider reporting this to the maintainers of org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
      WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
      WARNING: All illegal access operations will be denied in a future release
      

      Strangely, the logs seemed to suggest that the JobManager was running fine before submitting the job, so figured this was just a problem with the client. The long timeout also made it seem like a low level network issue.

      Changing to Java 8 (without bouncing the local cluster) similarly times out as well, but gives a slightly different error. Including it here just for posterity in case someone doesn't bounce the server like I did (To be clear: in this case JobManager and TaskManager processes are still running under java 9.):

      $ java -version
      java version "1.8.0_151"
      Java(TM) SE Runtime Environment (build 1.8.0_151-b12)
      Java HotSpot(TM) 64-Bit Server VM (build 25.151-b12, mixed mode)
      
      cru@lappy:flink-1.4.0 $ bin/flink run examples/streaming/WordCount.jar
      Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
      Using address localhost:6123 to connect to JobManager.
      JobManager web interface address http://localhost:8081
      Starting execution of program
      Executing WordCount example with default input data set.
      Use --input to specify file input.
      Printing result to stdout. Use --output to specify output path.
      Submitting job with JobID: 6bd8fb1a904098473634a7290fbde812. Waiting for job completion.
      Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1031166856] with leader session id 00000000-0000-0000-0000-000000000000.
      
      ------------------------------------------------------------
       The program finished with the following exception:
      
      org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Could not retrieve BlobServer address.
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
      	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
      	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
      	at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
      	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
      	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
      	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
      	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
      	at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
      	at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
      	at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
      	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
      Caused by: org.apache.flink.runtime.client.JobSubmissionException: Could not retrieve BlobServer address.
      	at org.apache.flink.runtime.client.JobSubmissionClientActor$1.call(JobSubmissionClientActor.java:166)
      	at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:97)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
      	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.util.concurrent.TimeoutException
      	at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
      	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
      	at org.apache.flink.runtime.client.JobSubmissionClientActor$1.call(JobSubmissionClientActor.java:160)
      	... 9 more
      

      While I get that supporting Java 9 completely is probably a larger task, at the very least we should update documentation / prereqs to say that Flink is not yet compatible.

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              rockpunk Steve Layland
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: