  1. Flink
  2. FLINK-26584

State Processor API fails to write savepoints exceeding 5MB

Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.13.0, 1.14.0
    • Fix Version/s: None
    • Component/s: API / State Processor
    • Labels: None

    Description

      • WritableSavepoint.write(…) falls back to JobManagerCheckpointStorage, which limits savepoint size to 5 MiB.

          - See the exception stack trace in [1].

          - Cause: SavepointTaskManagerRuntimeInfo.getConfiguration() always returns an empty Configuration, so neither “state.checkpoint-storage” nor “state.checkpoints.dir” can be configured.

          - Proposed fix: give SavepointTaskManagerRuntimeInfo.getConfiguration() a meaningful implementation and set the configuration in SavepointEnvironment.getTaskManagerInfo().
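
      The fallback described above can be illustrated with a small, Flink-free sketch. It is a simplified model, not Flink's actual code: the class CheckpointStorageFallbackSketch and the methods selectStorage and checkSize are hypothetical names introduced here for illustration. Only the two configuration keys, the 5242880-byte limit, and the state size of 180075318 bytes are taken from the report and its stack trace.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.Map;

/** Simplified model of the reported fallback; NOT Flink's actual implementation. */
final class CheckpointStorageFallbackSketch {

    // Same value as maxSize in the stack trace: 5 * 1024 * 1024 = 5242880 bytes.
    static final long MAX_IN_MEMORY_STATE_BYTES = 5L * 1024 * 1024;

    // Configuration keys quoted in the issue description.
    static final String STORAGE_KEY = "state.checkpoint-storage";
    static final String DIR_KEY = "state.checkpoints.dir";

    /**
     * Hypothetical selector: use the explicitly configured storage if present,
     * use file-based storage if only a checkpoint directory is set,
     * and otherwise fall back to the in-memory (JobManager) storage.
     */
    static String selectStorage(Map<String, String> config) {
        if (config.containsKey(STORAGE_KEY)) {
            return config.get(STORAGE_KEY);
        }
        if (config.containsKey(DIR_KEY)) {
            return "filesystem";
        }
        return "jobmanager";
    }

    /** Hypothetical size check: only the in-memory storage enforces the limit. */
    static void checkSize(String storage, long stateSizeBytes) throws IOException {
        if ("jobmanager".equals(storage) && stateSizeBytes > MAX_IN_MEMORY_STATE_BYTES) {
            throw new IOException(
                    "state too large for memory-backed storage: size=" + stateSizeBytes
                            + ", maxSize=" + MAX_IN_MEMORY_STATE_BYTES);
        }
    }

    public static void main(String[] args) throws IOException {
        long reportedSize = 180_075_318L; // Size from the stack trace in [1]

        // An empty configuration, as returned in the reported scenario.
        String fallback = selectStorage(Collections.<String, String>emptyMap());
        System.out.println("empty config selects: " + fallback);
        try {
            checkSize(fallback, reportedSize);
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // With a checkpoint directory configured (example path is an assumption),
        // the model selects file-based storage and the size limit no longer applies.
        String configured =
                selectStorage(Collections.singletonMap(DIR_KEY, "file:///tmp/savepoints"));
        checkSize(configured, reportedSize);
        System.out.println("configured storage accepts the state: " + configured);
    }
}
```

      In this model the write fails only because the configuration is empty. That matches the reasoning behind the proposed fix, which is to let a real configuration reach the storage selection.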

       

      [1]
      8215140 MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)#0 ERROR BatchTask - Error in task code: MapPartition (bb312595cb5ccc27fd3b2c729bbb9136) (2/4)
      java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=180075318 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
      at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      at java.util.concurrent.FutureTask.get(FutureTask.java:192)
      at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
      at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
      at org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:67)
      at org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:90)
      at org.apache.flink.state.api.output.BoundedStreamTask.processInput(BoundedStreamTask.java:107)
      at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
      at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:80)
      at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:107)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=180075318 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
      at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:61)
      at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:141)
      at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:121)
      at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:75)
      at org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:87)
      at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
      at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
      at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
      at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
      at java.util.concurrent.FutureTask.run(FutureTask.java)
      at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:633)
      ... 14 more

       

            People

              Assignee: Unassigned
              Reporter: Matthias Schwalbe