Beam / BEAM-2943

Non-existing fileToStage results in ClassNotFoundException

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P3
    • Resolution: Fixed
    • Affects Version/s: 2.1.0
    • Fix Version/s: 2.14.0
    • Component/s: runner-flink
    • Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3 (2017-08-06) x86_64 GNU/Linux

    Description

      Hi,

      I followed the guide https://beam.apache.org/get-started/quickstart-java/ to run a Beam program on a Flink cluster.

      The output of the dependency command is:

      mvn dependency:tree -Pflink-runner | grep flink
      [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
      [INFO]    +- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
      [INFO]    |  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
      [INFO]    |  \- org.apache.flink:force-shading:jar:1.3.0:runtime
      [INFO]    +- org.apache.flink:flink-core:jar:1.3.0:runtime
      [INFO]    |  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
      [INFO]    +- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
      [INFO]    +- org.apache.flink:flink-java:jar:1.3.0:runtime
      [INFO]    |  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
      [INFO]    +- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
      [INFO]    +- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
      

      Then I started a Flink cluster of the matching version (Flink 1.3.0, Scala 2.10) with docker-compose:

      export JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
      export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10
      
      docker-compose up -d
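The client side pulls in `flink-clients_2.10:jar:1.3.0`, and the cluster runs the image `flink:1.3.0-hadoop27-scala_2.10`; both the Flink version and the Scala binary version should agree. A minimal sketch of that check, assuming the image tag follows the `flink:<flink-version>-...-scala_<scala-version>` pattern used here. `FlinkVersionMatch.matches` is a hypothetical helper, not part of Beam or Flink:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FlinkVersionMatch {

    // Matches Scala-suffixed Flink coordinates such as
    // org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
    private static final Pattern COORDINATE =
            Pattern.compile("org\\.apache\\.flink:flink-[a-z-]+_(\\d+\\.\\d+):jar:([^:]+):\\w+");

    // Hypothetical check: does the Docker image tag carry the same Flink version
    // and Scala binary version as the client-side dependency line?
    static boolean matches(String dependencyLine, String imageTag) {
        Matcher m = COORDINATE.matcher(dependencyLine);
        if (!m.find()) {
            throw new IllegalArgumentException("No Scala-suffixed Flink artifact in: " + dependencyLine);
        }
        String scalaVersion = m.group(1);
        String flinkVersion = m.group(2);
        return imageTag.startsWith("flink:" + flinkVersion + "-")
                && imageTag.endsWith("scala_" + scalaVersion);
    }

    public static void main(String[] args) {
        String line = "[INFO]    +- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime";
        System.out.println(matches(line, "flink:1.3.0-hadoop27-scala_2.10")); // true
        System.out.println(matches(line, "flink:1.3.2-hadoop27-scala_2.11")); // false
    }
}
```

In this report the versions do match, so a version mismatch is not the cause of the error below.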
      

      The compose file looks like this:

      version: '3.3'
      services:
        jobmanager:
          image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
          expose:
            - "6123"
          ports:
            - "6123:6123"
            - "8081:8081"
          volumes:
            - /tmp:/tmp
          command: jobmanager
          environment:
            - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
      
        taskmanager:
          image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
          expose:
            - "6121"
            - "6122"
          depends_on:
            - jobmanager
          command: taskmanager
          environment:
            - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
      

      The Flink cluster works, but when I execute

      mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
          -Pflink-runner \
          -Dexec.args="--runner=FlinkRunner \
            --inputFile=pom.xml \
            --output=/path/to/counts \
            --flinkMaster=[HOST_IP]:6123 \
            --filesToStage=target/word-count-beam-bundled-0.1.jar"
      

      I get the following error in the JobManager log:

      2017-09-12 06:39:57,226 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Submitting job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199).
      2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Using restart strategy NoRestartStrategy for a913f922506053e65e732eeb8336b3bd.
      2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job recovers via failover strategy: full graph restart
      2017-09-12 06:39:57,229 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Running initialization on master for job wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd).
      2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to submit job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199)
      org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at Read(CompressedSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a) failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
      	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153)
      	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
      	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
      	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.lang.Exception: Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a) failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
      	at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:66)
      	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:150)
      	... 24 more
      Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
      	at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
      	at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:63)
      	... 25 more
      Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
      	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
      	at java.lang.Class.forName0(Native Method)
      	at java.lang.Class.forName(Class.java:348)
      	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
      	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
      	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
      	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
      	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
      	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
      	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
      	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
      	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
      	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
      	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
      	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
      	at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
      	... 26 more
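The innermost cause shows the mechanism: `InstantiationUtil$ClassLoaderObjectInputStream.resolveClass` calls `Class.forName` with the job's user-code class loader, and that loader cannot find `SourceInputFormat`. The self-contained sketch below reproduces the same failure mode with plain Java serialization. `UserInputFormat` is a hypothetical stand-in for `SourceInputFormat`, `LoaderObjectInputStream` only imitates the idea of Flink's stream (it is not Flink code), and the empty `URLClassLoader` stands in for a user-code class loader that was built without the jar containing the Beam runner classes:

```java
import java.io.*;
import java.net.URL;
import java.net.URLClassLoader;

public class MissingUserCodeDemo {

    // Hypothetical stand-in for a serialized user-code class such as SourceInputFormat.
    static class UserInputFormat implements Serializable {
        private static final long serialVersionUID = 1L;
        String name = "wordcount-source";
    }

    // Resolves classes only through the supplied loader, similar in spirit to
    // the ClassLoaderObjectInputStream seen in the stack trace.
    static class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new LoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new UserInputFormat());

        // The application class loader can see the user class: deserialization succeeds.
        Object ok = deserialize(data, MissingUserCodeDemo.class.getClassLoader());
        System.out.println("with user code: " + ok.getClass().getSimpleName());

        // No URLs and only the bootstrap loader as parent: the user jar was never shipped.
        try (URLClassLoader empty = new URLClassLoader(new URL[0], null)) {
            deserialize(data, empty);
            System.out.println("unexpected success");
        } catch (ClassNotFoundException e) {
            System.out.println("without user code: ClassNotFoundException " + e.getMessage());
        }
    }
}
```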
      
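Per the issue title, the path passed to `--filesToStage` (`target/word-count-beam-bundled-0.1.jar`) did not exist, presumably because the bundled jar had not been built yet, so the job reached the JobManager without the classes it needed. The sketch below shows one way to fail fast on the client before submission. It is a hypothetical helper written for illustration, not Beam's API and not necessarily how the issue was fixed in 2.14.0:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilesToStageCheck {

    // Hypothetical helper (not Beam API): returns the entries of filesToStage
    // that do not exist on the local file system.
    static List<String> missingFiles(List<String> filesToStage) {
        List<String> missing = new ArrayList<>();
        for (String path : filesToStage) {
            if (!new File(path).exists()) {
                missing.add(path);
            }
        }
        return missing;
    }

    // Throws before job submission instead of letting the JobManager fail
    // later with a ClassNotFoundException while deserializing user code.
    static void requireFilesToStage(List<String> filesToStage) {
        List<String> missing = missingFiles(filesToStage);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Files to stage do not exist: " + missing);
        }
    }

    public static void main(String[] args) {
        // Same path as the --filesToStage argument in the report.
        List<String> filesToStage = Arrays.asList("target/word-count-beam-bundled-0.1.jar");
        try {
            requireFilesToStage(filesToStage);
            System.out.println("All files to stage exist; safe to submit.");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " (build the bundled jar first)");
        }
    }
}
```

Checking on the client keeps the error next to the misconfigured option, rather than surfacing it as a deserialization failure deep inside the JobManager.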

          People

            Assignee: Maximilian Michels (mxm)
            Reporter: Guenther Grill (guenhter)
            Votes: 2
            Watchers: 9


              Time Tracking

                Original estimate: Not Specified
                Remaining estimate: 0h
                Time spent: 40m