Flink / FLINK-4425

"Out Of Memory" during savepoint deserialization

      Description

      I've created a savepoint and am trying to start a job from it (via the -s parameter), but I get an exception like the one below:

      Exception
      java.lang.OutOfMemoryError: Java heap space
              at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:167)
              at org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:42)
              at org.apache.flink.runtime.checkpoint.savepoint.FsSavepointStore.loadSavepoint(FsSavepointStore.java:133)
              at org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:201)
              at org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983)
              at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302)
              at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
              at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
              at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
              at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
              at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
              at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
              at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
              at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
              at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
              at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      

      jobmanager.heap.mb: 1280
      taskmanager.heap.mb: 1024

      Java 1.8

      Savepoint + checkpoint size: < 1 MB in total

        Activity

        Ufuk Celebi added a comment -

        Thanks for reporting this.

        (1) Is it possible to share your user program with some data?

        If that's not possible, could you (2) trigger the savepoint with the job configured to use a MemoryStateBackend and share the savepoint file? That way the savepoint will be self-contained, so you can attach it here.

        I can then try to reproduce it.
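
        A minimal sketch of a job configured that way, assuming the Flink 1.1 streaming API (the class name, checkpoint interval, and stand-in topology here are hypothetical, not from this ticket):

        import org.apache.flink.runtime.state.memory.MemoryStateBackend;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class SelfContainedSavepointJob {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env =
                        StreamExecutionEnvironment.getExecutionEnvironment();

                // Keep all checkpoint/savepoint state on the JobManager heap
                // instead of an external file system, so the savepoint file is
                // self-contained. Reasonable here because the state is < 1 MB.
                env.setStateBackend(new MemoryStateBackend());
                env.enableCheckpointing(10000); // checkpoint every 10 seconds

                // Trivial stand-in topology so the sketch is complete.
                env.generateSequence(0, 1000).print();

                env.execute("self-contained-savepoint-job");
            }
        }

        With this backend the state is embedded in the savepoint file itself, so attaching that single file is enough to attempt the restore.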

        Sergii Koshel added a comment -

        savepoint-c25e4b360a7d.zip attached
        Ufuk Celebi added a comment -

        Thanks for sharing. I just looked into the code and saw that we fixed something since 1.1.1: there was a call to `is.read` instead of `is.readFully`. It could be that only part of the stream was read into the serialized data, which then led to a wrong length value. I'm pretty sure this is the problem, and we can only fix it in 1.1.2.
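
        To illustrate the failure mode, here is a standalone sketch (not the actual SavepointV1Serializer code; the ChunkedInputStream helper is hypothetical and only simulates a stream that returns short reads):

        import java.io.ByteArrayInputStream;
        import java.io.DataInputStream;
        import java.io.FilterInputStream;
        import java.io.IOException;
        import java.io.InputStream;

        public class ReadVsReadFully {

            // Simulates a network/file stream that delivers at most 1000
            // bytes per read() call, as real streams are allowed to do.
            static class ChunkedInputStream extends FilterInputStream {
                ChunkedInputStream(InputStream in) { super(in); }
                @Override
                public int read(byte[] b, int off, int len) throws IOException {
                    return super.read(b, off, Math.min(len, 1000));
                }
            }

            public static void main(String[] args) throws IOException {
                byte[] data = new byte[8192];
                byte[] buf = new byte[data.length];

                // InputStream.read(byte[]) may fill only part of the buffer;
                // the rest stays zeroed. Deserializing from such a buffer
                // yields garbage length fields, which can trigger a huge
                // allocation and an OutOfMemoryError like the one above.
                InputStream is = new ChunkedInputStream(new ByteArrayInputStream(data));
                int n = is.read(buf); // returns 1000 here, not 8192
                System.out.println("read() returned " + n + " of " + buf.length);

                // DataInputStream.readFully loops internally until the buffer
                // is completely filled (or throws EOFException).
                DataInputStream dis = new DataInputStream(
                        new ChunkedInputStream(new ByteArrayInputStream(data)));
                dis.readFully(buf);
                System.out.println("readFully filled all " + buf.length + " bytes");
            }
        }

        Note that the bug is purely on the read path: the savepoint file on disk is intact, which is consistent with the confirmation below that the unmodified savepoint restores fine with a fixed build.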

        It would help very much if you have some spare time to check out the `release-1.1` branch, build it from source, and then try to restore your original savepoint with it.

        git clone https://github.com/apache/flink.git
        cd flink
        git checkout -b release-1.1 origin/release-1.1
        mvn clean package -DskipTests
        cd build-target
        cp <your config>/flink-conf.yaml conf/
        bin/start-cluster.sh
        bin/flink run -s <original savepoint> ...
        
        Sergii Koshel added a comment -

        It works with the `release-1.1` branch.

        Ufuk Celebi added a comment -

        OK thanks! Will be fixed in 1.1.2. Sorry about the inconvenience.

        Ufuk Celebi added a comment -

        Fixed in 3f3bab10b9ca68eb31a7ef5a31e49145b51006fd (master) and 19de8ec01a9ec2b3ac0fdf0052b780f970b9bcd1 (release-1.1).


          People

          • Assignee: Unassigned
          • Reporter: Sergii Koshel
          • Votes: 0
          • Watchers: 5
