  1. Flink
  2. FLINK-12247

fix NPE when writing an archive file to a FileSystem


Details

    Description

      Issue detail info

      In our Hadoop production environment, we use the fixed-delay restart strategy:

      restart-strategy: fixed-delay
      restart-strategy.fixed-delay.attempts: 20
      restart-strategy.fixed-delay.delay: 2 s
      

      If a Flink job reaches the maximum number of restart attempts, it writes an archive file to the FileSystem and shuts down.

      However, when SubtaskExecutionAttemptDetailsHandler processes the attempt details of a subtask during archiving, it hits an NPE.

      The detailed cause is as follows:

      0. Assume a Flink job reaches the maximum attempt count (20).

      1. An ExecutionVertex is one parallel subtask of the execution. Each ExecutionVertex is created with MAX_ATTEMPTS_HISTORY_SIZE (default value: 16).

      2. When SubtaskExecutionAttemptDetailsHandler archives a subtask, it gets each attempt from priorExecutions,

         but priorExecutions retains only the last MAX_ATTEMPTS_HISTORY_SIZE elements, so the oldest elements

         have been dropped from the head of the list (FIFO) and the lookup may return null.
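
      The eviction described in the steps above can be illustrated with a small, self-contained sketch. BoundedHistory is a hypothetical, simplified stand-in for the bounded attempt history and is not Flink's actual class; the sizes 20 and 16 are taken from this report. The sketch only shows that reading an evicted index yields null instead of an element.

```java
import java.util.ArrayList;
import java.util.List;

public class BoundedHistorySketch {

    /** Hypothetical stand-in for a bounded attempt history; not Flink's actual class. */
    static final class BoundedHistory<T> {
        private final Object[] slots;
        private int count; // total number of elements ever added

        BoundedHistory(int capacity) {
            this.slots = new Object[capacity];
        }

        void add(T element) {
            // Once the buffer is full, the oldest slot is overwritten (FIFO eviction).
            slots[count % slots.length] = element;
            count++;
        }

        @SuppressWarnings("unchecked")
        T get(int index) {
            if (index < 0 || index >= count) {
                throw new IndexOutOfBoundsException("index " + index);
            }
            // Evicted entries are reported as null rather than as an error.
            if (index < count - slots.length) {
                return null;
            }
            return (T) slots[index % slots.length];
        }

        int size() {
            return count;
        }
    }

    /** Returns the attempt numbers whose entries have been evicted from the history. */
    static List<Integer> evictedAttempts(int attempts, int historySize) {
        BoundedHistory<String> history = new BoundedHistory<>(historySize);
        for (int i = 0; i < attempts; i++) {
            history.add("attempt-" + i);
        }
        List<Integer> evicted = new ArrayList<>();
        for (int i = 0; i < history.size(); i++) {
            if (history.get(i) == null) {
                evicted.add(i);
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        // 20 recorded attempts, history size 16: the 4 oldest entries are gone.
        System.out.println(evictedAttempts(20, 16)); // prints [0, 1, 2, 3]
    }
}
```

      In this sketch, any caller that walks all 20 indices and dereferences the result without a null check fails on the first four lookups.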

      Detailed StackTrace

      java.lang.NullPointerException
         at org.apache.flink.runtime.rest.handler.util.MutableIOMetrics.addIOMetrics(MutableIOMetrics.java:88)
         at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.createDetailsInfo(SubtaskExecutionAttemptDetailsHandler.java:140)
         at org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler.archiveJsonWithPath(SubtaskExecutionAttemptDetailsHandler.java:120)
         at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.archiveJsonWithPath(WebMonitorEndpoint.java:780)
         at org.apache.flink.runtime.dispatcher.JsonResponseHistoryServerArchivist.archiveExecutionGraph(JsonResponseHistoryServerArchivist.java:57)
         at org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:758)
         at org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:730)
         at org.apache.flink.runtime.dispatcher.MiniDispatcher.jobReachedGloballyTerminalState(MiniDispatcher.java:138)
         at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$startJobManagerRunner$6(Dispatcher.java:341)
         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
         at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
         at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
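
      One possible guard against this kind of failure is to skip attempts that are no longer in the history while archiving. The sketch below is only an illustration under that assumption: the Attempt class, the archiveAttempts method, and the path format are hypothetical and are not taken from Flink or from the attached patch, whose contents are not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullSafeArchiveSketch {

    /** Hypothetical stand-in for one execution attempt and its I/O metric. */
    static final class Attempt {
        final int number;
        final long bytesIn;

        Attempt(int number, long bytesIn) {
            this.number = number;
            this.bytesIn = bytesIn;
        }
    }

    /**
     * Builds one archive entry per attempt that is still available.
     * Evicted attempts appear as null in the list and are skipped
     * instead of being dereferenced.
     */
    static List<String> archiveAttempts(List<Attempt> priorAttempts) {
        List<String> entries = new ArrayList<>();
        for (Attempt attempt : priorAttempts) {
            if (attempt == null) {
                continue; // evicted from the bounded history, nothing to archive
            }
            entries.add("attempts/" + attempt.number + ": bytesIn=" + attempt.bytesIn);
        }
        return entries;
    }

    public static void main(String[] args) {
        // The first two attempts have been evicted and show up as null.
        List<Attempt> prior = Arrays.asList(null, null, new Attempt(2, 10L), new Attempt(3, 25L));
        for (String entry : archiveAttempts(prior)) {
            System.out.println(entry);
        }
    }
}
```

      Skipping evicted attempts means the archive is incomplete for long-running jobs with many restarts; whether that trade-off is acceptable is a design decision for the actual fix.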
      
      
      

      Minimal reproducible example

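      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.SinkFunction;
      import org.apache.flink.streaming.api.functions.source.SourceFunction;

      // The class name is arbitrary; the original report shows only main().
      public class ArchiveNpeRepro {

          public static void main(String[] args) throws Exception {

              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              // Source that emits one record every 100 ms until it is cancelled.
              DataStream<String> text = env.addSource(new SourceFunction<String>() {
                  private volatile boolean running = true;

                  @Override
                  public void run(SourceContext<String> ctx) throws Exception {
                      while (running) {
                          ctx.collect("aaaa");
                          Thread.sleep(100);
                      }
                  }

                  @Override
                  public void cancel() {
                      running = false;
                  }
              });

              // Sink that always throws ArithmeticException, so every attempt fails
              // and the job restarts until the configured attempt limit is reached.
              text.addSink(new SinkFunction<String>() {
                  @Override
                  public void invoke(String value, Context context) throws Exception {
                      System.out.println(1 / 0);
                  }
              });

              env.execute();
          }
      }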
      

       

      Attachments

        1. fix-nep.patch
          4 kB
          lamber-ken

            People

              lamber-ken lamber-ken