Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Fixed
- Affects Versions: 1.16.0, 1.17.0
Description
When a job with state changelog enabled restarts repeatedly, the following exception may occur:
java.lang.RuntimeException: java.io.FileNotFoundException: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp (No such file or directory)
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
    at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
    at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:107)
    at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
    at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:94)
    at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:158)
    at org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:95)
    at org.apache.flink.changelog.fs.StateChangeIteratorImpl.read(StateChangeIteratorImpl.java:42)
    at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85)
    ... 21 more
Problem causes:
- ChangelogStreamHandleReaderWithCache uses RefCountedFile to manage the local cache file. The reference count is incremented when an input stream is opened from the cache file and decremented when that input stream is closed, so each input stream must be closed exactly once.
- StateChangelogHandleStreamHandleReader#getChanges() may close the current input stream twice. When changeIterator.read(tuple2.f0, tuple2.f1) throws an exception (for example, when the task is canceled for other reasons during the restore process), advance() has already closed the current state change iterator, and close() then closes it a second time.
    private void advance() {
        while (!current.hasNext() && handleIterator.hasNext()) {
            try {
                current.close();
                Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
                LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
                current = changeIterator.read(tuple2.f0, tuple2.f1);
            } catch (Exception e) {
                ExceptionUtils.rethrow(e);
            }
        }
    }

    @Override
    public void close() throws Exception {
        current.close();
    }
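The ref-counting hazard behind the FileNotFoundException can be illustrated with a minimal, self-contained sketch. The RefCountedResource and DoubleCloseDemo classes below are hypothetical stand-ins, not Flink's actual RefCountedFile or ChangelogStreamHandleReaderWithCache: closing one stream twice decrements the count twice, so the backing cache file is deleted while another reader still holds a reference to it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a ref-counted local cache file.
class RefCountedResource {
    final AtomicInteger refCount = new AtomicInteger(1); // the cache's own reference
    boolean deleted = false;

    // Opening an input stream from the cache file retains it.
    void retain() { refCount.incrementAndGet(); }

    // Closing an input stream releases it; at zero the file is deleted.
    void release() {
        if (refCount.decrementAndGet() == 0) {
            deleted = true; // stands in for deleting the local cache file
        }
    }
}

public class DoubleCloseDemo {
    public static boolean fileDeletedWhileReaderActive() {
        RefCountedResource cache = new RefCountedResource(); // count = 1
        cache.retain();  // reader A opens a stream          -> count = 2
        cache.retain();  // reader B opens a stream          -> count = 3
        cache.release(); // reader A closes once             -> count = 2
        cache.release(); // reader A closes AGAIN (the bug)  -> count = 1
        cache.release(); // cache drops its own reference    -> count = 0, file deleted
        // Reader B still holds an open stream, but the file is gone:
        // its next open/seek fails with FileNotFoundException.
        return cache.deleted;
    }

    public static void main(String[] args) {
        System.out.println("file deleted while reader B active: "
                + fileDeletedWhileReaderActive());
    }
}
```

This is why "each stream closed exactly once" is an invariant, not just a convention: one extra close silently steals a reference that belongs to someone else.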
So we should make sure the current state change iterator is closed only once. I suggest the following change to StateChangelogHandleStreamHandleReader:
    private boolean currentClosed = false;

    private void advance() {
        while (!current.hasNext() && handleIterator.hasNext()) {
            try {
                current.close();
                currentClosed = true;
                Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
                LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
                current = changeIterator.read(tuple2.f0, tuple2.f1);
                currentClosed = false;
            } catch (Exception e) {
                ExceptionUtils.rethrow(e);
            }
        }
    }

    @Override
    public void close() throws Exception {
        if (!currentClosed) {
            current.close();
        }
    }
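The guard pattern above can be exercised in isolation with a small runnable sketch (hypothetical GuardedReader and CountingCloseable classes, not the actual Flink reader): advance() closes the inner iterator and then fails before a replacement is assigned, and the outer close() must not close it again.

```java
// Sketch of the proposed close-once guard (hypothetical names).
public class CloseOnceDemo {
    // Records how many times close() is invoked on it.
    static class CountingCloseable implements AutoCloseable {
        int closeCalls = 0;
        @Override public void close() { closeCalls++; }
    }

    static class GuardedReader implements AutoCloseable {
        private final CountingCloseable current;
        private boolean currentClosed = false;

        GuardedReader(CountingCloseable current) { this.current = current; }

        // Mirrors advance(): close the exhausted iterator, then fail before a
        // replacement is assigned (as when changeIterator.read(...) throws).
        void advanceAndFail() throws Exception {
            current.close();
            currentClosed = true;
            throw new Exception("read failed, e.g. task canceled during restore");
        }

        @Override public void close() {
            if (!currentClosed) { // the guard from the proposed patch
                current.close();
            }
        }
    }

    public static int closeCallsAfterFailedAdvance() {
        CountingCloseable inner = new CountingCloseable();
        try (GuardedReader reader = new GuardedReader(inner)) {
            try {
                reader.advanceAndFail();
            } catch (Exception ignored) {
                // caller handles the failure; try-with-resources still closes reader
            }
        }
        return inner.closeCalls; // 1 with the guard; would be 2 without it
    }

    public static void main(String[] args) {
        System.out.println("close calls on inner iterator: "
                + closeCallsAfterFailedAdvance());
    }
}
```

With the guard removed from GuardedReader.close(), the same scenario yields two close calls on the inner iterator, which is exactly the double-release described above.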
Attachments
Issue Links
- is related to: FLINK-28440 EventTimeWindowCheckpointingITCase failed with restore (Reopened)
- relates to: FLINK-28898 ChangelogRecoverySwitchStateBackendITCase.testSwitchFromEnablingToDisablingWithRescalingOut failed (Resolved)
- relates to: FLINK-30107 Unstable test ChangelogRecoveryITCase#testMaterialization failed on azure (Closed)