Details
Type: Bug
Status: Open
Priority: Not a Priority
Resolution: Unresolved
Affects Version/s: 1.6.4, 1.7.2, 1.8.0
None
Beam 2.12.0 or any other Beam version
Flink >= 1.6
Heap/Filesystem state backend (RocksDB works fine)
Description
The Flink Runner includes a test which verifies checkpoints/savepoints work correctly with Beam on Flink. When adding additional tests for scaleup/scaledown [1], I came across a bug with restoring the keyed state backend. After a fair amount of debugging Beam code and checking any potential issues with serializers, I think this could be a Flink issue.
Steps to reproduce:
1. git clone https://github.com/mxm/beam
2. cd beam && git checkout savepoint-problem
3. ./gradlew :runners:flink:1.6:test --tests "**.FlinkSavepointTest.testSavepointRestoreLegacy"
Error:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for DoFnOperator_76375152c4a81d5df72cf49e32c4ecb9_(4/4) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
    ... 5 more
Caused by: java.lang.RuntimeException: Invalid namespace string: ''
    at org.apache.beam.runners.core.StateNamespaces.fromString(StateNamespaces.java:245)
    at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:246)
    at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:221)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
    at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:169)
    at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:513)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:474)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:431)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:370)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
    ... 7 more
Whether the restore fails depends on the value of maxParallelism. The following values lead to failure:
options.setMaxParallelism(128); // default value
options.setMaxParallelism(64);
options.setMaxParallelism(118);
The following work fine:
options.setMaxParallelism(110);
options.setMaxParallelism(63);
options.setMaxParallelism(24);
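For anyone trying to narrow this down, it may help to see how each maxParallelism value above splits the key groups across subtasks, since restore reads keyed state per key group. The sketch below is a standalone approximation of the arithmetic that, as far as I can tell, Flink's KeyGroupRangeAssignment uses to compute the key-group range of an operator index. The class name KeyGroupRangeSketch and the parallelism of 4 (taken from the "(4/4)" in the error) are assumptions for illustration. It does not reproduce the bug or identify its cause.

```java
public class KeyGroupRangeSketch {

    // First key group owned by the subtask (assumed to mirror Flink's formula).
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by the subtask.
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 4; // assumption: taken from "(4/4)" in the stack trace
        int[] maxParallelismValues = {128, 64, 118, 110, 63, 24};

        for (int maxParallelism : maxParallelismValues) {
            StringBuilder line = new StringBuilder("maxParallelism=" + maxParallelism + ":");
            for (int subtask = 0; subtask < parallelism; subtask++) {
                line.append(" [")
                    .append(rangeStart(maxParallelism, parallelism, subtask))
                    .append(", ")
                    .append(rangeEnd(maxParallelism, parallelism, subtask))
                    .append("]");
            }
            System.out.println(line);
        }
    }
}
```

Comparing the printed ranges for the failing and working values, before and after rescaling, might show whether the failure correlates with a particular way the ranges are split.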
[1] https://github.com/apache/beam/commit/52d7291144f64eaa417862558d71a443fae3d690
Everything works fine with RocksDB.
Issue Links
- is related to BEAM-7144 Job re-scale fails on Flink >= 1.6 with certain values of maxParallelism (Triage Needed)