Details
Type: Bug
Status: Closed
Priority: Blocker
Resolution: Fixed
statefun-2.0.0, statefun-2.1.0, statefun-2.2.0
Description
When using the new statefun-flink-datastream integration, the feedback -> union task throws the following error when it tries to restore from a savepoint:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.io.IOException: position out of bounds
    at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
    ... 9 more
Caused by: java.io.IOException: position out of bounds
    at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
    ... 10 more
The error is only thrown when the feedback channel has been used.
I have tested with the example application and the error is thrown only if it is modified to actually use the feedback channel. I simply modified the invoke method to sometimes forward the greeting to a random name:
    @Override
    public void invoke(Context context, Object input) {
      int seen = seenCount.updateAndGet(MyFunction::increment);
      context.send(GREETINGS, String.format("Hello %s at the %d-th time", input, seen));

      String[] names = {"Stephan", "Igal", "Gordon", "Seth", "Marta"};
      ThreadLocalRandom random = ThreadLocalRandom.current();
      int index = random.nextInt(names.length);
      final String name2 = names[index];
      if (random.nextDouble() < 0.5) {
        context.send(new Address(GREET, name2), input);
      }
    }
Attachments
Issue Links
- is caused by
  - FLINK-19741 InternalTimeServiceManager fails to restore due to corrupt reads if there are other users of raw keyed state streams (Closed)
  - FLINK-19748 KeyGroupRangeOffsets#KeyGroupOffsetsIterator should skip key groups that don't have a defined offset (Closed)
- links to
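
The title of FLINK-19748 above asks that the key-group offsets iterator skip key groups without a defined offset. A minimal, self-contained sketch of that idea follows. It is not Flink's actual KeyGroupRangeOffsets code: the class name DefinedOffsetIterator, the NO_OFFSET sentinel value of -1, and the {keyGroup, offset} pair representation are all assumptions made for illustration.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * Hypothetical sketch, not Flink code: iterates over {keyGroup, offset}
 * pairs and skips key groups for which no offset was ever written.
 */
final class DefinedOffsetIterator implements Iterator<long[]> {

    /** Assumed sentinel meaning "no offset defined for this key group". */
    static final long NO_OFFSET = -1L;

    private final int startKeyGroup;
    private final long[] offsets;
    private int index;

    DefinedOffsetIterator(int startKeyGroup, long[] offsets) {
        this.startKeyGroup = startKeyGroup;
        this.offsets = offsets.clone();
        skipUndefined();
    }

    /** Moves index forward to the next slot that holds a defined offset. */
    private void skipUndefined() {
        while (index < offsets.length && offsets[index] == NO_OFFSET) {
            index++;
        }
    }

    @Override
    public boolean hasNext() {
        return index < offsets.length;
    }

    /** Returns the next {keyGroup, offset} pair that has a defined offset. */
    @Override
    public long[] next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        long[] entry = {startKeyGroup + index, offsets[index]};
        index++;
        skipUndefined();
        return entry;
    }
}
```

With such an iterator, a reader that seeks to each returned offset would never attempt a seek for a key group that wrote no data, which is the kind of seek that could fail with "position out of bounds" as in the trace above. Whether this matches the actual fix is not confirmed by this report.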