FLINK-12101

Race condition when concurrently running uploaded jars via REST

    Details

      Description

      Flink allows jars to be uploaded and run via REST. When multiple uploaded jars are invoked concurrently in interactive mode to generate the JobGraph, the calls can interleave, and the static initialization of the ContextEnvironment performed for one call is overridden by another. The affected jar then falls back to a local execution. The local execution uses an incorrect class loader and throws an exception like this:

      2019-04-02 14:25:05,549 ERROR <pipeline class>   - Failed to create job graph
      java.lang.RuntimeException: Pipeline execution failed
          at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:117)
          at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
          at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
          at <pipeline class run>
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
          at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
          at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
          at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
          at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
          at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:117)
          at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$7(JarRunHandler.java:151)
          at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
          at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
          at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
          at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
          at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:125)
          at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:114)
          ... 18 more
      Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
          at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:398)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.createStreamRecordWriters(StreamTask.java:1164)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
          at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:51)
          at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.<init>(StoppableSourceStreamTask.java:39)
          at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
          at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
          at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
          at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
          at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1398)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:682)
          ... 1 more
      Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector
          at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
          at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
          at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
          at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
          at java.lang.Class.forName0(Native Method)
          at java.lang.Class.forName(Class.java:348)
          at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
          at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
          at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
          at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
          at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
          at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
          at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
          at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
          at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
          at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
          at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
          at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
          at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
          at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
          at java.util.ArrayList.readObject(ArrayList.java:797)
          at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
          at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
          at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
          at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
          at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
          at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
          at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:395)
          ... 12 more
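
      The interleaving described above can be illustrated with a minimal sketch. It is not Flink's code: the class StaticContextRace, the String-based "environment", and the jar names are hypothetical stand-ins, and the method names setAsContext, unsetContext, and getExecutionEnvironment are used only for illustration. The sketch replays one problematic ordering of two requests on a single thread, so the outcome is deterministic. It assumes each request registers a context in a shared static field, runs the jar's main method, and then clears the field.

```java
import java.util.function.Supplier;

public class StaticContextRace {
    // Hypothetical stand-in for a statically registered environment factory.
    private static Supplier<String> contextFactory = null;

    static void setAsContext(String jarName) {
        contextFactory = () -> "context(" + jarName + ")";
    }

    static void unsetContext() {
        contextFactory = null;
    }

    // What a jar's main method would receive when it asks for an environment.
    static String getExecutionEnvironment() {
        Supplier<String> factory = contextFactory;
        return factory == null ? "local" : factory.get();
    }

    // Replays one problematic interleaving of two requests, A and B, step by step.
    static String[] replayInterleaving() {
        setAsContext("jar-A");                       // A registers its context
        setAsContext("jar-B");                       // B overwrites A's registration
        String seenByA = getExecutionEnvironment();  // A's main sees B's context
        unsetContext();                              // A finishes and clears the shared field
        String seenByB = getExecutionEnvironment();  // B's main finds nothing registered
        unsetContext();                              // B finishes
        return new String[] {seenByA, seenByB};
    }

    public static void main(String[] args) {
        String[] seen = replayInterleaving();
        System.out.println("A saw: " + seen[0]);
        System.out.println("B saw: " + seen[1]);
    }
}
```

      In this replay, request A picks up the context registered for jar-B, and request B finds no context at all and falls back to "local", which corresponds to the unintended local execution reported here.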
      

              People

              • Assignee: leesf (xleesf)
              • Reporter: Maximilian Michels (mxm)
              • Votes: 0
              • Watchers: 5

                  Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 20m