Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-30561

ChangelogStreamHandleReaderWithCache cause FileNotFoundException

    XMLWordPrintableJSON

Details

    Description

      When a job with state changelog enabled continues to restart, the following exceptions may occur :

      java.lang.RuntimeException: java.io.FileNotFoundException: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp (No such file or directory)
          at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
          at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
          at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
          at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:107)
          at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
          at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:94)
          at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
          at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
          at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
          at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
          at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
          at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265)
          at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
          at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
          at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
          at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.FileNotFoundException: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp (No such file or directory)
          at java.io.FileInputStream.open0(Native Method)
          at java.io.FileInputStream.open(FileInputStream.java:195)
          at java.io.FileInputStream.<init>(FileInputStream.java:138)
          at org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:158)
          at org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:95)
          at org.apache.flink.changelog.fs.StateChangeIteratorImpl.read(StateChangeIteratorImpl.java:42)
          at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85)
          ... 21 more 

      Problem causes:

      1. ChangelogStreamHandleReaderWithCache use RefCountedFile manager local cache file. The reference count is incremented when the input stream is opened from the cache file, and decremented by one when the input stream is closed. So the input stream must be closed and only once.
      2. StateChangelogHandleStreamHandleReader#getChanges() may cause the input stream to be closed twice. This happens when changeIterator.read(tuple2.f0, tuple2.f1) throws an exception (for example, when the task is canceled for other reasons during the restore process) the current state change iterator will be closed twice.
      private void advance() {
          while (!current.hasNext() && handleIterator.hasNext()) {
              try {
                  current.close();
                  Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
                  LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
                  current = changeIterator.read(tuple2.f0, tuple2.f1);
              } catch (Exception e) {
                  ExceptionUtils.rethrow(e);
              }
          }
      }
      
      @Override
      public void close() throws Exception {
          current.close();
      }

      So we should make sure current state change iterator only be closed once. I suggest to make the following changes to StateChangelogHandleStreamHandleReader :

      private boolean currentClosed = false;
      
      private void advance() {
          while (!current.hasNext() && handleIterator.hasNext()) {
              try {
                  current.close();
                  currentClosed = true;
      
                  Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
                  LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
                  current = changeIterator.read(tuple2.f0, tuple2.f1);
                  currentClosed = false;
              } catch (Exception e) {
                  ExceptionUtils.rethrow(e);
              }
          }
      }
      
      @Override
      public void close() throws Exception {
          if (!currentClosed) {
              current.close();
          }
      }

       

      cc yuanmei , roman .

      Attachments

        Issue Links

          Activity

            People

              Feifan Wang Feifan Wang
              Feifan Wang Feifan Wang
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: