Flink / FLINK-12247

Fix NPE when writing an archive file to a FileSystem

      Description

      Issue detail info

      In our Hadoop production environment, we use the fixed-delay restart strategy:

      restart-strategy: fixed-delay
      restart-strategy.fixed-delay.attempts: 20
      restart-strategy.fixed-delay.delay: 2 s
      

      If a Flink job reaches the maximum attempt count, it writes an archive file to the FileSystem and shuts down.

      However, when SubtaskExecutionAttemptDetailsHandler processes the attempt details of a subtask, it throws an NPE.

      Detailed reasons are as follows:

      0. Assume a Flink job reaches the maximum attempt count (20).

      1. An ExecutionVertex is one parallel subtask of the execution. Each ExecutionVertex is created with MAX_ATTEMPTS_HISTORY_SIZE (default value: 16).

      2. When SubtaskExecutionAttemptDetailsHandler runs, it gets each attempt from priorExecutions,

         but priorExecutions retains only MAX_ATTEMPTS_HISTORY_SIZE elements, so the oldest elements

         have been dropped from the head of the list (FIFO). The lookup may therefore return null.
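
The eviction described in the steps above can be illustrated with a small, self-contained sketch. `BoundedAttemptHistory` is a hypothetical stand-in written for this illustration, not Flink's actual data structure. The capacity of 16 and the count of 20 attempts are taken from the description. The sketch assumes that an evicted attempt number resolves to null.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a bounded attempt history; NOT Flink's actual class. */
final class BoundedAttemptHistory {
    private final int capacity;
    private final List<String> retained = new ArrayList<>();
    // Attempt number of the oldest entry that is still retained.
    private int firstRetained = 0;

    BoundedAttemptHistory(int capacity) {
        this.capacity = capacity;
    }

    /** Appends the next attempt and evicts the oldest one once capacity is exceeded (FIFO). */
    void add(String attempt) {
        retained.add(attempt);
        if (retained.size() > capacity) {
            retained.remove(0);
            firstRetained++;
        }
    }

    /** Returns the attempt with the given number, or null if it was evicted or never recorded. */
    String get(int attemptNumber) {
        int offset = attemptNumber - firstRetained;
        if (offset < 0 || offset >= retained.size()) {
            return null;
        }
        return retained.get(offset);
    }

    public static void main(String[] args) {
        BoundedAttemptHistory history = new BoundedAttemptHistory(16);
        for (int i = 0; i < 20; i++) {
            history.add("attempt-" + i);
        }
        // Attempts 0 to 3 were evicted, so looking them up yields null.
        for (int i = 0; i < 20; i++) {
            System.out.println(i + " -> " + history.get(i));
        }
    }
}
```

Under these assumptions, any caller that iterates over all attempt numbers and dereferences the result without a null check fails on the first evicted entry.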

      Detailed stack trace

      java.lang.NullPointerException
         at org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
         at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
         at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
         at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
         at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
         at org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
         at org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
         at org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
         at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
         at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
         at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
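
The trace shows the NPE surfacing while the archive JSON for an attempt is being built. One defensive pattern is to skip history entries that are null before touching their metrics. The sketch below uses hypothetical stand-in types (`Attempt`, `ArchiveGuardSketch`), not Flink's classes, and it is not necessarily what the attached patch does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of a null guard while archiving attempts; not Flink code. */
final class ArchiveGuardSketch {

    /** Hypothetical stand-in for the archived data of one execution attempt. */
    static final class Attempt {
        final int number;
        final long bytesOut;

        Attempt(int number, long bytesOut) {
            this.number = number;
            this.bytesOut = bytesOut;
        }
    }

    /** Builds one archive line per retained attempt and skips entries that are null. */
    static List<String> archive(List<Attempt> priorAttempts) {
        List<String> lines = new ArrayList<>();
        for (Attempt attempt : priorAttempts) {
            if (attempt == null) {
                // Assumed meaning of null: the attempt was evicted from the bounded history.
                continue;
            }
            lines.add("attempt " + attempt.number + ": bytesOut=" + attempt.bytesOut);
        }
        return lines;
    }

    public static void main(String[] args) {
        // Arrays.asList permits null elements, unlike List.of.
        List<Attempt> prior = Arrays.asList(null, null, new Attempt(2, 10L), new Attempt(3, 42L));
        archive(prior).forEach(System.out::println);
    }
}
```

Skipping evicted attempts keeps the archive consistent with what the history still holds, at the cost of silently omitting the oldest attempts from the archive.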
      
      
      

      Minimal reproducible example

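      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.SinkFunction;
      import org.apache.flink.streaming.api.functions.source.SourceFunction;

      // The class name is arbitrary; the original snippet showed only main().
      public class ArchiveNpeRepro {

          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              // Endless source: emits one record every 100 ms until cancelled.
              DataStream<String> text = env.addSource(new SourceFunction<String>() {
                  private volatile boolean running = true;

                  @Override
                  public void run(SourceContext<String> ctx) throws Exception {
                      while (running) {
                          ctx.collect("aaaa");
                          Thread.sleep(100);
                      }
                  }

                  @Override
                  public void cancel() {
                      running = false;
                  }
              });

              // Sink that always throws ArithmeticException, so every attempt fails
              // and the job exhausts the configured restart attempts.
              text.addSink(new SinkFunction<String>() {
                  @Override
                  public void invoke(String value, Context context) throws Exception {
                      System.out.println(1 / 0);
                  }
              });

              env.execute();
          }
      }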
      

       

        Attachments

        1. fix-nep.patch
          4 kB
          lamber-ken


              People

              • Assignee: lamber-ken
              • Reporter: lamber-ken


                  Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 10m