Details
Type: Bug
Status: Resolved
Priority: P3
Resolution: Fixed
Affects Version/s: 2.1.0
Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3 (2017-08-06) x86_64 GNU/Linux
Description
Hi,
I followed the guide https://beam.apache.org/get-started/quickstart-java/ to run a Beam program on a Flink cluster.
The output of the dependency command is:
mvn dependency:tree -Pflink-runner | grep flink
[INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
[INFO]    +- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
[INFO]    |  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
[INFO]    |  \- org.apache.flink:force-shading:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-core:jar:1.3.0:runtime
[INFO]    |  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-java:jar:1.3.0:runtime
[INFO]    |  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
[INFO]    +- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
Then I started a Flink cluster with the matching version (Flink 1.3.0, Scala 2.10) using docker-compose:
export JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10
docker-compose up -d
The compose file looks like this:
version: '3.3'
services:
  jobmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6123"
    ports:
      - "6123:6123"
      - "8081:8081"
    volumes:
      - /tmp:/tmp
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
  taskmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
The Flink cluster comes up fine, but when I execute:
mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --inputFile=pom.xml \
      --output=/path/to/counts \
      --flinkMaster=[HOST_IP]:6123 \
      --filesToStage=target/word-count-beam-bundled-0.1.jar"
I get:
2017-09-12 06:39:57,226 INFO  org.apache.flink.runtime.jobmanager.JobManager - Submitting job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199).
2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.jobmanager.JobManager - Using restart strategy NoRestartStrategy for a913f922506053e65e732eeb8336b3bd.
2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: full graph restart
2017-09-12 06:39:57,229 INFO  org.apache.flink.runtime.jobmanager.JobManager - Running initialization on master for job wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd).
2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to submit job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199)
org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at Read(CompressedSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a) failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a) failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
    at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:66)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:150)
    ... 24 more
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
    at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:63)
    ... 25 more
Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
    ... 26 more
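The innermost cause in the trace is a ClassNotFoundException for SourceInputFormat, thrown while the JobManager deserializes the input format with the job's user-code class loader. That points to the class not being present in the jar(s) shipped with the job. One way to narrow this down is to check whether the jar passed via --filesToStage actually contains the class. Below is a minimal, stdlib-only sketch of that check. The class name StagedJarCheck and its methods are hypothetical helpers, not part of Beam or Flink. The default jar path is taken from the command above. The sketch only tests one likely cause and does not describe how this issue was eventually fixed.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarFile;

// Hypothetical diagnostic helper (not part of Beam or Flink): checks whether a jar
// contains the class file for a given binary class name.
public class StagedJarCheck {

    // Converts a binary class name (a.b.C) into the entry path used inside a jar (a/b/C.class).
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns true if the jar at `jar` has an entry for `className`.
    static boolean containsClass(Path jar, String className) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            return jarFile.getJarEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the jar passed via --filesToStage in this report; the first argument overrides it.
        Path jar = Paths.get(args.length > 0 ? args[0] : "target/word-count-beam-bundled-0.1.jar");
        String cls = "org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat";
        System.out.println(cls + (containsClass(jar, cls) ? " found in " : " MISSING from ") + jar);
    }
}
```

Usage (assuming the bundled jar was built in the quickstart project): `javac StagedJarCheck.java && java StagedJarCheck target/word-count-beam-bundled-0.1.jar`. If the class is reported missing, the jar was probably built without the flink-runner profile. If the class is present, the problem more likely lies in how the jar is staged or submitted to the cluster.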