Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- 1.6.3, 1.8.0
Description
Issue detail info
In our Hadoop production environment, we use the fixed-delay restart strategy:
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 20
restart-strategy.fixed-delay.delay: 2 s
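With this configuration, the number of execution attempts a permanently failing job goes through can be modeled in plain Java. FixedDelayModel is a hypothetical name, not a Flink class, and the sketch assumes that the fixed-delay strategy permits a restart only while the number of restarts so far is below restart-strategy.fixed-delay.attempts. Under that assumption, 20 attempts mean 21 executions with attempt numbers 0 to 20.

```java
import java.time.Duration;

/**
 * Hypothetical model of the fixed-delay settings above; not Flink's RestartStrategy code.
 * Assumes every execution attempt fails, as in the reproducible example.
 */
public class FixedDelayModel {
    // Mirrors restart-strategy.fixed-delay.attempts and restart-strategy.fixed-delay.delay.
    static final int MAX_RESTART_ATTEMPTS = 20;
    static final Duration DELAY = Duration.ofSeconds(2);

    /** Returns how many execution attempts run before the job fails terminally. */
    static int executionsUntilTerminalFailure(int maxRestartAttempts) {
        int executions = 1; // attempt number 0: the initial execution, which fails
        int restarts = 0;
        // Assumption: a failure triggers a restart only while restarts < maxRestartAttempts.
        while (restarts < maxRestartAttempts) {
            restarts++;
            executions++; // attempt number `restarts` runs after the fixed delay and fails again
        }
        return executions; // restart budget exhausted: job is FAILED and gets archived
    }

    public static void main(String[] args) {
        int executions = executionsUntilTerminalFailure(MAX_RESTART_ATTEMPTS);
        System.out.println("executions = " + executions
                + ", last attempt number = " + (executions - 1)
                + ", total restart delay = "
                + DELAY.multipliedBy(MAX_RESTART_ATTEMPTS).getSeconds() + " s");
        // prints: executions = 21, last attempt number = 20, total restart delay = 40 s
    }
}
```

Attempt number 20 is the last one, so attempts 0 to 19 end up as prior executions.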
When a Flink job reaches the maximum number of restart attempts, it writes an archive file to the FileSystem and shuts down.
However, when SubtaskExecutionAttemptDetailsHandler processes the details of each execution attempt of a subtask during archiving, it throws a NullPointerException (NPE).
The detailed reasons are as follows:
0. Assume a Flink job reaches the maximum attempt count (20).
1. An ExecutionVertex represents one parallel subtask of the execution. Each ExecutionVertex is created with a history of prior executions bounded by MAX_ATTEMPTS_HISTORY_SIZE (default value: 16).
2. When SubtaskExecutionAttemptDetailsHandler archives the attempt details, it looks up each attempt in priorExecutions.
However, priorExecutions retains only MAX_ATTEMPTS_HISTORY_SIZE elements, and older elements are dropped from the head of the list (FIFO).
The lookup for an evicted attempt therefore returns null.
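The eviction in steps 1 and 2 can be illustrated with a small bounded history. BoundedAttemptHistory is a hypothetical stand-in, not Flink's priorExecutions implementation. It assumes that the history keeps only the most recent `capacity` attempts and answers lookups for evicted attempt numbers with null, and that the handler walks every prior attempt number starting from 0.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical bounded FIFO history of prior attempts (illustration only).
 * Lookups for attempt numbers that were evicted from the head return null.
 */
public class BoundedAttemptHistory<T> {
    private final int capacity;
    private final List<T> retained = new ArrayList<>();
    private int totalAdded = 0; // number of attempts ever added, including evicted ones

    public BoundedAttemptHistory(int capacity) {
        this.capacity = capacity;
    }

    public void add(T attempt) {
        if (retained.size() == capacity) {
            retained.remove(0); // FIFO: drop the oldest attempt
        }
        retained.add(attempt);
        totalAdded++;
    }

    /** Returns the attempt with this number, or null if it was evicted. */
    public T get(int attemptNumber) {
        if (attemptNumber < 0 || attemptNumber >= totalAdded) {
            throw new IndexOutOfBoundsException("attempt " + attemptNumber);
        }
        int firstRetained = totalAdded - retained.size();
        if (attemptNumber < firstRetained) {
            return null; // evicted: this is the null that later reaches the metrics code
        }
        return retained.get(attemptNumber - firstRetained);
    }

    public int size() {
        return totalAdded;
    }

    public static void main(String[] args) {
        // 20 prior attempts (0..19) with a history size of 16, as in the scenario above.
        BoundedAttemptHistory<String> prior = new BoundedAttemptHistory<>(16);
        for (int attempt = 0; attempt < 20; attempt++) {
            prior.add("attempt-" + attempt);
        }
        int missing = 0;
        for (int n = 0; n < prior.size(); n++) {
            if (prior.get(n) == null) {
                missing++; // dereferencing this value would throw NullPointerException
            }
        }
        System.out.println("prior attempts: " + prior.size() + ", evicted (null): " + missing);
        // prints: prior attempts: 20, evicted (null): 4
    }
}
```

Under these assumptions, attempts 0 to 3 are gone, so any code that dereferences every looked-up attempt fails on the first one.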
Detailed StackTrace
java.lang.NullPointerException
    at org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
    at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
    at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
    at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
    at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
    at org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
    at org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
    at org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
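The trace shows the null attempt reaching MutableIOMetrics.addIOMetrics via SubtaskExecutionAttemptDetailsHandler.createDetailsInfo. Below is a minimal sketch of one possible guard. It assumes the archiver may simply skip attempts that are no longer in the bounded history. AttemptMetrics, IoTotals, and archiveRetainedAttempts are hypothetical names, not Flink classes, and this is not necessarily how the issue was fixed in Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of a null guard in an archiving loop (illustration only). */
public class GuardedArchiveSketch {
    /** Hypothetical per-attempt I/O counters, standing in for an archived execution. */
    static final class AttemptMetrics {
        final int attemptNumber;
        final long numBytesIn;
        final long numBytesOut;

        AttemptMetrics(int attemptNumber, long numBytesIn, long numBytesOut) {
            this.attemptNumber = attemptNumber;
            this.numBytesIn = numBytesIn;
            this.numBytesOut = numBytesOut;
        }
    }

    /** Hypothetical accumulator: add(null) throws NullPointerException, like the trace above. */
    static final class IoTotals {
        long bytesIn;
        long bytesOut;

        void add(AttemptMetrics m) {
            bytesIn += m.numBytesIn; // NPE here if m == null
            bytesOut += m.numBytesOut;
        }
    }

    /** Archives only attempts still present in the history; evicted (null) slots are skipped. */
    static List<Integer> archiveRetainedAttempts(List<AttemptMetrics> attemptsByNumber) {
        List<Integer> archived = new ArrayList<>();
        for (AttemptMetrics attempt : attemptsByNumber) {
            if (attempt == null) {
                continue; // evicted from the bounded history: nothing left to archive
            }
            IoTotals totals = new IoTotals();
            totals.add(attempt); // safe: attempt is non-null here
            archived.add(attempt.attemptNumber);
        }
        return archived;
    }

    public static void main(String[] args) {
        // Attempts 0 and 1 were evicted; attempts 2 and 3 are still retained.
        List<AttemptMetrics> attempts = Arrays.asList(
                null, null, new AttemptMetrics(2, 10, 20), new AttemptMetrics(3, 5, 5));
        System.out.println(archiveRetainedAttempts(attempts)); // prints [2, 3]
    }
}
```

Skipping keeps the archive consistent with what the history still holds. The trade-off is that the HistoryServer then has no details for the evicted attempts.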
Minimal reproducible example
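import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class NpeOnArchiveRepro {
    public static void main(String[] args) throws Exception {
        // Run with the fixed-delay restart strategy configured above (20 attempts, 2 s delay).
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source that emits a record every 100 ms until cancelled.
        DataStream<String> text = env.addSource(new SourceFunction<String>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    ctx.collect("aaaa");
                    Thread.sleep(100);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        // Sink that always fails (division by zero), so every attempt fails until restarts run out.
        text.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                System.out.println(1 / 0);
            }
        });

        env.execute();
    }
}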
Attachments
Issue Links
- is duplicated by: FLINK-12183 Job Cluster doesn't stop after cancel a running job in per-job Yarn mode (Closed)
- links to