Uploaded image for project: 'Hadoop HDFS'
  1. Hadoop HDFS
  2. HDFS-15206

HDFS synchronous reads from local file system

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Open
    • Minor
    • Resolution: Unresolved
    • None
    • None
    • datanode

    Description

      Hello everyone,

      I ran a simple benchmark with runs ``` hadoop fs -get /file1.txt ```, and file1.txt has 1MB size and I capture the workflow of requests using XTrace. By evaluating the workflow trace, I noticed that datanode issues 64KB synchronous read request to local file system to read the data, and sends the data back and waits for completion. I had a code walk over HDFS code to verify the pattern and it was correct. I want to have two suggestions, (1) since each file in HDFS block size is usually 128MB, We could use the mmap mapping via FileChannel class to load the file into memory and enable file system prefetching and look ahead in background, instead of synchronously reading from disk. The second suggestion is to use asynchronous read operations to local disk of the datanode. I was wondering if there is a logic behind synchronous reads from the file system?

       

      Code: $HADOOP_SRC/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java line 586

        /**
         * Sends a packet with up to maxChunks chunks of data.
         * 
         * @param pkt buffer used for writing packet data
         * @param maxChunks maximum number of chunks to send
         * @param out stream to send data to
         * @param transferTo use transferTo to send data
         * @param throttler used for throttling data transfer bandwidth
         */
        private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
            boolean transferTo, DataTransferThrottler throttler) throws IOException {
          int dataLen = (int) Math.min(endOffset - offset,
                                   (chunkSize * (long) maxChunks));
          
          int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in the packet
          int checksumDataLen = numChunks * checksumSize;
          int packetLen = dataLen + checksumDataLen + 4;
          boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;    // The packet buffer is organized as follows:
          // _______HHHHCCCCD?D?D?D?
          //        ^   ^
          //        |   \ checksumOff
          //        \ headerOff
          // _ padding, since the header is variable-length
          // H = header and length prefixes
          // C = checksums
          // D? = data, if transferTo is false.
          
          int headerLen = writePacketHeader(pkt, dataLen, packetLen);
          
          // Per above, the header doesn't start at the beginning of the
          // buffer
          int headerOff = pkt.position() - headerLen;
          
          int checksumOff = pkt.position();
          byte[] buf = pkt.array();
          
          if (checksumSize > 0 && checksumIn != null) {
            readChecksum(buf, checksumOff, checksumDataLen);      // write in progress that we need to use to get last checksum
            if (lastDataPacket && lastChunkChecksum != null) {
              int start = checksumOff + checksumDataLen - checksumSize;
              byte[] updatedChecksum = lastChunkChecksum.getChecksum();
              
              if (updatedChecksum != null) {
                System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
              }
            }
          }
          
          int dataOff = checksumOff + checksumDataLen;
          if (!transferTo) { // normal transfer
            IOUtils.readFully(blockIn, buf, dataOff, dataLen);      
            if (verifyChecksum) {
              verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
            }
          }
          
          try {
            if (transferTo) {
              SocketOutputStream sockOut = (SocketOutputStream)out;
              // First write header and checksums
              sockOut.write(buf, headerOff, dataOff - headerOff);
              
              // no need to flush since we know out is not a buffered stream
              FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
              LongWritable waitTime = new LongWritable();
              LongWritable transferTime = new LongWritable();
              sockOut.transferToFully(fileCh, blockInPosition, dataLen, 
                  waitTime, transferTime);
              datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
              datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
              blockInPosition += dataLen;
            } else {
              // normal transfer
              out.write(buf, headerOff, dataOff + dataLen - headerOff);
            }
          } catch (IOException e) {
            if (e instanceof SocketTimeoutException) {
              /*
               * writing to client timed out.  This happens if the client reads
               * part of a block and then decides not to read the rest (but leaves
               * the socket open).
               * 
               * Reporting of this case is done in DataXceiver#run
               */
            } else {
              /* Exception while writing to the client. Connection closure from
               * the other end is mostly the case and we do not care much about
               * it. But other things can go wrong, especially in transferTo(),
               * which we do not want to ignore.
               *
               * The message parsing below should not be considered as a good
               * coding example. NEVER do it to drive a program logic. NEVER.
               * It was done here because the NIO throws an IOException for EPIPE.
               */
              String ioem = e.getMessage();
              if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
                LOG.error("BlockSender.sendChunks() exception: ", e);
              }
              datanode.getBlockScanner().markSuspectBlock(
                    volumeRef.getVolume().getStorageID(),
                    block);
            }
            throw ioeToSocketException(e);
          }    if (throttler != null) { // rebalancing so throttle
            throttler.throttle(packetLen);
          }
          
          /* Retro throttle */
          { throttlingpoint.throttle(); }    return dataLen;
        }
        
        /**
         * Read checksum into given buffer
         * @param buf buffer to read the checksum into
         * @param checksumOffset offset at which to write the checksum into buf
         * @param checksumLen length of checksum to write
         * @throws IOException on error
         */
        private void readChecksum(byte[] buf, final int checksumOffset,
            final int checksumLen) throws IOException {
          if (checksumSize <= 0 && checksumIn == null) {
            return;
          }
          try {
            checksumIn.readFully(buf, checksumOffset, checksumLen);
          } catch (IOException e) {
            LOG.warn(" Could not read or failed to veirfy checksum for data"
                + " at offset " + offset + " for block " + block, e);
            IOUtils.closeStream(checksumIn);
            checksumIn = null;
            if (corruptChecksumOk) {
              if (checksumOffset < checksumLen) {
                // Just fill the array with zeros.
                Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
              }
            } else {
              throw e;
            }
          }
        }
      
      

       

      XTrace: http://brownsys.github.io/tracing-framework/xtrace/server/

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            maniaabdi Mania Abdi
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated: