
Key: HADOOP-3164
Type: Improvement
Status: Closed
Resolution: Fixed
Priority: Major
Assignee: Raghu Angadi
Reporter: Raghu Angadi
Votes: 0
Watchers: 4
Hadoop Common

Use FileChannel.transferTo() when data is read from DataNode.

Created: 03/Apr/08 07:22 PM   Updated: 08/Jul/09 04:43 PM
Component/s: None
Affects Version/s: None
Fix Version/s: 0.18.0

Time Tracking:
Not Specified

File Attachments:
HADOOP-3164.patch (text file, licensed for inclusion in ASF works) 2008-04-23 10:32 PM Raghu Angadi 20 kB
HADOOP-3164.patch (text file, licensed for inclusion in ASF works) 2008-04-21 07:45 PM Raghu Angadi 17 kB
HADOOP-3164.patch (text file, licensed for inclusion in ASF works) 2008-04-18 09:53 PM Raghu Angadi 16 kB
HADOOP-3164.patch (text file, licensed for inclusion in ASF works) 2008-04-08 03:44 AM Raghu Angadi 15 kB
HADOOP-3164.patch (text file, licensed for inclusion in ASF works) 2008-04-08 03:29 AM Raghu Angadi 15 kB
HADOOP-3164.patch (text file, licensed for inclusion in ASF works) 2008-04-05 12:30 AM Raghu Angadi 14 kB

Hadoop Flags: Reviewed
Release Note:
Changed data node to use FileChannel.transferTo() to transfer block data.
Resolution Date: 25/Apr/08 12:10 AM


Description
HADOOP-2312 talks about using FileChannel's transferTo() and transferFrom() in DataNode.

At the time, DataNode neither used NIO sockets nor wrote large chunks of contiguous block data to the socket. Hadoop 0.17 does both when data is served to clients (and other datanodes). I am planning to try using transferTo() in the trunk. This might reduce DataNode's CPU usage by another 50% or more.

Once HADOOP-1702 is committed, we can look into using transferFrom().



Comments
Raghu Angadi added a comment - 03/Apr/08 07:31 PM

This essentially means we need to implement the write timeout ourselves instead of using the wait implementation in SocketOutputStream. Maybe SocketOutputStream could support something like waitForRead().

Raghu Angadi added a comment - 04/Apr/08 01:46 AM

Bummer... transferTo() does not handle non-blocking sockets well. I have a patch that works fine when the socket channel is in blocking mode. The JavaDoc clearly indicates it can handle non-blocking sockets! One hack is to interpret the IOException and look for "Resource temporarily" at the beginning of the message.

I get the following with non-blocking sockets :

java.io.IOException: Resource temporarily unavailable
        at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
        at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:418)
        at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:519)
        at org.apache.hadoop.dfs.DataNode.transferToFully(DataNode.java:1500)
        at org.apache.hadoop.dfs.DataNode.access$900(DataNode.java:84)
        at org.apache.hadoop.dfs.DataNode$BlockSender.sendChunks(DataNode.java:1807)
        at org.apache.hadoop.dfs.DataNode$BlockSender.sendBlock(DataNode.java:1880)
        at org.apache.hadoop.dfs.DataNode$DataXceiver.readBlock(DataNode.java:1032)
        at org.apache.hadoop.dfs.DataNode$DataXceiver.run(DataNode.java:961)
        at java.lang.Thread.run(Thread.java:619)

Looks like all it needs to do is just to look for EAGAIN from the system call.


Raghu Angadi added a comment - 04/Apr/08 04:25 AM
'Mostly good' patch attached.

With the patch, initial tests show DataNode takes about 1/5th to 1/4th of the CPU compared to trunk while reading data. That is about 10 times faster than 0.16.

With the patch, for the majority of the data the path changes from
'file --> direct buffer --> Java buffer --> direct buffer --> socket'
to 'file --> socket',
which mostly explains the slightly better than 4x reduction in CPU.
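
To make the two paths concrete, here is a minimal Java sketch. It is not the DataNode code: the class and method names (CopyPaths, copyViaBuffer, copyViaTransferTo) are hypothetical, and it assumes a blocking destination channel. Whether transferTo() really avoids user-space copies depends on the JVM, the OS, and the type of the target channel; with the in-memory sink used in main() it only demonstrates the API, not the CPU savings.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative only: names are hypothetical, not taken from the patch.
class CopyPaths {

    // Buffered path: each chunk is read from the file into a user-space
    // buffer and then written from that buffer to the destination.
    static long copyViaBuffer(FileChannel in, WritableByteChannel out, int bufSize)
            throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(bufSize);
        long pos = 0;
        int n;
        while ((n = in.read(buf, pos)) > 0) {
            pos += n;
            buf.flip();
            while (buf.hasRemaining()) {
                out.write(buf);
            }
            buf.clear();
        }
        return pos;
    }

    // transferTo path: the JVM is asked to move the bytes itself, which
    // lets it use an OS-level file-to-socket transfer where one exists.
    // Assumes a blocking destination; a non-blocking one could return 0.
    static long copyViaTransferTo(FileChannel in, WritableByteChannel out)
            throws IOException {
        long size = in.size();
        long pos = 0;
        while (pos < size) {
            // transferTo() may move fewer bytes than requested, so loop.
            pos += in.transferTo(pos, size - pos, out);
        }
        return pos;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("block", ".dat");
        try {
            Files.write(tmp, new byte[1 << 20]);
            try (FileChannel in = FileChannel.open(tmp)) {
                WritableByteChannel out = Channels.newChannel(new ByteArrayOutputStream());
                System.out.println("buffered:   " + copyViaBuffer(in, out, 64 * 1024));
                System.out.println("transferTo: " + copyViaTransferTo(in, out));
            }
        } finally {
            Files.delete(tmp);
        }
    }
}
```

Both methods use absolute file positions, so they can be called back to back on the same FileChannel without rewinding it.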

The main remaining issue is with non-blocking sockets, as mentioned in the previous comment. One option is to use blocking sockets and have one thread that enforces the write timeout.


Doug Cutting added a comment - 04/Apr/08 04:34 PM
LUCENE-1121 did not find that transferTo made things any faster. But it still might be worth trying here too.

Raghu Angadi added a comment - 04/Apr/08 05:48 PM
The last comment in LUCENE-1121 seems to indicate it is measuring wall clock times. Is that so? Wall clock time would depend on what the bottleneck is. The comparisons here are only for CPU consumed by DataNode. Still, there are some surprising results on Windows XP. I will go through them in more detail.

Raghu Angadi added a comment - 05/Apr/08 12:30 AM
This patch does not make the socket blocking. Is the following hack to check for "EAGAIN" acceptable? The attached patch passes all the tests on Linux.
[...] try {
        nTransfered = (int) inChannel.transferTo(startPos, len, outChannel);
      } catch (IOException e) {
        /* At least jdk1.6.0 on Linux seems to throw IOException
         * when the socket is full. Hopefully near-future versions will
         * handle EAGAIN better. For now, look for a specific string at
         * the start of the exception message.
         */
        String msg = e.getMessage(); // may be null for some IOExceptions
        if (msg != null && msg.startsWith("Resource temporarily unavailable")) {
          out.waitForWritable();
          continue;
        } else {
          throw e;
        }
      }

The IOException message could be different on other systems. For now, we could use transferTo() only on the systems where we know the text of the message. I think this issue will be fixed in later versions of the JRE.


Raghu Angadi added a comment - 07/Apr/08 08:53 PM
On Windows, transferTo() returns 0 as expected. This is what I am planning to do.
  1. Linux & Windows: will use out.getChannel() as the output channel for transferTo() (which implies the Java runtime does the best it can). On Linux this transfer happens in the kernel and saves a little more than 75% CPU. I don't know about Windows, but it can't be worse than 50% (see below).
  2. Rest of the platforms: will use out for transferTo(), which implies data transfer happens through an internal direct buffer. This should give about 50% CPU savings. Once we or some users confirm the behavior of transferTo() with non-blocking sockets, we could use out.getChannel() for those platforms.

Here, out is a o.a.h.ipc.SocketOutputStream.

I think using the real socket channel for Linux is worth the CPU savings.


Raghu Angadi added a comment - 07/Apr/08 10:10 PM - edited
> transferTo does not handle non-blocking sockets well.
Looks like this is fixed in Java SE 7 (Dolphin) http://bugs.sun.com/view_bug.do?bug_id=5103988

Edit: corrected url for the bug.


Raghu Angadi added a comment - 08/Apr/08 03:29 AM
Patch for review.

Raghu Angadi added a comment - 09/Apr/08 06:15 PM - edited
The following table shows 'dfs -cat' of 10 GB of data. This is a disk-bound test and CPU is measured from /proc/pid/stat. io.file.buffer.size is 128k. This is a cluster with a single datanode, and the client and datanode are on the same machine. The three fields reported for each run are user CPU (u), kernel CPU (k), and wall clock time (s). "Total cpu" is the sum of user and kernel CPU for the DataNode process.
Test          | Bound by | Run 1            | Run 2            | Run 3            | Cpu % | Avg Total Cpu
Trunk         | Disk     | 2589u 5208k 253s | 2659u 5162k 265s | 2827u 5341k 328s | 100%  | 7929
Trunk + patch | Disk     | 0474u 1038k 228s | 0477u 1031k 232s | 0611u 1189k 301s | 20%   | 1607

This shows DataNode takes about 80% less cpu.
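
As a check on the arithmetic, the short Java sketch below recomputes the "Avg Total Cpu" and "Cpu %" columns from the per-run user and kernel figures quoted above. The class and method names are made up for illustration, and rounding to the nearest integer is an assumption about how the reported averages were produced.

```java
// Recomputes the summary columns from the per-run numbers in the comment.
// Names are hypothetical; rounding to nearest is an assumption.
class CpuSavings {

    // Average of (user + kernel) CPU over all runs, rounded to nearest.
    static long avgTotalCpu(long[][] userKernelPerRun) {
        long sum = 0;
        for (long[] run : userKernelPerRun) {
            sum += run[0] + run[1];
        }
        return Math.round((double) sum / userKernelPerRun.length);
    }

    public static void main(String[] args) {
        long[][] trunk = {{2589, 5208}, {2659, 5162}, {2827, 5341}};
        long[][] patched = {{474, 1038}, {477, 1031}, {611, 1189}};
        long a = avgTotalCpu(trunk);
        long b = avgTotalCpu(patched);
        // prints: trunk=7929 patched=1607 ratio=20%
        System.out.printf("trunk=%d patched=%d ratio=%.0f%%%n", a, b, 100.0 * b / a);
    }
}
```

The patched run uses about 20% of the trunk CPU, which is where the "about 80% less" figure comes from.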

Also, since we don't actually allocate any user buffer, we could actually invoke transferTo() to send even larger amounts of data at a time. I haven't experimented with larger buffer sizes.


Raghu Angadi added a comment - 18/Apr/08 09:53 PM
patch merged with trunk.

Konstantin Shvachko added a comment - 19/Apr/08 02:02 AM
  1. DataNode.useChannelForTransferTo
    I am not in favor of a lot of very OS-dependent and even OS-version-dependent code. Rather than listing all the OSs that we have observed not to have the problem, we should assume that all OSs do well and take action on those that don't when this is reported.
    This means we should eliminate the boolean useChannelForTransferTo and retain the part of the code that corresponds to the true value.
  2. DataNode.transferToFully()
    • Analyzing IOException message text is BAD. Instead, let's try to call waitForWritable() before transferTo(). The expectation is that if the socket buffer is full, waitForWritable() will wait until there is space to write to, and this will be a workaround for the Linux EAGAIN bug Raghu mentioned.
    • I'd make transferToFully() a member of SocketOutputStream rather than a DataNode static method.
  3. BlockSender.sendBlock()
    • I am not sure I understand the reason for the new argument. What is wrong with declaring it as
      long sendBlock(OutputStream out, Throttler throttler) throws IOException {
        if( out == null ) {
          throw new IOException( "out stream is null" );
        }
        this.out = out;
        ......................................
      }

      and then calling it as

      long read = blockSender.sendBlock(out, throttler);
         or
      long read = blockSender.sendBlock(baseStream, throttler);
    • Also the use of this.out and (the parameter) out is very ambiguous here.

Raghu Angadi added a comment - 19/Apr/08 04:06 AM
  • #1: useChannelForTransferTo can be removed. Since there is a bug on Linux, I was conservative about untested OSes. But we need not be.
  • #2: always calling waitForWritable() before transferTo() will work (for practical purposes). The extra 4 system calls mostly won't be noticeable. The main thing I was wondering is that we might still hit the Linux bug in rare cases, since there is no promise that sendfile() will not return EAGAIN after select() reports that the socket is writable. I will leave a comment to this effect.
    • How about IOUtils rather than SocketOutputStream for transferToFully()? It's more like readFully().
  • #3: sendBlock() needs a DataOutputStream to write the checksum and some more stuff. If it is not passed, then sendBlock() needs to create one. I think the current interface is OK (it is DataNode internal).
    • this.out and out existed before. But we can fix it.

I will update the patch.
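
The "wait for writable, then transferTo()" idea from points #2 above can be sketched roughly as follows. This is not the code from the patch: TransferUtil and both method signatures are hypothetical stand-ins, the wait is built on a plain java.nio Selector rather than Hadoop's SocketOutputStream, and it assumes the SocketChannel is already in non-blocking mode and that timeoutMs is greater than zero. Opening a Selector per wait is done only to keep the sketch short.

```java
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical helper, for illustration only.
class TransferUtil {

    // Blocks until the non-blocking socket is reported writable, or throws
    // once timeoutMs (must be > 0) has elapsed. A real implementation would
    // reuse selectors instead of opening one per call.
    static void waitForWritable(SocketChannel ch, long timeoutMs) throws IOException {
        try (Selector sel = Selector.open()) {
            ch.register(sel, SelectionKey.OP_WRITE);
            if (sel.select(timeoutMs) == 0) {
                throw new SocketTimeoutException(
                    "socket not writable after " + timeoutMs + " ms");
            }
        }
    }

    // Sends count bytes of the file, starting at position, to the socket.
    static void transferToFully(FileChannel file, long position, long count,
                                SocketChannel sock, long timeoutMs) throws IOException {
        while (count > 0) {
            // Wait first, so transferTo() is attempted only after select()
            // has reported the socket writable. This narrows, but does not
            // eliminate, the chance of the underlying call hitting EAGAIN.
            waitForWritable(sock, timeoutMs);
            long n = file.transferTo(position, count, sock);
            if (n == 0 && position >= file.size()) {
                throw new EOFException("file ended with " + count + " bytes still to send");
            }
            position += n;
            count -= n;
        }
    }
}
```

On a JRE where transferTo() returns 0 for a full non-blocking socket, the loop simply waits again; on one that throws instead, the exception propagates to the caller, which is the rare case Raghu mentions.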


Sam Pullara added a comment - 21/Apr/08 03:34 AM - edited
I tried enabling this on Mac OS X 10.5.2 (added a comparison for Mac OS X). Throughput dropped by 8x and CPU only dropped by 2x. My test was just cat'ing a 13G file out of a DataNode with 2 disks:

hadoop fs -cat wikipedia.xml > /dev/null

Without the patch I can get around 110-120MB/s (about the average speed of the two disks) while with the patch enabled I get around 16MB/s.


Raghu Angadi added a comment - 21/Apr/08 07:45 PM
Updated patch includes feedback from Konstantin. Also improves documentation.

Regarding Sam's experiment on Mac OS:
8x is very surprising. Sam sent me iostat output from the test, and the main difference seems to be that the size of each read from the disk is around 350 KB without the patch and just 4 KB with the patch. The latest patch uses the real socket channel with transferTo() on all platforms. Please check if it improves the test.


Sam Pullara added a comment - 21/Apr/08 08:06 PM
Same result. Each read is ~4k and makes it go quite slow.

Tried Java 5 & 6 for good measure.


Raghu Angadi added a comment - 21/Apr/08 10:46 PM
Thanks Sam. I am assuming you ran with a large value (like 64 KB or 128 KB) for io.file.buffer.size. Given this, the options I can think of are:
  1. have an internal config variable to turn on this feature, with default off.
  2. 1st option + with default on on Linux (and any other OS with positive results) and off on the rest.
  3. No need to have this code.
  4. Always on.

I hope last two options are ruled out.

My preference is the 2nd option. Every option has (obvious) pros and cons. One isolated and well-commented check for the OS is not such a terrible thing (well, maybe it is). Hadoop already has those; it's not the first such check. It does not mean we are in favor of such checks. Do I need to make a better case? Votes are welcome.


Sam Pullara added a comment - 21/Apr/08 10:53 PM
Tested this with io.file.buffer.size set to 128K instead of the default 4K and got great results. Over 50% reduction in CPU time used by the DataNode. We can't have the default be on if the default buffer size is small.

dhruba borthakur added a comment - 21/Apr/08 11:14 PM
It would be nice if we can keep code same on all platforms as much as possible. One option would be to raise the default io.file.buffer.size to something like 64K and use FileChannel.transferTo on all platforms.

Raghu Angadi added a comment - 21/Apr/08 11:31 PM
Not all OSes are so bad. Linux does not suffer from such a problem. Not sure whether it is Java or Mac OS that is somehow preventing read-ahead for these sequential reads.

So we could enable this on datanode as long as the buffer size is >= 64k. Of course, Linux with smaller buffer sizes will also suffer.


Sam Pullara added a comment - 21/Apr/08 11:53 PM
So on Linux with 4k buffersizes you see strong performance with this patch? Needing to raise the buffersize is limited to only Mac? It is hard to tell from your previous comment.

Raghu Angadi added a comment - 22/Apr/08 12:03 AM - edited
Yes. On Linux there is not much difference between the two cases with buffer size set to 4k (based on my preliminary tests).. iostat looks pretty much the same (size of each read is large) with and without the patch.

Sam Pullara added a comment - 22/Apr/08 12:14 AM
Yeah, it's not clear where the fault is on the Mac side. Without this patch there isn't much difference in performance between the two buffer sizes, so it must be related to the NIO implementation in some way.

Doug Cutting added a comment - 22/Apr/08 04:19 PM
The point of the patch is to bypass the buffer. So making the buffer big doesn't improve the utility of transferTo(); rather, it just hides the fact that the implementation of transferTo() on the Mac sucks. It thus makes no sense to gate the use of transferTo() on the bufferSize.

Also, increasing the default buffer size can have a significant impact on memory usage, since some applications open lots of streams, and each open stream frequently has several buffers. When we find that a particular operation benefits from a larger buffer, it is usually best to increase just its buffer size rather than the default buffer size for all Hadoop i/o streams.

I think option (2) still looks best.


Sam Pullara added a comment - 22/Apr/08 04:35 PM
I agree Doug. I retested the pre-patch code on the Mac with the increased buffer size and it is faster and uses the same amount of CPU as the patched code does. Looks like no benefit on Mac OS X with the Apple VMs anyway. I haven't tested Soylatte.

Raghu Angadi added a comment - 22/Apr/08 04:47 PM

> Also, increasing the default buffer size can have a significant impact on memory usage, since some applications open lots of streams, and each open stream frequently has several buffers. When we find that a particular operation benefits from a larger buffer, it is usually best to increase just its buffer size rather than the default buffer size for all Hadoop i/o streams.

Yes. I was not planning to increase default buffer size in this patch.


Raghu Angadi added a comment - 22/Apr/08 04:54 PM
Currently I am looking into a dip in DFSIO read test. I will upload a new version of the patch after fixing it.

Raghu Angadi added a comment - 22/Apr/08 10:50 PM

Buffer size might matter on Linux also to a certain extent (maybe 15-20%) between 4k and 128k. I could not reproduce this on my deb box, where the disk maxes out at 30-35 MB/s, but DFSIO seems to show it.

TestDFSIO essentially tests (bursty) write and read of a 320MB file. Each mapper writes or reads such a file and reports the time taken for this IO. With the patch, results showed a 10-20% dip in DFSIO (the smaller the cluster, the larger the difference). To avoid misconfiguration, I tried a patch with a hard-coded 128KB buffer size while sending a block, and results came back to normal.

Once we confirm misconfiguration, we can consider a buffer size "cut off" for transferTo(). With transferTo(), DataNode does not actually allocate the buffer. In that sense, we could increase the size in DataNode without affecting client buffering (apart from slight increase in buffer for checksum).


Raghu Angadi added a comment - 22/Apr/08 10:56 PM

> With transferTo(), DataNode does not actually allocate the buffer. In that sense, we could increase the size in DataNode without affecting client buffering (apart from slight increase in buffer for checksum).

I mean, DataNode could use something like max(64KB, configured buffer size) when transferTo() is enabled. 64KB implies 512 bytes of checksum data. So the client needs to read 512 bytes of checksum before it reads the actual data.
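
The sizing arithmetic can be spelled out in a few lines of Java. The thread does not state the checksum parameters, so the sketch assumes one 4-byte CRC32 per 512-byte chunk of data, which is the combination that reproduces the 512-byte figure quoted above. The class, constant, and method names are hypothetical.

```java
// Sketch of the sizing arithmetic only; not code from the patch.
class TransferSize {
    static final int MIN_TRANSFER_BYTES = 64 * 1024; // floor suggested in the comment
    static final int BYTES_PER_CHECKSUM = 512;       // assumed data chunk size
    static final int CHECKSUM_BYTES = 4;             // assumed: one CRC32 per chunk

    // max(64KB, configured buffer size)
    static int effectiveSize(int configuredBufferSize) {
        return Math.max(MIN_TRANSFER_BYTES, configuredBufferSize);
    }

    // Checksum bytes that accompany dataBytes of block data.
    static int checksumBytesFor(int dataBytes) {
        int chunks = (dataBytes + BYTES_PER_CHECKSUM - 1) / BYTES_PER_CHECKSUM;
        return chunks * CHECKSUM_BYTES;
    }

    public static void main(String[] args) {
        int size = effectiveSize(4096);             // 4k configured -> 65536
        System.out.println(size + " data bytes, "
            + checksumBytesFor(size) + " checksum bytes");
    }
}
```

With a configured 4k buffer the effective transfer size is 65536 bytes, i.e. 128 chunks and 512 checksum bytes under these assumptions.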


Raghu Angadi added a comment - 23/Apr/08 12:29 AM

Just confirmed that the DFSIO test in fact ran with a buffer size of 4096 for DataNode (not sure about the client).

Given that smaller buffer size for transferTo() could negatively impact DataNode reads, any preference for the following options (when transferTo() is enabled by config) ?:

  1. have cut off buffer size. i.e. enable transferTo() only if buffer size is larger than, say 64k.
  2. set buffer size to max(cut off, configured). This does not affect client buffering.
  3. no cut off required.

My first choice : #2, second : #3. 'cut off' could be something like 128KB.


Raghu Angadi added a comment - 23/Apr/08 10:32 PM
Changes in the updated patch :
  • New internal config variable dfs.datanode.transferTo.allowed is added with default to true on all platforms.
  • Minimum transferTo size is 64KB.
  • TestPread is modified to test the read code path without transferTo.

Hadoop QA added a comment - 24/Apr/08 02:21 AM
+1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12380800/HADOOP-3164.patch
against trunk revision 645773.

@author +1. The patch does not contain any @author tags.

tests included +1. The patch appears to include 3 new or modified tests.

javadoc +1. The javadoc tool did not generate any warning messages.

javac +1. The applied patch does not generate any new javac compiler warnings.

release audit +1. The applied patch does not generate any new release audit warnings.

findbugs +1. The patch does not introduce any new Findbugs warnings.

core tests +1. The patch passed core unit tests.

contrib tests +1. The patch passed contrib unit tests.

Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2314/testReport/
Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2314/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2314/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2314/console

This message is automatically generated.


Raghu Angadi added a comment - 24/Apr/08 07:31 PM
I tested reading a 2GB file on a small Windows box. The test takes the same amount of wall clock time (5-6 min) in both cases. I didn't actually check CPU.

Konstantin Shvachko added a comment - 24/Apr/08 11:37 PM
+1. Code looks good.
I ran tests on Windows and Linux in order to verify that there is no degradation in performance.
Pre-patch and post-patch numbers on both systems are comparable.

Raghu Angadi added a comment - 25/Apr/08 12:10 AM
I just committed this.

Hudson added a comment - 25/Apr/08 01:25 PM