FLINK-12653: Keyed state backend fails to restore during rescaling

    Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 1.6.4, 1.7.2, 1.8.0
    • Fix Version/s: None
    • Labels:
      None
    • Environment:

      Beam 2.12.0 or any other Beam version
      Flink >= 1.6
      Heap/Filesystem state backend (RocksDB works fine)

      Description

      The Flink Runner includes a test which verifies that checkpoints/savepoints work correctly with Beam on Flink. While adding tests for scale-up/scale-down [1], I came across a bug when restoring the keyed state backend. After a fair amount of debugging the Beam code and checking for potential issues with the serializers, I think this could be a Flink issue.

      Steps to reproduce:

      1. git clone https://github.com/mxm/beam
      2. cd beam && git checkout savepoint-problem
      3. ./gradlew :runners:flink:1.6:test --tests "**.FlinkSavepointTest.testSavepointRestoreLegacy"

      Error:

      java.lang.Exception: Exception while creating StreamOperatorStateContext.
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for DoFnOperator_76375152c4a81d5df72cf49e32c4ecb9_(4/4) from any of the 1 provided restore options.
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
      	... 5 more
      Caused by: java.lang.RuntimeException: Invalid namespace string: ''
      	at org.apache.beam.runners.core.StateNamespaces.fromString(StateNamespaces.java:245)
      	at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:246)
      	at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:221)
      	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
      	at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:169)
      	at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
      	at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:513)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:474)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:431)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:370)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
      	... 7 more
      

      Whether the restore fails depends on the maxParallelism setting. The following values lead to failure:

          options.setMaxParallelism(128); // default value
          options.setMaxParallelism(64);
          options.setMaxParallelism(118);
      

      The following work fine:

          options.setMaxParallelism(110);
          options.setMaxParallelism(63);
          options.setMaxParallelism(24);
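
      As a debugging aid, the sketch below prints how key groups would be split across subtasks for the maxParallelism values listed above. It is only an approximation: the formula is modeled on my understanding of Flink's KeyGroupRangeAssignment and is reimplemented here rather than called, the class and method names are hypothetical, and the parallelism values are assumptions (the test's actual before/after parallelism is not stated in this report). It does not explain the failure; it only makes it easier to compare key-group boundaries between the failing and working settings.

```java
public class KeyGroupRangeSketch {

    // First key group owned by a subtask (inclusive).
    // Assumed formula, modeled on Flink's key-group-to-subtask assignment.
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group owned by a subtask (inclusive), under the same assumption.
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Renders all subtask ranges as "[start,end] [start,end] ...".
    static String describe(int maxParallelism, int parallelism) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < parallelism; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append('[')
              .append(rangeStart(maxParallelism, parallelism, i))
              .append(',')
              .append(rangeEnd(maxParallelism, parallelism, i))
              .append(']');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        int[] failing = {128, 64, 118};  // values reported as failing
        int[] working = {110, 63, 24};   // values reported as working
        int[] parallelisms = {1, 2, 4};  // hypothetical; not taken from the test

        for (int p : parallelisms) {
            System.out.println("parallelism = " + p);
            for (int mp : failing) {
                System.out.println("  fails  maxParallelism=" + mp + ": " + describe(mp, p));
            }
            for (int mp : working) {
                System.out.println("  works  maxParallelism=" + mp + ": " + describe(mp, p));
            }
        }
    }
}
```

      Comparing the printed ranges for the parallelism used before and after the savepoint may help narrow down which key-group boundaries are involved when the heap backend reads timers during restore.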
      

      [1] https://github.com/apache/beam/commit/52d7291144f64eaa417862558d71a443fae3d690

      Everything works fine with RocksDB.

              People

              • Assignee: Unassigned
              • Reporter: Maximilian Michels (mxm)