Beam / BEAM-13042

Prevent unexpected blocking in RegisterAndProcessBundleOperation.hasFailed

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • None
    • 2.35.0
    • Component/s: runner-dataflow
    • None

    Description

      We have observed stack traces in stuck jobs as follows:
      --- Threads (1): [Thread[pool-8-thread-1,5,main]] State: WAITING stack: ---
      sun.misc.Unsafe.park(Native Method)
      java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
      java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
      java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
      java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
      org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.hasFailed(RegisterAndProcessBundleOperation.java:407)
      org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:332)
      org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:326)
      org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$121/1203893465.run(Unknown Source)
      java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
      java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
      java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      java.lang.Thread.run(Thread.java:748)

      This code does not appear to expect blocking, since it checks isDone() before calling get():

      public boolean hasFailed() throws ExecutionException, InterruptedException {
        if (processBundleResponse != null && processBundleResponse.toCompletableFuture().isDone()) {
          return !processBundleResponse.toCompletableFuture().get().getError().isEmpty();
        } else {
          // At the very least, we don't know that this has failed yet.
          return false;
        }
      }
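      The method above calls toCompletableFuture() twice, once for isDone() and once for get(). The CompletionStage contract only says that a stage which is already a CompletableFuture may return itself; other stage implementations are allowed to return a new future on each call. The sketch below illustrates this difference. It assumes Java 9+ and uses minimalCompletionStage() purely as a convenient stage that builds a fresh future per call; it is not claimed to be the stage type the Dataflow worker actually holds, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ToCompletableFutureDemo {
  // Hypothetical helper: true if two consecutive toCompletableFuture() calls
  // return the very same object.
  static <T> boolean sameObject(CompletionStage<T> stage) {
    return stage.toCompletableFuture() == stage.toCompletableFuture();
  }

  public static void main(String[] args) {
    // A plain CompletableFuture returns itself from toCompletableFuture().
    CompletableFuture<String> plain = new CompletableFuture<>();
    System.out.println("plain same object: " + sameObject(plain));

    // Assumption (Java 9+): a minimal stage hands back a new CompletableFuture
    // on every toCompletableFuture() call.
    CompletionStage<String> minimal = plain.minimalCompletionStage();
    System.out.println("minimal same object: " + sameObject(minimal));
  }
}
```

      Running main prints true for the plain future and false for the minimal stage, so with a stage of the second kind the isDone() check and the get() call are made on two different objects.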

      I'm unsure why this is occurring, but it could be that the two calls to toCompletableFuture() return different futures for some reason, or that the completion stage is somehow changing from done to not done. In either case, this could be fixed by using a single call to CompletableFuture.getNow(), which is guaranteed not to block.
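      A minimal sketch of the getNow()-based check, assuming a simplified stand-in for the response type: Response and setResponse are hypothetical and exist only so the example compiles on its own. It also reads the field once into a local, an extra safeguard assumed here rather than taken from the issue. Unlike the original, an exceptionally completed future surfaces from getNow() as an unchecked CompletionException instead of an ExecutionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class NonBlockingFailureCheck {
  // Hypothetical stand-in for the Fn API response; only getError() matters here.
  static final class Response {
    private final String error;

    Response(String error) {
      this.error = error;
    }

    String getError() {
      return error;
    }
  }

  private volatile CompletionStage<Response> processBundleResponse;

  // Hypothetical setter so the sketch can be exercised in isolation.
  void setResponse(CompletionStage<Response> stage) {
    processBundleResponse = stage;
  }

  /** Returns true only if a response is already available and reports an error; never waits. */
  public boolean hasFailed() {
    CompletionStage<Response> stage = processBundleResponse; // read the field once
    if (stage == null) {
      return false;
    }
    // Single call: getNow returns the default (null) instead of parking when not yet complete.
    // Note: a future completed exceptionally makes getNow throw CompletionException.
    Response response = stage.toCompletableFuture().getNow(null);
    if (response == null) {
      // At the very least, we don't know that this has failed yet.
      return false;
    }
    return !response.getError().isEmpty();
  }
}
```

      With a still-pending future, hasFailed() returns false immediately instead of parking in CompletableFuture.waitingGet as in the stack trace above.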

      This affects the V1 runner harness, which isn't generally used, but it might as well be fixed.


            People

              Assignee: Sam Whittle (scwhittle)
              Reporter: Sam Whittle (scwhittle)


                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 1h