Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Duplicate
- Affects Version/s: 1.3.0, 1.4.0, 1.5.0, 1.6.0
Description
Steps to reproduce:
1. Build a standalone Flink cluster.
2. Submit a test job like the one below:
import java.util.UUID;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DemoJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Keep source, map and sink as separate tasks so each one changes state on its own.
        env.disableOperatorChaining();
        env.setParallelism(4);
        env.enableCheckpointing(3000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        DataStream<String> inputStream = env.addSource(new StringGeneratorParallelSourceFunction());
        inputStream.map(String::hashCode).print();

        env.execute();
    }

    public static class StringGeneratorParallelSourceFunction extends RichParallelSourceFunction<String> {

        private static final Logger LOG = LoggerFactory.getLogger(StringGeneratorParallelSourceFunction.class);

        private volatile boolean running = true;
        private int index;
        private int subtask_nums;

        @Override
        public void open(Configuration parameters) throws Exception {
            index = getRuntimeContext().getIndexOfThisSubtask();
            subtask_nums = getRuntimeContext().getNumberOfParallelSubtasks();
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                String data = UUID.randomUUID().toString();
                ctx.collect(data);
                LOG.info("subtask_index = {}, emit string = {}", index, data);
                Thread.sleep(50);
                // Exactly one subtask (index 2 at parallelism 4) stops after its first record.
                if (index == subtask_nums / 2) {
                    running = false;
                    LOG.info("subtask_index = {}, finished.", index);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
3. Observe the JobManager and TaskManager logs. The relevant excerpts are shown below.
taskmanager.log
2018-06-21 17:05:54,144 INFO com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction - subtask_index = 2, emit string = 5b0c2467-ad2e-4b53-b1a4-7a0f64560570
2018-06-21 17:05:54,151 INFO com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction - subtask_index = 0, emit string = 11af78b3-59ea-467c-a267-7c2238e44ffe
2018-06-21 17:05:54,195 INFO com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction - subtask_index = 2, finished.
2018-06-21 17:05:54,200 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
2018-06-21 17:05:54,201 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9).
2018-06-21 17:05:54,201 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) [FINISHED]
2018-06-21 17:05:54,202 INFO org.apache.flink.yarn.YarnTaskManager - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source (6b2a374bec5f31112811613537dd4fd9)
2018-06-21 17:05:54,211 INFO com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction - subtask_index = 0, emit string = f29f48fd-ca53-4a96-b596-93948c09581d
jobmanager.log
2018-06-21 17:05:52,682 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (2/4) (3aee8bf5103065e8b57ebd0e214141ae) switched from DEPLOYING to RUNNING.
2018-06-21 17:05:52,683 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (de90106d04f63cb9ea531308202e233f) switched from DEPLOYING to RUNNING.
2018-06-21 17:05:54,219 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
2018-06-21 17:05:54,224 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (3/4) (8f523afb97dc848a9578f9cae5870421) switched from RUNNING to FINISHED.
2018-06-21 17:05:54,224 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (3/4) (39f76a87d20cfc491e11f0b5b08ec5c2) switched from RUNNING to FINISHED.
2018-06-21 17:05:55,069 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
2018-06-21 17:05:58,067 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
2018-06-21 17:06:01,067 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
2018-06-21 17:06:04,067 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
-------------------------------------------------------------------------------------------------
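Once source subtask 3/4 has FINISHED, every later checkpoint is aborted because that subtask is still expected to trigger it. I think the CheckpointCoordinator should filter out FINISHED tasks from both the set of tasks that must trigger a checkpoint and the set of tasks that must acknowledge it. For more details, see the `org.apache.flink.runtime.checkpoint.CheckpointCoordinator` class.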
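The proposed filtering can be illustrated with a minimal, self-contained sketch. It is not Flink's `CheckpointCoordinator` code: `FinishedTaskFilter`, `State`, `Subtask`, `withoutFinished` and `canTrigger` are hypothetical names made up for this illustration, and the model ignores everything a real fix would have to handle (for example, state owned by finished tasks).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the proposal; none of these types are Flink classes.
class FinishedTaskFilter {

    enum State { DEPLOYING, RUNNING, FINISHED }

    static final class Subtask {
        final String name;
        final State state;

        Subtask(String name, State state) {
            this.name = name;
            this.state = state;
        }
    }

    /** Returns the subtasks that should still take part in a checkpoint: all except FINISHED ones. */
    static List<Subtask> withoutFinished(List<Subtask> subtasks) {
        List<Subtask> remaining = new ArrayList<>();
        for (Subtask s : subtasks) {
            if (s.state != State.FINISHED) {
                remaining.add(s);
            }
        }
        return remaining;
    }

    /** A checkpoint may be triggered if at least one subtask remains and every remaining one is RUNNING. */
    static boolean canTrigger(List<Subtask> subtasks) {
        List<Subtask> remaining = withoutFinished(subtasks);
        if (remaining.isEmpty()) {
            return false;
        }
        for (Subtask s : remaining) {
            if (s.state != State.RUNNING) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Mirrors the situation in the logs: source subtask 3/4 has finished, the others keep running.
        List<Subtask> sources = Arrays.asList(
                new Subtask("Source: Custom Source (1/4)", State.RUNNING),
                new Subtask("Source: Custom Source (2/4)", State.RUNNING),
                new Subtask("Source: Custom Source (3/4)", State.FINISHED),
                new Subtask("Source: Custom Source (4/4)", State.RUNNING));

        System.out.println("remaining trigger tasks = " + withoutFinished(sources).size());
        System.out.println("can trigger checkpoint  = " + canTrigger(sources));
    }
}
```

In this model the finished subtask no longer blocks the checkpoint, whereas a subtask that is still DEPLOYING does. The same filter would apply to the list of tasks that must acknowledge a checkpoint.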
Issue Links
- duplicates FLINK-2491 Support Checkpoints After Tasks Finished (Closed)