Hadoop Common / HADOOP-14214

DomainSocketWatcher::add()/delete() should not self interrupt while looping await()


Details

    Description

      Our Hive team found a TPC-DS job whose queries running on LLAP seemed to be getting stuck. Dozens of threads were waiting on the DfsClientShmManager::lock, as shown in the following jstack:

      Thread 251 (IO-Elevator-Thread-5):
        State: WAITING
        Blocked count: 3871
        Wtaited count: 4565
        Waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@16ead198
        Stack:
          sun.misc.Unsafe.park(Native Method)
          java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
          java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1976)
          org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255)
          org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
          org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1017)
          org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:476)
          org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:784)
          org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:718)
          org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:422)
          org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:333)
          org.apache.hadoop.hdfs.DFSInputStream.actualGetFromOneDataNode(DFSInputStream.java:1181)
          org.apache.hadoop.hdfs.DFSInputStream.fetchBlockByteRange(DFSInputStream.java:1118)
          org.apache.hadoop.hdfs.DFSInputStream.pread(DFSInputStream.java:1478)
          org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:1441)
          org.apache.hadoop.fs.FSInputStream.readFully(FSInputStream.java:121)
          org.apache.hadoop.fs.FSDataInputStream.readFully(FSDataInputStream.java:111)
          org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readStripeFooter(RecordReaderUtils.java:166)
          org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata.<init>(OrcStripeMetadata.java:64)
          org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.readStripesMetadata(OrcEncodedDataReader.java:622)
      

      The thread that is expected to signal those waiting threads is calling the DomainSocketWatcher::add() method, but it is stuck there handling InterruptedException indefinitely. Its jstack looks like:

      Thread 44417 (TezTR-257387_2840_12_10_52_0):
        State: RUNNABLE
        Blocked count: 3
        Wtaited count: 5
        Stack:
          java.lang.Throwable.fillInStackTrace(Native Method)
          java.lang.Throwable.fillInStackTrace(Throwable.java:783)
          java.lang.Throwable.<init>(Throwable.java:250)
          java.lang.Exception.<init>(Exception.java:54)
          java.lang.InterruptedException.<init>(InterruptedException.java:57)
          java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2034)
          org.apache.hadoop.net.unix.DomainSocketWatcher.add(DomainSocketWatcher.java:325)
          org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:266)
          org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
          org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1017)
          org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:476)
          org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:784)
          org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:718)
          org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:422)
          org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:333)
          org.apache.hadoop.hdfs.DFSInputStream.actualGetFromOneDataNode(DFSInputStream.java:1181)
          org.apache.hadoop.hdfs.DFSInputStream.fetchBlockByteRange(DFSInputStream.java:1118)
          org.apache.hadoop.hdfs.DFSInputStream.pread(DFSInputStream.java:1478)
          org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:1441)
          org.apache.hadoop.fs.FSInputStream.readFully(FSInputStream.java:121)
      

      The whole job makes no progress because of this.

      The thread in DomainSocketWatcher::add() is expected to eventually break out of the while loop, where it waits for the newly added entry to be deleted by another thread. However, if this thread is ever interrupted, it can end up holding the lock forever, in which case if (!toAdd.contains(entry)) remains false on every iteration.

      DomainSocketWatcher::add()
        public void add(DomainSocket sock, Handler handler) {
          lock.lock();
          try {
            ......
            toAdd.add(entry);
            kick();
            while (true) {
              try {
                processedCond.await();
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
              if (!toAdd.contains(entry)) {
                break;
              }
            }
          } finally {
            lock.unlock();
          }
        }
      

      The reason is that this method catches the InterruptedException and self-interrupts during await(). The await() call goes to AbstractQueuedSynchronizer::await(), which throws a new InterruptedException immediately, before ever releasing the lock, if the thread's interrupt status is set:

      AbstractQueuedSynchronizer::await()
              public final void await() throws InterruptedException {
                  if (Thread.interrupted())
                      throw new InterruptedException();
                  Node node = addConditionWaiter();
                  ...
      

      Our code in DomainSocketWatcher::add() catches this exception (again) and self-interrupts (again). Note that throughout this process the associated lock is never released, so the other thread, which is supposed to make if (!toAdd.contains(entry)) true, remains blocked waiting on the lock.
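      The livelock can be reproduced in isolation with nothing beyond java.util.concurrent.locks. This is an illustrative standalone demo (not code from the Hadoop tree): with the interrupt status set, every await() call throws immediately without releasing the lock, the catch block re-asserts the status, and the cycle repeats while the lock stays held.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Standalone reproduction of the self-interrupt livelock pattern.
public class AwaitInterruptDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        lock.lock();
        try {
            Thread.currentThread().interrupt();  // simulate a prior interrupt
            int throwsSeen = 0;
            for (int i = 0; i < 3; i++) {        // bounded stand-in for while (true)
                try {
                    // Throws InterruptedException at once (status is set),
                    // clearing the status but never releasing the lock.
                    cond.await();
                } catch (InterruptedException e) {
                    throwsSeen++;
                    Thread.currentThread().interrupt();  // the buggy self-interrupt
                }
            }
            System.out.println("throws=" + throwsSeen
                    + " holdsLock=" + lock.isHeldByCurrentThread());
        } finally {
            lock.unlock();
        }
    }
}
```

Run unbounded, this loop spins forever while holding the lock, which matches the RUNNABLE thread filling in InterruptedException stack traces in the jstack above.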

      DomainSocketWatcher::delete() has similar logic and suffers from the same problem.
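      One possible remedy, sketched here under assumptions (this is not necessarily the committed HADOOP-14214 patch): remember the interrupt inside the loop instead of re-asserting it, and restore the thread's interrupt status only after leaving the critical section. The model below simulates the watcher thread in main(); names like toAdd and processedCond mirror the snippet above, and the entry is reduced to a plain String.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Minimal model of the add() handshake with the deferred-interrupt fix.
public class AddFixDemo {
    static final ReentrantLock lock = new ReentrantLock();
    static final Condition processedCond = lock.newCondition();
    static final Set<String> toAdd = new HashSet<>();

    // Fixed add(): defer the self-interrupt until after the loop exits.
    static void add(String entry) {
        lock.lock();
        boolean interrupted = false;
        try {
            toAdd.add(entry);
            while (true) {
                try {
                    processedCond.await();   // releases the lock while waiting
                } catch (InterruptedException e) {
                    interrupted = true;      // remember; do NOT re-interrupt here
                }
                if (!toAdd.contains(entry)) {
                    break;                   // the watcher processed our entry
                }
            }
        } finally {
            lock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();  // restore status for callers
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Thread adder = new Thread(() -> {
            Thread.currentThread().interrupt();  // pre-interrupted, as in the bug
            add("entry-1");
        });
        adder.start();

        // Simulated watcher: retry until the entry appears, then process it.
        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                if (toAdd.remove("entry-1")) {
                    processedCond.signalAll();
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            Thread.sleep(10);
        }
        adder.join(5000);
        System.out.println("adder finished=" + !adder.isAlive());
    }
}
```

With this pattern the first await() still throws immediately, but the flag-and-continue handling clears the interrupt status, so the next await() genuinely releases the lock, the watcher can delete the entry, and the loop terminates. The same change applies to delete().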

      Thanks Jason Dere for testing and reporting this.

          People

            Assignee: Mingliang Liu (liuml07)
            Reporter: Mingliang Liu (liuml07)
            Votes: 0
            Watchers: 13
