Details
Type: Bug
Status: Closed
Priority: Critical
Resolution: Fixed
Versions: 1.11.0, 1.12.0
Description
If the source in a SourceStreamTask implements CheckpointedFunction and its snapshotState method throws an exception, the SourceStreamTask hangs indefinitely.
The root cause is that the checkpoint is executed in the mailbox. When CheckpointedFunction#snapshotState of the source throws an exception, StreamTask#cleanUpInvoke is called, and it waits for the source's LegacySourceFunctionThread to finish. However, the source thread does not end on its own (stopping it is left to the user code), so the Task hangs at this point, and the JobMaster is not aware of it.
protected void cleanUpInvoke() throws Exception {
    getCompletionFuture().exceptionally(unused -> null).join(); // wait for the end of the source
    // clean up everything we initialized
    isRunning = false;
    ...
}
I think we should call the source's cancel method first, and then wait for the source thread to end.
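To make the proposal concrete, here is a minimal, self-contained sketch of the "cancel first, then wait" idea. It does not use Flink's classes: LoopingSource, start, waitOnly and cancelThenWait are hypothetical stand-ins for the user source, the LegacySourceFunctionThread, and the two variants of the wait in cleanUpInvoke. The timeout is an assumption added only so that the demo terminates; the real join() has no timeout, which is why the task hangs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CancelBeforeJoinSketch {

    /** Hypothetical stand-in for a user source whose run() loops until cancel() is called. */
    static final class LoopingSource {
        private volatile boolean running = true;

        void run() {
            while (running) {
                Thread.yield();
            }
        }

        void cancel() {
            running = false;
        }
    }

    /** Runs the source on its own daemon thread and exposes a completion future. */
    static CompletableFuture<Void> start(LoopingSource source) {
        CompletableFuture<Void> completion = new CompletableFuture<>();
        Thread thread = new Thread(() -> {
            try {
                source.run();
                completion.complete(null);
            } catch (Throwable t) {
                completion.completeExceptionally(t);
            }
        }, "source-thread");
        thread.setDaemon(true);
        thread.start();
        return completion;
    }

    /** Behaviour described in the report: only wait. Returns false if the wait timed out. */
    static boolean waitOnly(CompletableFuture<Void> completion, long timeoutMillis)
            throws Exception {
        try {
            completion.exceptionally(unused -> null).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    /** Proposed behaviour: ask the source to stop first, then wait for its thread. */
    static boolean cancelThenWait(
            LoopingSource source, CompletableFuture<Void> completion, long timeoutMillis)
            throws Exception {
        source.cancel();
        return waitOnly(completion, timeoutMillis);
    }

    public static void main(String[] args) throws Exception {
        LoopingSource source = new LoopingSource();
        CompletableFuture<Void> completion = start(source);
        System.out.println("wait only finished: " + waitOnly(completion, 200));
        System.out.println("cancel then wait finished: "
                + cancelThenWait(source, completion, 2000));
    }
}
```

In this sketch the first wait times out because nothing tells the source to stop, while the second returns once cancel() has flipped the flag. Whether the actual fix in StreamTask should take this exact shape is a separate question.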
The following is my test code; it was run against Flink's master branch.
@Test
public void testSourceFailure() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(2000L);
    env.setRestartStrategy(RestartStrategies.noRestart());
    env.addSource(new FailedSource()).addSink(new DiscardingSink<>());
    JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
    try {
        // assert that the job executes the checkpoint only once and fails only once.
        TestUtils.submitJobAndWaitForResult(
                cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
    } catch (JobExecutionException jobException) {
        Optional<FlinkRuntimeException> throwable =
                ExceptionUtils.findThrowable(jobException, FlinkRuntimeException.class);
        Assert.assertTrue(throwable.isPresent());
        Assert.assertEquals(
                CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
                throwable.get().getMessage());
    }
    // assert that the job only failed once.
    Assert.assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
}

private static class FailedSource extends RichParallelSourceFunction<String>
        implements CheckpointedFunction {

    // volatile so that cancel(), called from another thread, is visible to run()
    private transient volatile boolean running;

    @Override
    public void open(Configuration parameters) throws Exception {
        running = true;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // emits records until cancel() is called; never stops on its own
        while (running) {
            ctx.collect("test");
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // fail every checkpoint to trigger the hang described above
        throw new RuntimeException("source failed");
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {}
}