Hadoop HDFS / HDFS-15925

The lack of packet-level mirrorError state synchronization in BlockReceiver$PacketResponder can cause the HDFS client to hang

Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 3.2.2
    • Fix Version/s: None
    • Component/s: datanode

    Description

          When the datanode is receiving data block packets from an HDFS client and simultaneously forwarding these packets to a mirror (another datanode), a single IOException in the datanode’s forwarding path can cause the client to get stuck for one minute without any logging. After one minute, the client logs an EOFException warning along with `Slow waitForAckedSeqno took 60106ms (threshold=30000ms)`.

          Normally the datanode informs the client of this error state immediately, and the client then resends the packets right away; the whole process is very fast. After careful analysis, we found that the above symptom is caused by the lack of packet-level mirrorError state synchronization in BlockReceiver$PacketResponder: under a particular thread interleaving, the BlockReceiver$PacketResponder hangs for one minute and then exits without ever sending the error state to the client.

      Root Cause Analysis 

      //hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
      
      class BlockReceiver implements Closeable {
        // ...
      
        private void handleMirrorOutError(IOException ioe) throws IOException {
          // ...
          if (Thread.interrupted()) {
            throw ioe;
          } else { // encounter an error while writing to mirror
            // continue to run even if can not write to mirror
            // notify client of the error
            // and wait for the client to shut down the pipeline
            mirrorError = true;                                            // line 461
          }
        }
      
        private int receivePacket() throws IOException {
          // read the next packet
          packetReceiver.receiveNextPacket(in);                            // line 528
          // ...
          boolean lastPacketInBlock = header.isLastPacketInBlock();        // line 551
          //First write the packet to the mirror:
          if (mirrorOut != null && !mirrorError) {
            try {
              // ...
              packetReceiver.mirrorPacketTo(mirrorOut);                    // line 588
              // ...
            } catch (IOException e) {
              handleMirrorOutError(e);                                     // line 604
            }
          }
          // ...
          return lastPacketInBlock?-1:len;                                 // line 849
        }
      
        void receiveBlock(...) throws IOException {
          // ...
          try {
            if (isClient && !isTransfer) {
              responder = new Daemon(datanode.threadGroup, 
                  new PacketResponder(replyOut, mirrIn, downstreams));
              responder.start();                                           // line 968
            }
      
            while(receivePacket() >= 0){/*Receive until the last packet*/} // line 971
      
            // wait for all outstanding packet responses. And then
            // indicate responder to gracefully shutdown.
            // Mark that responder has been closed for future processing
            if (responder != null) {
              ((PacketResponder)responder.getRunnable()).close();          // line 977
              responderClosed = true;
            }
            // ...
          } catch (IOException ioe) {                                      // line 1003
            // ...
          } finally {
            // ...
            if (!responderClosed) { // Data transfer was not complete.
              if (responder != null) {
                // ...
                responder.interrupt();                                     // line 1046
              }
              // ...
            }
            if (responder != null) {
              try {
                responder.interrupt();                                     // line 1053
                // ...
              } catch (InterruptedException e) {
                responder.interrupt();                                     // line 1067
                // ...
              }
              // ...
            }
          }
        }
      }
      

          In the `BlockReceiver.receivePacket` method, if the datanode fails to forward a packet to the mirror (line 588) due to an IOException, the exception is handled at line 604, which sets the mirrorError flag at line 461. According to the comments, the BlockReceiver keeps going in the mirrorError state, and the client should be notified of the error.

          However, jstack shows that the datanode gets stuck in both the `DataXceiver` thread (receiving data block packets from the client) and the `BlockReceiver$PacketResponder` thread (replying with ACK packets to the client). In particular, the `DataXceiver` thread is stuck in the loop at line 971 because it blocks at line 528: the `lastPacketInBlock` packet has not arrived, and no more packets are coming in.

      //hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
      
      class BlockReceiver implements Closeable {
        // ...
      
        class PacketResponder implements Runnable, Closeable {
          // ...
      
          public void run() {
            // ...
            while (isRunning() && !lastPacketInBlock) {
              // ...
              try {
                // ...
                PipelineAck ack = new PipelineAck();
                // ...
                try {
                  if (... && !mirrorError) {                               // line 1381
                    // ...
                    // read an ack from downstream datanode
                    ack.readFields(downstreamIn);                          // line 1384
                    // ...
                  }
                  // ...
                } catch (InterruptedException ine) {
                  isInterrupted = true;                                    // line 1434
                } catch (IOException ioe) {
                  if (Thread.interrupted()) {
                    isInterrupted = true;                                  // line 1437
                  } else ...
                }
      
                if (Thread.interrupted() || isInterrupted) {               // line 1458
                  // ...
                  LOG.info(myString + ": Thread is interrupted.");
                  running = false;
                  continue;                                                // line 1472
                }
                // ...
                sendAckUpstream(ack, expected, totalAckTimeNanos,          // line 1481
                  (pkt != null ? pkt.offsetInBlock : 0),
                  PipelineAck.combineHeader(datanode.getECN(), myStatus));
                // ...
              } catch (IOException e) {
                // ...
              } catch (Throwable e) {
                // ...
              }
            }
            LOG.info(myString + " terminating");
          }
      
          private void sendAckUpstream(...) throws IOException {
            try {
              // ...
      
              try {
                if (!running) return;
                sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,  // line 1568
                    offsetInBlock, myHeader);
              } finally {
                // ...
              }
            } catch (InterruptedException ie) {
              // ...
            }
          }
      
          private void sendAckUpstreamUnprotected(...) throws IOException {
            final int[] replies;
            if (ack == null) {
              // ...
              replies = new int[] { myHeader };
            } else if (mirrorError) { // ack read error
              int h = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS);
              int h1 = PipelineAck.combineHeader(datanode.getECN(), Status.ERROR);
              replies = new int[] {h, h1};                                 // line 1602
            } else {
              // ...
            }
            PipelineAck replyAck = new PipelineAck(seqno, replies,
                totalAckTimeNanos);
            // ...
            replyAck.write(upstreamOut);                                   // line 1632
            // ...
          }
      
        }
      }
      

          The `BlockReceiver$PacketResponder` thread checks the mirrorError flag at line 1381 while the `DataXceiver` thread runs concurrently. If `BlockReceiver$PacketResponder` finds mirrorError false, it tries to read the ACK packet from downstream (the mirror, another datanode) at line 1384, which is a blocking call.

          However, there is a race condition. If the mirrorError flag set by the `handleMirrorOutError` method is already set when the check at line 1381 runs, the `BlockReceiver$PacketResponder` thread does not enter the blocking network I/O call at line 1384; instead it proceeds to line 1481, then line 1568, then line 1632. According to the code around line 1602, that ACK contains `Status.ERROR`, which warns the client. Conversely, if the mirrorError flag is set after the check at line 1381 has already passed, the `BlockReceiver$PacketResponder` thread blocks at line 1384. In our scenario, the data block packet was never sent to the mirror datanode due to the IOException, so the mirror datanode will never send the corresponding ACK packet either; the `BlockReceiver$PacketResponder` thread therefore stays blocked there for a long time.
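
          One problematic interleaving looks like this (time flows downward):

      DataXceiver (receivePacket)                BlockReceiver$PacketResponder (run)
      -----------------------------              -------------------------------------
                                                 line 1381: sees mirrorError == false
      line 588: mirrorPacketTo(mirrorOut)
                throws IOException
      line 604: handleMirrorOutError(e)
      line 461: mirrorError = true
                                                 line 1384: ack.readFields(downstreamIn)
                                                 blocks indefinitely: the mirror never
                                                 received the packet, so it never sends
                                                 the corresponding ACK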

      Fix

          The key is to avoid the problematic race between `BlockReceiver#receivePacket` and the reading of the ACK packet (from the downstream mirror datanode) in `BlockReceiver$PacketResponder`. The simplest approach is this: every time `BlockReceiver#receivePacket` successfully forwards a packet to the downstream mirror datanode, it grants `BlockReceiver$PacketResponder` one chance to check the mirrorError state and perform the blocking ACK read. This is sound because if the datanode has not sent a packet, it is impossible for `BlockReceiver$PacketResponder` to receive the corresponding ACK.

          The implementation needs only a semaphore in `BlockReceiver$PacketResponder` and does not affect any other component.
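
          A minimal standalone sketch of this idea follows. It is a model, not the actual patch: the class name `MirrorAckHandoff`, its method names, and the choice to release a permit on the failure path as well (after setting mirrorError) are our illustrative assumptions.

      //Hypothetical sketch: packet-level handoff between receiver and responder
      
      import java.io.IOException;
      import java.util.concurrent.Semaphore;
      import java.util.concurrent.atomic.AtomicBoolean;
      
      public class MirrorAckHandoff {
        // One permit per mirror-forwarding attempt.
        private final Semaphore forwardAttempts = new Semaphore(0);
        private final AtomicBoolean mirrorError = new AtomicBoolean(false);
      
        // Models the mirror-forwarding step of BlockReceiver#receivePacket (line 588).
        void forwardPacketToMirror() {
          try {
            mirrorPacketTo();            // blocking write to the mirror
            forwardAttempts.release();   // the mirror now owes exactly one ACK
          } catch (IOException e) {
            mirrorError.set(true);       // models handleMirrorOutError (line 461)
            forwardAttempts.release();   // set-then-release: whoever acquires this
                                         // permit is guaranteed to see the flag
          }
        }
      
        // Models one iteration of PacketResponder#run around lines 1381-1384.
        void responderIteration() throws InterruptedException {
          if (!mirrorError.get()) {
            forwardAttempts.acquire();   // wait until a forward was actually attempted
            if (!mirrorError.get()) {
              readAckFromDownstream();   // safe to block: the mirror got this packet
              return;
            }
          }
          // mirrorError is set: skip the downstream read and reply upstream
          // with Status.ERROR immediately, as sendAckUpstreamUnprotected does.
        }
      
        void mirrorPacketTo() throws IOException { /* blocking write, elided */ }
        void readAckFromDownstream()             { /* blocking read, elided */ }
      }

          Because mirrorError is set before the failure-path release, the memory-consistency guarantee of `java.util.concurrent.Semaphore` ensures that the responder observes the flag after acquiring that permit, which closes the window between the check at line 1381 and the blocking read at line 1384.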

      P.S.

          Here we only discuss the reasoning about the symptom and the fix of this issue. This bug is also related to some client-side behavior, but that reasoning is more complex. We have a complete analysis (https://docs.google.com/document/d/1Hq1qhbNFfS7y9zTNZ0VXsN3rxqExlMPaAqz4RfCurpE/edit?usp=sharing) for reference, which analyzes the packet receiving and sending threads of both the datanode and the client, and explains how the aforementioned injection can leave these four threads stuck in a "deadlock".

      Reproduction

          Start HDFS (1 namenode, 2 datanodes) with the default configuration. Then run a client (we used the command `bin/hdfs dfs -copyFromLocal ./foo.txt /1.txt` in the terminal). For each data block packet the client sends to the datanode, the datanode forwards it at line 588 of `BlockReceiver.java` (https://github.com/apache/hadoop/blob/rel/release-3.2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java#L588). Inject a single IOException there.

          Most of the time, the concurrency condition that triggers this bug does not occur. The reliable way we reproduce it is to set `dfs.blocksize` to `1m` in `hdfs-site.xml`, then run `bin/hdfs dfs -copyFromLocal ./foo.txt /1.txt`, where `./foo.txt` is a 15 MB file (generated with `fallocate -l 15000000 foo.txt`), and perform the aforementioned injection at the timing of the 12th occurrence of https://github.com/apache/hadoop/blob/rel/release-3.2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java#L747.
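
          For concreteness, a one-shot injection around line 588 could look like the sketch below; the `INJECTED` flag and the `shouldInjectNow()` trigger are hypothetical names for illustration only (in our experiments the trigger was aligned with the DataXceiver timing above):

      //Hypothetical fault-injection sketch inside BlockReceiver#receivePacket
      
      private static final java.util.concurrent.atomic.AtomicBoolean INJECTED =
          new java.util.concurrent.atomic.AtomicBoolean(false);
      
      // ... guarding the mirror write at line 588:
      if (shouldInjectNow()                            // hypothetical trigger condition
          && INJECTED.compareAndSet(false, true)) {    // fires exactly once
        throw new IOException("injected mirror write failure");
      }
      packetReceiver.mirrorPacketTo(mirrorOut);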

      People

          Assignee: Unassigned
          Reporter: Haoze Wu (functioner)
          Votes: 0
          Watchers: 4


      Time Tracking

          Original Estimate: Not Specified
          Remaining Estimate: 0h
          Time Spent: 0.5h