Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-19692

Can't restore feedback channel from savepoint

    XMLWordPrintableJSON

Details

    Description

      When using the new statefun-flink-datastream integration the following error is thrown by the feedback -> union task when trying to restore from a savepoint:

      java.lang.Exception: Exception while creating StreamOperatorStateContext.
          at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
          at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.IOException: java.io.IOException: position out of bounds
          at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
          at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
          at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
          ... 9 more
      Caused by: java.io.IOException: position out of bounds
          at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
          at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
          at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
          at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
          ... 10 more
      

       The error is only thrown when the feedback channel has been used.

      I have tested with the example application and the error is thrown only if it is modified to actually use the feedback channel. I simply modified the invoke method to sometimes forward the greeting to a random name:

      @Override
      public void invoke(Context context, Object input) {
        int seen = seenCount.updateAndGet(MyFunction::increment);
        context.send(GREETINGS, String.format("Hello %s at the %d-th time", input, seen));
        String[] names = {"Stephan", "Igal", "Gordon", "Seth", "Marta"};
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int index = random.nextInt(names.length);
        final String name2 = names[index];
        if (random.nextDouble() < 0.5) context.send(new Address(GREET, name2), input);
      }
      

      Attachments

        Issue Links

          Activity

            People

              igal Igal Shilman
              Antti-Kaikkonen Antti Kaikkonen
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: