Hadoop HDFS / HDFS-918

Use single Selector and small thread pool to replace many instances of BlockSender for reads

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: datanode, performance
    • Labels: None

      Description

      Currently, on read requests, the DataXCeiver server allocates a new thread per request, which must allocate its own buffers and leads to higher-than-optimal CPU and memory usage by the sending threads. If we had a single selector and a small threadpool to multiplex request packets, we could theoretically achieve higher performance while taking up fewer resources and leaving more CPU on datanodes available for mapred, hbase or whatever. This can be done without changing any wire protocols.
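
      For context, a minimal sketch of the proposed shape (class and method names here are invented for illustration and are not from any attached patch): one selector thread watches registered client channels for writability and hands ready connections to a small, fixed worker pool.

```java
// Hypothetical sketch only: one selector thread plus a small worker pool in place of
// one BlockSender thread per read request. Names are illustrative, not from a patch.
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiplexedReadServer implements Runnable {
  private final Selector selector;
  private final ExecutorService workers = Executors.newFixedThreadPool(4);  // small, fixed pool

  public MultiplexedReadServer() throws IOException {
    this.selector = Selector.open();
  }

  /** Called once per read request instead of dedicating a sender thread to it. */
  public void register(SocketChannel ch, Object readState) throws IOException {
    ch.configureBlocking(false);
    selector.wakeup();  // drop out of select(); a production version would queue registrations
    ch.register(selector, SelectionKey.OP_WRITE, readState);
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        selector.select();
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
          final SelectionKey key = it.next();
          it.remove();
          key.interestOps(0);  // stop selecting this key while a worker owns it
          workers.submit(new Runnable() {
            public void run() {
              // send one packet's worth of data for this connection, then re-enable
              // OP_WRITE (via the selector thread) or close the connection if done
            }
          });
        }
      } catch (IOException e) {
        // log and continue; a real implementation would also handle shutdown here
      }
    }
  }
}
```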

      1. hbase-hdfs-benchmarks.ods
        23 kB
        Jay Booth
      2. hdfs-918-20100201.patch
        81 kB
        Jay Booth
      3. hdfs-918-20100203.patch
        83 kB
        Jay Booth
      4. hdfs-918-20100211.patch
        82 kB
        Jay Booth
      5. hdfs-918-20100228.patch
        73 kB
        Jay Booth
      6. hdfs-918-20100309.patch
        76 kB
        Jay Booth
      7. hdfs-918-branch20.2.patch
        91 kB
        Jay Booth
      8. hdfs-918-branch20-append.patch
        90 kB
        Jay Booth
      9. hdfs-918-pool.patch
        39 kB
        Jay Booth
      10. hdfs-918-TRUNK.patch
        81 kB
        Jay Booth
      11. hdfs-multiplex.patch
        83 kB
        Jay Booth


          Activity

          Jay Booth added a comment -

          Here's a first implementation – it works, passes TestDistributedFileSystem, TestDataTransferProtocol and TestPread. However, it has a direct dependency on FSDataset (not FSDatasetInterface) because it needs to get ahold of files directly to open FileChannels. This leads to ClassCastExceptions in all tests relying on SimulatedFSDataset. Would love to hear feedback about a way to resolve this.

          Have not benchmarked yet, I'll post another comment with an architectural description.

          Jay Booth added a comment -

          The DataTransferProtocol.PacketHeader class by Todd Lipcon was really useful for this work so I subsumed his patch into mine. If/when HDFS-881 gets committed, I'll be happy to rebase.

          Jay Booth added a comment -

          Current patch is having some issues in actual use – it seems to pass a lot of tests, but I'm having problems with ChecksumExceptions when running actual MR jobs over it, so clearly it's still a work in progress. I have a better patch that fixes some of the issues but it's still not 100%, so I'll upload it with some new tests once I resolve the remaining issues. Currently passing TestPread, TestDistributedFileSystem, TestDataTransferProtocol and TestClientBlockVerification, but still hitting issues when actually running the thing – if somebody has recommendations for other tests to debug with, that'd be very welcome.

          ARCHITECTURE

          1) On client connect to DataXCeiver server, dispatch a thread as per usual, except now the thread is extremely short-lived. It simply registers the connection with server.datanode.ReadServer and dies. (It would be more efficient to have some sort of accept loop that didn't spawn a thread here, but I went with lowest impact integration)

          2) On register of a connection, ReadServer creates a Connection object and registers the channel with a selector inside of ReadServer.ResponseWriter. ResponseWriter maintains an ArrayBlockingQueue<Connection> workQueue and polls the selector, cancelling keys and adding connections which are ready for write to the work queue. ReadServer also maintains a BlockChannelPool, which is a pool of BlockChannel objects – each BlockChannel represents the file and meta-file for a given block.

          3) A small Handler pool takes items off of this work queue and calls connection.sendPacket(buffer, channelPool). Each handler maintains a DirectByteBuffer, instantiated at startup time, which it uses for all requests.

          4) Connection.sendPacket(buffer, channelPool) consults internal state about what needs to be sent next (response headers, packet headers, checksums, bytes) and sends what it can, updating internal state variables. Uses the provided buffer and channelPool to do its work. Uses transferTo unless the config property for transferTo is disabled. Right now it actually sends 2 packets per packet (header+sums and then bytes), once I resolve all correctness bugs it may be worth combining the two into one packet for small reads.

          5) After work has been done and internal state updated (even if only 1 byte was sent), Handler re-registers the Connection with ResponseWriter for further writes, or closes it if we're done.

          Once I have this fully working, I'd expect CPU savings from fewer long-running threads and less garbage collection of buffers, perhaps a small performance boost from the select-based architecture and using DirectByteBuffers instead of HeapByteBuffers, and a slight reduction in IOWAIT time under some circumstances because we're pooling file channels rather than re-opening for every request. It should also consume far fewer xceiver threads and open file handles while running – the pool is capped, so if we start getting crazy numbers of requests, we'll close/re-open files as necessary to stay under the cap.
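
          As a concrete illustration of the per-connection state tracking described in step 4, here's a rough sketch; the class, fields and header-building stub are invented for illustration and are not taken from the attached patch.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

// Rough sketch of per-connection send state: each call sends whatever the socket
// will accept and remembers where it left off for the next OP_WRITE.
class ReadConnection {
  private final SocketChannel sock;
  private final FileChannel blockFile;   // pooled channel for the block data file
  private long blockPos;                 // next block-data byte to send
  private long blockRemaining;           // block-data bytes left in this request
  private ByteBuffer pendingHeader;      // header + checksums for the current packet

  ReadConnection(SocketChannel sock, FileChannel blockFile, long offset, long length) {
    this.sock = sock;
    this.blockFile = blockFile;
    this.blockPos = offset;
    this.blockRemaining = length;
  }

  /** Send whatever the socket will take; return true only when the whole request is done. */
  boolean sendSomePacket(int packetLen) throws IOException {
    if (pendingHeader == null) {
      pendingHeader = buildHeaderAndChecksums();     // build the next packet's header + CRCs
    }
    if (pendingHeader.hasRemaining()) {
      sock.write(pendingHeader);                     // non-blocking: may be a partial write
      if (pendingHeader.hasRemaining()) {
        return false;                                // TCP buffer full; wait for next OP_WRITE
      }
    }
    long toSend = Math.min(packetLen, blockRemaining);
    long sent = blockFile.transferTo(blockPos, toSend, sock);  // zero-copy positional send
    blockPos += sent;
    blockRemaining -= sent;
    if (blockRemaining == 0) {
      return true;                                   // last packet fully sent; close or re-pool
    }
    if (sent == toSend) {
      pendingHeader = null;                          // this packet done; next call starts a new one
    }
    return false;                                    // more to send; wait for next OP_WRITE
  }

  private ByteBuffer buildHeaderAndChecksums() {
    return ByteBuffer.allocate(0);                   // placeholder; real code fills header + sums
  }
}
```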

          INTEGRATION
          As I said, I made the DataXCeiver thread for opReadBlock register the channel and then die. This is probably the best way to go even though it's not optimal from a performance standpoint. Unfortunately, since DataXCeiver threads close their sockets when they die, I had to add a special boolean flag 'skipClose' to avoid that when op == Op.READ, which is kind of ugly – recommendations are welcome for what to do here.

          Also, as I noted earlier, the BlockChannelPool requires an instance of FSDataset to function, rather than FSDatasetInterface, because Interface doesn't supply any getFile() methods, just getInputStream() methods. Probably the best way to handle this for tests would be to have the SimulatedFSDataset write actual files to /tmp somewhere and provide handles to those files when running tests. Any thoughts from anyone?

          FUTURE
          Once I get this working, it might be worth exploring using this as a mechanism for repeated reads over the same datanode connection, which could give some pretty big performance gains to certain applications. Upon completion of a request, the ReadServer could simply put the connections in a pool that's polled every so often for isReadable() – if it's readable, read the request and re-register with the ResponseWriter.

          Along those lines, once we get there it might wind up being simpler to do the initial read-request through that mechanism as well, which would mean that we could get rid of some of the messy integration with DataXceiverServer – however, it would require opening up another port just for reads. What are people's thoughts on that? I won't make that a goal of this patch but I'd be curious as to people's thoughts regarding separate ports for read and write (and maybe op_transfer_block could be handled via IPCServer) – it could make things simpler in some ways while making them more of a pain in others.

          dhruba borthakur added a comment -

          This is good work.

          > it could make things simpler in some ways while making them more of a pain in others

          I agree. Plus it requires lots of code changes. Is it possible for you to measure how much performance benefit this approach will yield before you try to make it commit-ready?

          Jay Booth added a comment -

          I'll definitely perform lots of benchmarking before asking for more of a formal review for commit. To be clear, this patch by itself doesn't open up any additional ports/services and attempts minimal change to existing code – the only major change is in opRead on DataXCeiver, replacing creation/execution of a BlockSender with a quick call to register() on my new readserver class (which should probably be renamed to MultiplexedBlockSender or something, since it's not actually a server and doesn't expose any new services). The other changes are just the addition of 3 new classes. My newest patch is still at home and I'll upload it this weekend when I have some time; the current patch here doesn't completely work.

          Right now I expect this to yield some pretty decent savings in CPU time but maybe not in wall-clock time. I'm thinking about 2 approaches: running the datanode under time while performing a bunch of IO tasks and checking CPU usage afterwards, and measuring total throughput in TestDFSIO on an oversubscribed cluster so that we get CPU-bound and can demonstrate improvement that way. Any preference? Other suggestions? I'm a little worried that simply timing the datanode will measure things like how long I waited after start to launch a job, or background replication.. but I suppose those problems exist with any measurement.

          Also, I'm planning to add a parameter to configure usage of BlockSender vs MultiplexedBlockSender, does "dfs.datanode.multiplexReads=true" sound good?
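
          Assuming that flag name sticks, the datanode-side check would be something along these lines (illustrative only; the key is just the name proposed above and isn't in any release):

```java
import org.apache.hadoop.conf.Configuration;

public class MultiplexFlag {
  // Key name as proposed in this comment; not an existing configuration property.
  static final String MULTIPLEX_READS_KEY = "dfs.datanode.multiplexReads";

  /** False by default, so the existing BlockSender path stays the default behaviour. */
  static boolean multiplexEnabled(Configuration conf) {
    return conf.getBoolean(MULTIPLEX_READS_KEY, false);
  }
}
```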

          Todd Lipcon added a comment -

          I think a main benefit of this work will be scalability with a high number of concurrent connections.

          A good test would be to write a quick MR job that opens a few hundred DFSInputStreams from each mapper, all to the same block of the same file, and reads very slowly on them (one byte every few seconds, for example). This should be fine on actual disk IO since it will all hit the buffer cache, but will demonstrate whether this has a scalability improvement over the thread-per-reader.
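
          A sketch of the reader side of that test (the MR wrapper and any real measurement are omitted; the path, stream count and timings are placeholders):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hold a few hundred open streams to the same block and trickle-read them. The data stays
// in the buffer cache, so this exercises connection/thread scalability, not disk throughput.
public class SlowReaderTest {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path target = new Path(args[0]);          // a file whose first block all streams will hit
    int numStreams = 300;

    List<FSDataInputStream> streams = new ArrayList<FSDataInputStream>();
    for (int i = 0; i < numStreams; i++) {
      streams.add(fs.open(target));
    }
    for (int round = 0; round < 100; round++) {
      for (FSDataInputStream in : streams) {
        in.read();                            // one byte per stream per round
      }
      Thread.sleep(3000);                     // read very slowly, keeping connections open
    }
    for (FSDataInputStream in : streams) {
      in.close();
    }
    fs.close();
  }
}
```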

          Jay Booth added a comment -

          Good call Todd, that's definitely the canonical case for multiplexing, plus it should show some benefit because I recycle file handles across requests by pooling them, rather than opening a new file per request. I'll see if I can get something along those lines set up.

          dhruba borthakur added a comment -

          The CPU time needed to dynamically create a thread on demand (as is done now) might not be much different from using a thread pool, is it not? That leaves us with the question of number of threads possible in a JVM.

          Todd's idea of a performance test sounds like a fine thing to do.

          Jay Booth added a comment -

          New patch..

          • new configuration params: dfs.datanode.multiplexBlockSender=true, dfs.datanode.multiplex.packetSize=32k, dfs.datanode.multiplex.numWorkers=3
          • Packet size is tuneable, possibly allowing better performance with larger TCP buffers enabled
          • Workers only wake up when a connection is writable
          • 3 new class files, minor changes to DataXceiverServer and DataXceiver, 2 utility classes added to DataTransferProtocol (one stolen from HDFS-881)
          • Passes tests from earlier comment plus a new one for files with lengths that don't match up to checksum chunk size, as well as holding up to some load on TestDFSIO
          • Still fails all tests relying on SimulatedFSDataset
          • Has a large amount of TRACE level debugging going on in MultiplexedBlockSender in case anybody wants to watch the output
          • Adds dependencies for commons-pool and commons-math (for benchmarking code)
          • Doesn't yet have benchmarks, but those should be easy now that the configuration is all in place
          Jay Booth added a comment -

          I haven't had a chance to run benchmarks yet, but I think that under lots of connections, the thread-per-connection model will spend more time swapping compared to getting work done, plus it has a few places where the sender threads "hot block" by doing while (buff.hasRemaining()) { write() }. Only selecting the currently writeable connections and scheduling them sidesteps both issues while leaving a smaller resource footprint - assuming it delivers on the performance. As soon as I get a chance, I'll write some benchmarks.

          If anyone wants to take a look at the code in the meantime, I think this patch is pretty easy to set up – just enable MultiplexBlockSender.LOG for TRACE and run tests, and you can see how each packet is built and sent. 'ant compile eclipse-files' will set up the extra dependencies on commons-pool and commons-math.

          Jay Booth added a comment -

          New patch. Streamlined MultiplexedBlockSender: we now have one selector per worker thread and no BlockingQueues; writable connections are handled inline by each thread as they become available.

          Includes a utility class to read a file with a bunch of threads and time them.

          Ran some ad hoc jobs on my laptop and got similar performance to existing BlockSender, slightly faster for single file and slightly slower for 15 competing localhost threads.. which is exactly the opposite of what I boldly predicted. I read somewhere that linux thread scheduling for Java is disabled because it requires root, so it ignores priority – if that's the case, maybe running more threads is actually an advantage when all the readers are local and you're directly competing with them for CPU – you compete more effectively for limited resources with more threads.

          I'm gonna try and write an MR job to run some different scenarios on a cluster soon (thundering herd, steady medium, large number of idles, individual read).. I think the architecture here is more suited to large numbers of connections so if it did ok under a small number, then great. I'll be pretty busy for the next month or so but will try to get this running in a cluster at some point and report some more interesting numbers.

          stack added a comment -

          Yeah, I wouldn't rely on setPriority having an effect, at least on platforms where hadoop is commonly deployed.

          ryan rawson added a comment -

          I have done some thinking about HBase performance in relation to HDFS, and right now we are currently bottlenecking reads to a single file. By moving to a re-entrant API (pread) we are looking to unleash the parallelism. This is important I think, because we want to push as many parallel reads from our clients down into the datanode and then down into the kernel, to benefit from the IO scheduling in the kernel & hardware.

          This could mean we might expect literally dozens of parallel reads per node on a busy cluster. Perhaps even hundreds! Per node. To ensure scalability we'd probably want to get away from the xceiver model, for more than one reason... If I remember correctly, xceivers not only consume threads (hundreds of threads is OK but not ideal) but also consume epolls, and there are only so many epolls available. So I heartily approve of the direction of this JIRA!

          dhruba borthakur added a comment -

          > because we want to push as many parallel reads from our clients down into Datanode

          Which part of HBase are you referring to here? I can understand that HBase can use pread to read the contents of a file, but I am not able to understand why the same file would be read by multiple region servers at the same time.

          ryan rawson added a comment -

          The problem was that we were previously using a stateful interface because it was faster in scan tests, so we serialized reads within one RS to any given HFile. With multiple client handler threads asking for different parts of a large file, we get serialized behaviour which hurts random get performance.

          So we are moving back to pread, which means we will get more parallelism - depending on your table read pattern, of course. But I want to get even more parallelism, by preading multiple hfiles during a scan/get for example. This will just up the thread pressure on the datanode.

          Jay Booth added a comment -

          BENCHMARK UPDATE:

          For single threaded reads of 100MB, the new implementation is about the same, a hair faster, probably because I have the (now tuneable) packet size cranked up to 512kb. DFS came in around 90MB/s, my new implementation with the higher packet sizes averaged around 95MB/s. (30 runs each over a 100MB file on a machine with 7G ram, so we're probably reading from FS cache for most of those reads)

          For multithreaded reads, the new implementation seems to have some kind of synchronization chokepoint, probably in the commons-pool which I'm using to grab the file handles needed to process each packet. Commons-pool predates Java 5 so they're probably using good-ol synchronized when something based around a ReadWriteLock could be better for our needs here. I'll write something up when I get a chance.

          Numbers were: 20 threads, DFS, 5MB/s/thread – multiplexing, 1.5 MB/s/thread. In both cases, the first couple threads to start and last couple to finish ran way fast because they weren't contending for disk or network resources – then when contention started, multiplexing peaked out lower. The block fetching is the only system wide point of synchronization (aside from a very brief one to do round-robin arithmetic to pick the right Selector when adding new threads). Adjusting the number of SelectorThreads for the Multiplexing implementation didn't appreciably change results, so I have to figure it's locking on that BlockChannelPool rather than the very brief lock on the way into a given SelectorThread.

          Also, I cleaned up the code a bit and integrated with SimulatedFSDataset, so it seems all tests are passing now. I'll run a full ant test and report whether it fails.

          I'll post the newest patch later tonight or tomorrow morning along with an updated architecture description; I estimate I should get some time to throw together a more performant BlockChannelPool over the next week or two.

          Todd Lipcon added a comment -

          The 5M/sec vs 1.5M/sec for contended read seems pretty bad. We definitely need to sort that out before considering putting this in.

          Can you try running a test with 100-200 concurrent clients each reading 32K chunks with the pread API? This would simulate an HBase-like workload.
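
          A sketch of one client's share of that workload using the positional-read API (the path and iteration counts are placeholders; the real test would run 100-200 of these concurrently, e.g. as map tasks):

```java
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Random 32K preads against a large file; assumes the file is bigger than one chunk.
public class PreadClient {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path target = new Path(args[0]);
    long fileLen = fs.getFileStatus(target).getLen();

    byte[] chunk = new byte[32 * 1024];
    Random rand = new Random();
    FSDataInputStream in = fs.open(target);
    long start = System.currentTimeMillis();
    for (int i = 0; i < 2000; i++) {
      long pos = (long) (rand.nextDouble() * (fileLen - chunk.length));
      in.readFully(pos, chunk);              // positional read: no seek, safe for concurrent use
    }
    System.out.println("2000 preads of 32K took "
        + (System.currentTimeMillis() - start) + " ms");
    in.close();
    fs.close();
  }
}
```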

          ryan rawson added a comment -

          Great work, but Todd is dead on, we need to look at the few hundred but small chunk read scenario. It's currently where we hurt badly, and one of the things hopefully this patch can address. I would be happy to test this out when you are ready.

          Jay Booth added a comment -

          Absolutely – every packet send has to fetch the BlockChannel that it's transferring from out of a shared pool. Given that I'm using commons-pool for this, it's probably a straight "synchronized" on every get, which would predictably kill performance. Commons-pool is more built around the idea of a nonsharable database connection anyways, so I'll write my own custom thingy that works off of a ReadWriteLock and allows different threads to share the same BlockChannel instance (since BlockChannel only exposes threadsafe channel operations).

          If that doesn't do it, well, I'll see if I can figure it out but I agree that this is not performant enough yet. I think there's an argument for committing this with equivalent performance because it simplifies the read pipeline on the datanode side and consumes fewer resources (selectors/filechannels), but 1/3 the throughput, no way. On the upside, it means I won't be adding new dependencies with this patch anymore once I rebuild that block channel pool. (Kinda jumping the gun here on assuming the cause but it's literally the only source of systemwide synchronization, and things only slowed down once I added a bunch of threads, so...)

          I could definitely set that random read test up, once I get the slow pool issue sorted out. Assuming that the pool is the issue, this setup should save a bunch of time specifically in that scenario – no per-request buffer allocation or file opening.

          Jay Booth added a comment -

          Here's my latest patch. I replaced the dependency on commons-pool for filechannel sharing with a ReadWriteLock-based pool that only acquires a global lock to register newly opened channels or to clean up excess channels. I also have better integration with DataXceiverServer.
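
          The general shape of such a pool might look like this; the class is illustrative and BlockChannel is a stand-in for the patch's pooled block/meta file pair, not the actual implementation:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Lookups of already-open channels take only the read lock; the write lock is held
// just to open a new channel (or, in a real pool, to evict past a cap).
class BlockChannelPool {
  private final Map<Long, BlockChannel> open = new HashMap<Long, BlockChannel>();
  private final ReadWriteLock lock = new ReentrantReadWriteLock();

  BlockChannel get(long blockId) throws IOException {
    lock.readLock().lock();
    try {
      BlockChannel c = open.get(blockId);
      if (c != null) {
        return c;                             // common case: shared, no global mutual exclusion
      }
    } finally {
      lock.readLock().unlock();
    }
    lock.writeLock().lock();
    try {
      BlockChannel c = open.get(blockId);     // re-check; another thread may have opened it
      if (c == null) {
        c = BlockChannel.open(blockId);       // open block + meta files (details elided)
        open.put(blockId, c);
        // a real pool would also evict least-recently-used entries past a configured cap
      }
      return c;
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Placeholder for the pooled block/meta file pair; only thread-safe pread ops exposed. */
  static class BlockChannel {
    static BlockChannel open(long blockId) throws IOException {
      return new BlockChannel();              // would open FileChannels for block + .meta here
    }
  }
}
```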

          I haven't had time to benchmark and won't for a week or so, but I'm hoping for equal speed streaming and better performance for random read.

          ARCHITECTURE

          • DataXCeiverServer now reads the first byte of every request (OP)
          • DataXCeivers are now instantiated with an op and inputstream
          • If Op was READ_BLOCK and multiplex is enabled, dispatch thread registers with MultiplexedBlockSender and immediately dies
          • MultiplexedBlockSender.register() assigns requests across a configurable pool of SelectorThreads, round robin
          • Each SelectorThread maintains selector and buffer, continually polls for writable and writes packets, sharing resources across requests
          • Maintains current wire protocol
          • Packet based model adaptable to asynchronous disk I/O in the future
          • Possibly adaptable for local reads by writing packets to a WritableByteChannel wrapping a ByteBuffer
          • Easy to implement connection re-use and keep-alive parameter by dumping threads into a read Selector and re-registering or closing as necessary
          Zlatin Balevsky added a comment -

          I see a problem with doing the disk read on the same thread that is doing the select()-ing; the round-robining of several selector threads doesn't help you avoid a situation where a channel is writable, but the selecting thread is stuck in a transferTo call to another channel even if there are other selector threads in handlers[] available. With an architecture like this you will always perform worse than a thread-per-stream approach.

          Instead you could have a single selector thread that blocks only on select() and never does any disk io (including creation of RandomAccessFile objects). It simply dispatches the writable channels to a threadpool that does the actual transferTo calls.

          Todd Lipcon added a comment -

          Have a few comments on the code, but before I write an in depth review, I'm interested in seeing the benchmarks.

          Jay Booth added a comment -

          Yeah, I'll do my best to get benchmarks by the end of the weekend, kind of a crazy week this week and I moved this past weekend, so I don't have a ton of time. Todd, if you feel like blasting a couple stream-of-consciousness comments to me via email, go right ahead, otherwise I'll run the benchmarks this weekend and wait for the well-written version.

          Zlatin, I originally had a similar architecture to what you're describing, using a BlockingQueue to funnel the actual work to a threadpool, but I had some issues with being able to get the locking quite right, either I wasn't getting things into the queue as fast as possible, or I was burning a lot of empty cycles in the selector thread. Specifically, I can't cancel a SelectionKey and then re-register with the same selector afterwards, it leads to exceptions, so my Selector thread was spinning in a tight loop verifying that, yes, all of these writable SelectionKeys are currently enqueued for work, whenever anything was being processed. But that was a couple iterations ago, maybe I'll have better luck trying it now. What we really need is a libevent-like framework, I'll spend a little time reviewing the outward facing API for that and scratching my noggin.

          Ultimately, only so much I/O can actually happen at a time before the disk is swamped, so it might be that a set of, say, 32 selector threads gets the same performance as 1024 threads. In that case, we'd be taking up fewer resources for the same performance. At any rate, I need to benchmark before speculating further.

          Zlatin Balevsky added a comment -

          Jay,

          the selector thread is likely busylooping because select() will return immediately if any channels are writable. Cancelling takes a select() call and you cannot re-register the channel until the key has been properly cancelled and removed from the selector key sets. It is easier to turn write interest off before passing the writable channel to the threadpool. When the threadpool is done with transferTo(), pass the channel back to the select()-ing thread and instruct it to turn write interest back on. (Do not change the interest outside the selecting thread.)
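
          A minimal sketch of that hand-back pattern (class and method names invented for illustration): workers queue the key and wake the selector, and only the selecting thread ever changes interest ops.

```java
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Workers never touch interest ops directly; they queue the key and wake the selector,
// which re-arms OP_WRITE itself at the top of its loop.
class WriteInterestReArmer {
  private final Selector selector;
  private final Queue<SelectionKey> reArm = new ConcurrentLinkedQueue<SelectionKey>();

  WriteInterestReArmer(Selector selector) {
    this.selector = selector;
  }

  /** Called from a worker thread when its transferTo() for this connection has finished. */
  void requestWriteInterest(SelectionKey key) {
    reArm.add(key);
    selector.wakeup();              // drop out of select() so the selector thread sees the queue
  }

  /** Called by the selector thread at the top of its loop, before select(). */
  void reArmPending() {
    SelectionKey key;
    while ((key = reArm.poll()) != null) {
      if (key.isValid()) {
        key.interestOps(SelectionKey.OP_WRITE);   // only the selecting thread changes interest
      }
    }
  }
}
```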

          Hope this helps.

          Jay Booth added a comment -

          Thanks Zlatin, I think you're right. I'll look at finding a way to remove writable interest without cancelling the key, that could fix the busy looping issue, then I could use a condition to ensure wakeup when something is newly writable-interested (via completed packet or new request) and refactor back to a single selector thread and several executing threads. I'll make a copy of the patch and try benchmarking both methods.

          Jay Booth added a comment -

          New patch – Took Zlatin's advice and utilized selectionKey.interestOps(0) to avoid busy waits, so we're back to a single selector and an ExecutorService. The ExecutorService reuses threads if possible, destroying threads that haven't been used in 60 seconds. Analyzed logs and the selectorThread doesn't seem to busy wait ever. Buffers are now stored in threadlocals and allocated per thread (they're now HeapByteBuffers since we might have some churn and most of our transfer is using transferTo anyways). Still uses shared BlockChannelPool implemented via ReadWriteLock.
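
          For the thread-local buffers, the idea is roughly this (illustrative, not the patch code): each pooled executor thread lazily allocates one heap buffer of the configured packet size and reuses it for every packet it sends.

```java
import java.nio.ByteBuffer;

// One heap buffer per worker thread, reused across packets; the bulk block data goes
// through transferTo anyway, so the buffer only carries headers and checksums.
class PacketBuffers {
  private final int packetSize;
  private final ThreadLocal<ByteBuffer> buffers = new ThreadLocal<ByteBuffer>() {
    @Override
    protected ByteBuffer initialValue() {
      return ByteBuffer.allocate(packetSize);
    }
  };

  PacketBuffers(int packetSize) {
    this.packetSize = packetSize;
  }

  ByteBuffer get() {
    ByteBuffer b = buffers.get();
    b.clear();                      // ready for the next packet's header + checksums
    return b;
  }
}
```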

          I think this will be pretty good, will benchmark tonight.

          Jay Booth added a comment -

          New patch and better benchmarks:

          Environment:
          8x2GHz, 7GB RAM, namenode and dfs client
          8x2GHz, 7GB RAM, datanode

          Streaming:
          Single threaded: 60 runs over 100MB file, presumed in memory so network is chokepoint
          Current DFS: 92 MB/s over 60 runs
          Multiplex: 97 MB/s over 60 runs

          • Either random variation, or maybe larger packet size helps

          Multi-threaded - 32 threads reading 100MB file, 60X each
          Both around 3.25MB/s/thread, 104 MB/s aggregate
          Network saturation

          Random reads:
          The multiplexed implementation saves about 1.5 ms, probably by avoiding extra file-opens and buffer allocation.

          • 5 iterations of 2000 reads each, 32kb, front of file, singlethreaded
          • splits for current DFS: 5.3, 4.6, 5.0, 4.4, 6.4
          • splits for multiplex: 3.2, 3.0, 4.6, 3.3, 3.2
          • multithreaded concurrent read speeds on a single host converged with more threads – probably client-side delay negotiating lots of new tcp connections

          File handle consumption:
          Both "rest" at 401 open files (mostly jars)

          When doing random reads across 128 threads, BlockSender spikes to 1150 open files, opening a blockfile, metafile, selector, and socket for each concurrent connection.

          MultiplexedBlockSender only jumps to 530, with just the socket as a per-connection resource; blockfiles, metafiles and the single selector are shared.

          I'll post a comment later with an updated description of the patch, and when I get a chance, I'll run some more disk-bound benchmarks, I think the asynchronous approach will pay some dividends there by letting the operating system do more of the work.

          Super brief patch notes:
          • eliminated silly add'l dependency on commons-math, now has no new dependencies
          • incorporated Zlatin's suggestions upthread to do asynchronous I/O, 1 shared selector
          • BlockChannelPool is shared across threads
          • Buffers are threadlocal so they'll tend to be re-used rather than re-allocated

          Zlatin Balevsky added a comment -

          > Current DFS : 92MB/s over 60 runs
          > Multiplex : 97 MB/s over 60 runs
          > Either random variation, or maybe larger packet size helps

          A Student's t-test (http://en.wikipedia.org/wiki/Student's_t-test) will help you figure out if this difference is statistically significant or can be attributed to random variation. It is an essential tool when benchmarking modifications. The R project distro will make it trivial to perform.
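
          For anyone without R handy, a plain-Java Welch's t statistic over the per-run throughput samples does the same sanity check; the sample arrays below are hypothetical placeholders, and in R the whole thing is t.test(current, multiplex).

```java
// Two-sample (Welch's) t statistic over per-run MB/s samples, in plain Java.
public class TTestQuick {
  static double mean(double[] xs) {
    double s = 0;
    for (double x : xs) s += x;
    return s / xs.length;
  }

  static double variance(double[] xs, double mean) {
    double s = 0;
    for (double x : xs) s += (x - mean) * (x - mean);
    return s / (xs.length - 1);                 // sample variance
  }

  /** Welch's t statistic; |t| well above ~2 suggests the difference isn't just noise. */
  static double welchT(double[] a, double[] b) {
    double ma = mean(a), mb = mean(b);
    double va = variance(a, ma), vb = variance(b, mb);
    return (ma - mb) / Math.sqrt(va / a.length + vb / b.length);
  }

  public static void main(String[] args) {
    // hypothetical per-run MB/s samples for the two implementations
    double[] current = {91, 93, 90, 94, 92};
    double[] multiplex = {96, 98, 95, 97, 99};
    System.out.println("Welch's t = " + welchT(current, multiplex));
  }
}
```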

          dhruba borthakur added a comment -

          This patch uses nio to multiplex IO over sockets from the clients. Does this patch also do non-blocking IO to the block and block.meta files?

          Jay Booth added a comment -

          Yeah, it only uses nonblocking pread ops on the block and block.meta files.. it sends the packet header and checksums in one packet (maybe just part of the checksums if TCP buff was full), then repeatedly makes requests to send PACKET_LENGTH (default 512kb) bytes until they're sent. When I had some trace logging enabled, I could see the TCP window scale up.. first request sent 96k in a packet, then it scaled up to 512k per packet after a few.

          Here's a (simplified) breakdown of the control structures and main loop:

{noformat}
DataXCeiverServer – accepts conns, creates thread per conn
  From thread:
    read OP, blocking
    if we're a read request and multiplex enabled, delegate to MultiplexedBlockSender and die
    otherwise instantiate DataXCeiver (which now takes op as an arg) and call xceiver.run()

MultiplexedBlockSender  // maintains ExecutorService, SelectorThread and exposes public register() method
  register(Socket conn); // configures nonblocking, sets up Connection object, dispatches the first
                         // packet-send as an optimization, then puts the Future<Connection> in an
                         // inbox for the selector thread

SelectorThread  // maintains Selector, BlockingQueue<Future<Connection>> inbox, LinkedList<Future<Connection>> processingQueue
  main loop:
    1) pull all futures from inbox, add to processingQueue
    2) iterate/poll/remove Futures from processingQueue, then close/re-register those that finished
       sending a packet as appropriate (linear time, but pretty fast)
    3) select
    4) dispatch selected connections, add their Futures to processingQueue

Connection.sendPacket(BlockChannelPool, ByteBuffer)  // workhorse method, invoked via Callable
  maintains a bunch of internal state variables per connection
  fetches BlockChannel object from BlockChannelPool – BlockChannel only exposes p-read methods for underlying channels
  buffers packet header and sums, sends, records how much successfully sent – if less than 100%, return and wait for writable
  tries to send PACKET_LENGTH bytes from main file via transferTo, if less than 100%, return and wait for writable
  marks self as either FINISHED or READY, depending on if that was the last packet
{noformat}
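
To make the sendPacket() flow above concrete, here is a heavily simplified, stand-alone sketch of the same idea: write the buffered header and checksums, then transferTo the packet's data region, returning whenever the socket stops accepting bytes so the selector can wait for writability. The class and field names are illustrative only, and the header layout is a placeholder rather than the real wire format.

{code}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

// Illustrative sketch only -- simplified names and a fake header, not the patch's actual classes.
class ConnectionSketch {
  enum SendState { READY, FINISHED }

  private static final long PACKET_LENGTH = 512 * 1024;

  private final SocketChannel sock;      // non-blocking client socket
  private final FileChannel blockFile;   // pooled block file, accessed only via positioned reads
  private long blockPos;                 // next offset to send from the block file
  private long remainingInBlock;         // block bytes left to send overall
  private ByteBuffer headerAndSums;      // header + checksums for the packet in flight
  private long packetDataRemaining;      // block bytes left in the packet in flight

  ConnectionSketch(SocketChannel sock, FileChannel blockFile, long startPos, long length) {
    this.sock = sock;
    this.blockFile = blockFile;
    this.blockPos = startPos;
    this.remainingInBlock = length;
  }

  /** Send as much of the current packet as the socket will accept, then return. */
  SendState sendPacket() throws IOException {
    if (headerAndSums == null) {                     // start a new packet
      packetDataRemaining = Math.min(PACKET_LENGTH, remainingInBlock);
      headerAndSums = buildHeaderAndChecksums(blockPos, packetDataRemaining);
    }
    if (headerAndSums.hasRemaining()) {
      sock.write(headerAndSums);
      if (headerAndSums.hasRemaining()) {
        return SendState.READY;                      // TCP buffer full, wait for writable
      }
    }
    while (packetDataRemaining > 0) {
      long sent = blockFile.transferTo(blockPos, packetDataRemaining, sock);
      if (sent == 0) {
        return SendState.READY;                      // TCP buffer full, wait for writable
      }
      blockPos += sent;
      packetDataRemaining -= sent;
      remainingInBlock -= sent;
    }
    headerAndSums = null;                            // packet finished, next call builds the next one
    return remainingInBlock > 0 ? SendState.READY : SendState.FINISHED;
  }

  private ByteBuffer buildHeaderAndChecksums(long offset, long len) {
    // The real code writes the packet header and preads checksums from the .meta file;
    // this placeholder just keeps the sketch self-contained.
    ByteBuffer buf = ByteBuffer.allocate(13);
    buf.putLong(offset).putInt((int) len).put((byte) 0);
    buf.flip();
    return buf;
  }
}
{code}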

Regarding file IO, I don't know if it's faster to send the packet header as its own 13-byte packet and use transferTo for the meta file, or to do what I'm doing now and buffer them into one packet. I feel like it'll be a wash.. or at any rate a minor difference because the checksums are so much smaller than the main data.

What do people think about a test regime for this? It's a really big set of changes but it opens up a lot of doors (particularly connection re-use with that register() paradigm), seems to perform equal or better depending on the case, gets a big win on open file descriptors and factors all of the server-side protocol logic into one method, instead of spreading it out across several classes.

          I certainly understand being hesitant to commit such a big change without some pretty extensive testing, but if anyone had any direction as to what they'd like to see tested, that'd be awesome. I'm already planning on setting up some disk-bound benchmarks now that I've tested network-bound ones.. anything else that people want to see? It seems to pass all unit tests, my last run had a couple seemingly pre-existing failures but 99% of them passed. I guess I should do another full run and account for any that don't pass while I'm at it.

          dhruba borthakur added a comment -

Thanks for the description Jay. What is your feedback about doing a similar thing (as a follow-up patch) for the write code path too?

          Jay Booth added a comment -

          I'll enthusiastically cheerlead?

          More seriously, I'm willing to put in the work to get everyone to a comfort level with this patch so it gets committed in some form and we get some wins out of it, but the write path is more complicated, I don't understand it as well and I'm honestly going to need a little bit of a break over the summer. I'd love to help in whatever way I can but I'm probably not going to have the bandwidth to be the main driver of it in the short term.. I don't fully grok the write pipeline and the intricacies of append yet, so at the least someone else would have to be involved.

          ryan rawson added a comment -

          this sounds great guys. For HBase, there is more need for the read path, rather than the write path. I would love to test this for HBase, and we could postpone/delay the write path code.

          Todd Lipcon added a comment -

          +1 on ignoring write for now. This mainly benefits high concurrency random reads, let's get this working and then think about the write path.

          I can devote some time to benchmarking and reviewing this more thoroughly, but likely not until the end of this month / beginning of next.

          Raghu Angadi added a comment -

          Thanks Jay.

This is a well-known limitation of the DataNode and I think an important problem to fix. Also +1 for fixing the reads first. I just skimmed through the patch; will read it more thoroughly.

There are various performance gains mentioned in the JIRA from the thread pool even for the common case with a small number of readers.. I don't see how that is possible. Any benchmarks would be great. Thread pools do have resource benefits (threads, epoll descriptors, etc.) even in the normal case.

The way I see the problem and the solution:

• Problem: Datanode should gracefully handle thousands of simultaneous readers.

Implementation:

• Use a thread pool to serve data.
  • I think it is better to use something like Netty rather than worry about all the selector handling as in this patch.
  • I might implement a version by reusing the existing code. Will update in a couple of weeks.
• Most of the datanode installations have multiple disks (some even in double digits). I think it is very important to have separate pools for each partition. Otherwise, each disk will be accessed only as much as the slowest disk (when the DN has enough load). That would be a huge loss of available disk bandwidth in the exact scenarios this feature is supposed to help. (One possible shape for such per-volume pools is sketched below.)
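
For illustration, one possible shape for such per-volume pools, assuming the DataNode can map a block to the data directory (volume) it lives on. All names here are hypothetical; this is a sketch of the idea, not code from any posted patch.

{code}
import java.io.File;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: one fixed-size reader pool per data directory (volume). */
class PerVolumeReadPools {
  private final Map<File, ExecutorService> pools = new ConcurrentHashMap<File, ExecutorService>();

  PerVolumeReadPools(File[] volumes, int threadsPerVolume) {
    for (File vol : volumes) {
      pools.put(vol, Executors.newFixedThreadPool(threadsPerVolume));
    }
  }

  /** Submit a read task to the pool that owns the volume holding the block. */
  void submit(File volumeForBlock, Runnable readTask) {
    ExecutorService pool = pools.get(volumeForBlock);
    if (pool == null) {
      throw new IllegalArgumentException("unknown volume: " + volumeForBlock);
    }
    pool.submit(readTask); // a slow or failing disk only backs up its own queue
  }
}
{code}
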
          Todd Lipcon added a comment -

> I think it is better to use something like Netty rather than worry about all the selector handling as in this patch.

          I started some work on this last month and got a prototype working. The issue is that switching to Netty is rather wholesale - all of the existing code has to change away from using InputStream/OutputStream to use the Netty Channels. I managed to write some interfacing code to provide IS/OS interfaces around the Netty channels and got unit tests to pass, but there's a lot of inefficiency introduced by my wrappers.

          I'll continue to work on that as I have time, but I think this smaller patch is worth considering in the meantime.

          Zlatin Balevsky added a comment -

> I think it is very important to have separate pools for each partition

          +1

          Jay Booth added a comment -

> I think it is very important to have separate pools for each partition. Otherwise, each disk will be accessed only as much as the slowest disk (when DN has enough load).

          This would be the case if I were using a fixed-size thread pool and a LinkedBlockingQueue – but I'm not, see Executors.newCachedThreadPool(), it's actually bounded at Integer.MAX_VALUE threads and uses a SynchronousQueue. If a new thread is needed in order to start work on a task immediately, it's created. Otherwise, an existing waiting thread will be re-used. (Threads are purged if they've been idle for 60 seconds). Either way, the underlying I/O request is dispatched pretty much immediately after the connection is writable. So I don't see why separate pools per partition would help anything, the operating system will handle IO requests as it can and put threads into runnable state as it can regardless of which pool they're in.
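
For reference, the JDK documents Executors.newCachedThreadPool() as equivalent to the ThreadPoolExecutor configuration below, which is where the effectively unbounded thread count, SynchronousQueue handoff and 60-second idle purge come from:

{code}
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolEquivalent {
  /** What Executors.newCachedThreadPool() constructs under the hood. */
  static ThreadPoolExecutor newCachedLikePool() {
    return new ThreadPoolExecutor(
        0, Integer.MAX_VALUE,              // no core threads, effectively unbounded maximum
        60L, TimeUnit.SECONDS,             // idle threads are purged after 60 seconds
        new SynchronousQueue<Runnable>()); // direct handoff: reuse an idle thread or create a new one
  }
}
{code}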

RE: Netty, I'm not very knowledgeable about it beyond the Cliff's Notes version, but my code dealing with the Selector is pretty small – the main loop is under 75 lines, and java.util.concurrent does most of the heavy lifting. Most of the code is dealing with application and protocol specifics. So my instinct in general is that adding a framework may actually increase the amount of code, especially if there are any mismatches between what we're doing and what it wants us to do (the packet header, sums data, main data format is pretty specific to us). Plus, as Todd said, we can't really change the blocking IO nature of the main accept() loop in DataXceiverServer without this becoming a much bigger patch, although I agree that we should go there in general. That being said, better is better, so if a Netty implementation took up fewer lines of code and performed better, then that speaks for itself.

          Raghu Angadi added a comment -

          > RE: Netty, I'm not very knowledgeable about it beyond the Cliff's Notes version, but my code dealing with the Selector is pretty small - the main loop is under 75 lines, and java.util.concurrent does most of the heavy lifting

Jay, I think it is ok to ignore Netty for this JIRA. It could be re-factored later.

          >> I think it is very important to have separate pools for each partition.
          > This would be the case if I were using a fixed-size thread pool and a LinkedBlockingQueue - but I'm not, see Executors.newCachedThreadPool(),

hmm.. does it mean that if you have a thousand clients and the load is disk bound, we end up with 1000 threads?

          Jay Booth added a comment -

          >>> I think it is very important to have separate pools for each partition.
          >> This would be the case if I were using a fixed-size thread pool and a LinkedBlockingQueue - but I'm not, see Executors.newCachedThreadPool(),

          >hmm.. does it mean that if you have thousand clients and the load is disk bound, we end up with 1000 threads?

Yeah, although it'll likely turn out to be less than 1000 in practice.. If the requests are all short-lived, it could be significantly less than 1000 threads when you consider re-use; if it's 1000 long reads, it'll probably wind up being only a little less, if at all. The threads themselves are really lightweight, the only resource attached to them is a ThreadLocal<ByteBuffer(8096)>. (8k seemed ok for the ByteBuffer because the header+checksums portion is always significantly less than that, and the main block file transfers are done using transferTo).

I chose this approach after initially experimenting with a fixed-size threadpool and LinkedBlockingQueue because the handoff is faster and every pending IO request is guaranteed to become an actual disk-read syscall waiting on the operating system as fast as possible. This way, the operating system decides which disk request to fulfill first, taking advantage of the lower-level optimizations around disk IO. Since the threads are pretty lightweight and the lower-level calls do a better job of optimal fulfillment, I think this will work better than a fixed-size threadpool, where, for example, 2 adjacent reads from separate threads could be separated from each other in time whereas the disk controller might fulfill both simultaneously and faster. This becomes even more important, I think, with the higher 512kb packet size – those are big chunks of work per syscall that can be optimized by the underlying OS. Regarding the extra resource allocation for the threads – if we're disk-bound, then generally speaking a few extra memory resources shouldn't be a huge deal – the gains from dispatching more disk requests in parallel should outweigh the memory allocation and context switch costs.

The above is all in theory – I haven't benchmarked parallel implementations head-to-head. But certainly for random reads, and likely for longer reads, this approach should get the syscall invoked as fast as possible. Switching between the two models would be pretty simple: just change the parameters we pass to the constructor for new ThreadPoolExecutor().
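
As a side note, the per-thread buffer mentioned above is the standard ThreadLocal pattern; a minimal sketch follows (the class name and buffer size are illustrative, not lifted from the patch):

{code}
import java.nio.ByteBuffer;

class PacketBuffers {
  // Each pool thread lazily gets its own buffer for header + checksums and reuses it across
  // packets; block data itself goes through transferTo and never touches this buffer.
  private static final ThreadLocal<ByteBuffer> HEADER_AND_SUMS = new ThreadLocal<ByteBuffer>() {
    @Override
    protected ByteBuffer initialValue() {
      return ByteBuffer.allocate(8 * 1024); // size is illustrative
    }
  };

  static ByteBuffer get() {
    ByteBuffer buf = HEADER_AND_SUMS.get();
    buf.clear(); // reset position/limit before reuse
    return buf;
  }
}
{code}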

          Zlatin Balevsky added a comment -

          The max and current sizes of the threadpool really should be exported as metrics if the unlimited model is used.
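
ThreadPoolExecutor already exposes the numbers in question, so exporting them would mostly be plumbing into the DataNode metrics. A sketch of the relevant getters (the actual metrics registration is omitted and would depend on the metrics framework in use):

{code}
import java.util.concurrent.ThreadPoolExecutor;

class PoolGauges {
  private final ThreadPoolExecutor executor;

  PoolGauges(ThreadPoolExecutor executor) {
    this.executor = executor;
  }

  int currentThreads()  { return executor.getPoolSize(); }        // threads currently in the pool
  int activeThreads()   { return executor.getActiveCount(); }     // threads actively running tasks
  int peakThreads()     { return executor.getLargestPoolSize(); } // high-water mark since startup
  long completedTasks() { return executor.getCompletedTaskCount(); }
}
{code}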

          Jay Booth added a comment -

          0.20.2 compatible patch!

          A couple people mentioned that it would be much easier for them to benchmark if I produced an 0.20.2 compatible patch. So here it is, it works, seems to pass all unit tests that I ran on it, and I even did a hadoop fs -put and hadoop fs -cat. But that's the entire extent of the testing, unit tests and a super-simple pseudodistributed operation.

          So anyone who wants to try this on some I/O bound jobs on a test 0.20.2 cluster and see if they have speedups, please feel free and report results.

          Jay Booth added a comment -

          I heard all the cool kids are running HDFS-200 and HDFS-826 on their 0.20.2 installations these days, so I merged HDFS-918 with them.

Also, nobody use the existing 0.20.2 patch – I'll delete it now and post a new one tonight; it happens to be missing a very important Thread.start() invocation.

          Jay Booth added a comment -

          Cleaned up a bug in the BlockChannelPool.cleanup() code, added new unit test, improved descriptions of new config values (useMultiplex, packetSize, maxOpenBlockChannels, minOpenBlockchannels (number to cleanup() to)).

          This patch is for branch 20, I'll post a new one against trunk tonight.

          Andrew Purtell added a comment -

          I applied hdfs-918-branch20.2.patch to vanilla 0.20.2 and built a new version with 'ant tar', then substituted the resulting Hadoop core and test jars for those bundled with HBase 0.20.3, and built a new version of that with 'ant tar', then built new AMIs using a well tested process that normally produces working HBase+Hadoop systems. HDFS appears to initialize fine (I see registration messages in the NN and DN logs) but the DFSClient in the HBase master cannot bootstrap:

          2010-03-30 22:31:17,690 INFO org.apache.hadoop.hdfs.DFSClient: Could not obtain block blk_3225003771095476151_1021 from any node: java.io.IOException: No live nodes contain current block
          2010-03-30 22:33:20,698 INFO org.apache.hadoop.hdfs.DFSClient: Could not obtain block blk_3225003771095476151_1021 from any node: java.io.IOException: No live nodes contain current block
          ...

          The EC2 stuff runs at INFO normally because this is about benchmarking, but I can switch to DEBUG and provide logs if it might be useful.

          Jay Booth added a comment -

Weird. We've been running an almost-the-same version of the patch on our dev cluster for a week and this version passed TestPRead and TestDataTransferProtocol.. admittedly this isn't the exact version we ran on our cluster, so there could be a difference, but it passes tests; I'm a little stymied.

          There weren't any exceptions or anything in the datanode log? That error will typically happen when it tries and fails to read the block from where it should be, so hopefully there will be some errors in the DN log.

          Jay Booth added a comment -

Straightened out the block-not-found thing with Andrew (that was on his end), but then he found a resource leak that's fixed here – I'll post a trunk patch which incorporates this fix and the previous fix shortly.

          Jay Booth added a comment -

          Trunk patch with previous fixes.

          Andrew Purtell added a comment -

          Interested in another test with a 0.20 branch patch Jay?

          Jay Booth added a comment -

          Am I ever, this one should be good to go but I'm on my way out right now and won't be around to help if it breaks again. If you wanna give it a whirl, be my guest

          Andrew Purtell added a comment -

          Same result as before with the patch "hdfs-918-branch20.2.patch 2010-04-01 03:36 AM". All logs here: http://hbase.s3.amazonaws.com/0.20.3-918-201004020111424808000-randomWrite-1.tar.bz2

          Andrew Purtell added a comment -

          Sorry, test which contains HBase regionserver abort (due to failure to roll HLog) is here: http://hbase.s3.amazonaws.com/0.20.3-918-201004020111424808000-sequentialWrite-1.tar.bz2 . But DN logs are similar for both tests.

          Jay Booth added a comment -

          Well gosh darn.. maybe I should just borrow your scripts and try to run it a few times until I fix the problem. Don't suppose you have any lsof output?

          Jay Booth added a comment -

          Managed to get back to this.

          Rebased on branch-20-append.

          Fixed resource leak issue that apurtell identified.

          Runs through HBase PerformanceEvaluation on my workstation completely with default ulimit of 1024, no crashes.

          I'm going to try and benchmark this on a real cluster this weekend and report results. Happy Friday everyone

          Jay Booth added a comment -

Benchmarked on EC2 this weekend. I set up 0.20.2-append clean, a copy with my multiplex patch applied, and a third copy which only ports filechannel pooling to the current architecture (can submit that patch later, it's at home).

          All runs were with HBase block caching disabled to highlight the difference in filesystem access speeds.

This is running across a decently small dataset (a little less than 1GB), so all files are presumably in memory for the majority of test duration.

          Run involved 6 clients reading 1,000,000 rows each divided over 10 mappers. Cluster setup was 3x EC2 High-CPU XL, 1 NN/JT/ZK/Master and 2x DN/TT/RS. Ran in 3 batches of 3 runs each. Cluster was restarted in between each batch for each run type because we're changing DN implementation.

          Topline numbers (rest are in document):

Total Run Averages (times in ms)

Test         clean          pool           multiplex
random       21159050.44    19448216.89    16806247
scan         436106.89      442452.54      443262.56
sequential   19298239.78    17871047.67    14987028.44

          Pool is 7.5% gain, multiplex is more like 20% for random reads

Only batches 2+3 (batch 1 was a little messed up and doesn't track with the others):

Test         clean          pool           multiplex
random       20555308.67    18425017       16987643.33
scan         426849         427277.98      448031
sequential   18665323.67    16969885.83    15102404

          Pool is 10% gain, multiplex is 17% or so for random reads

          Per row for random read (batches 2+3 only):
          clean: 3.42ms
          pool: 3.07ms
          multiplex: 2.83ms

          Jay Booth added a comment -

          Here's the patch with just the filechannel pool improvements. This one showed some decent performance improvements while incorporating existing logic regarding the bits and bytes of block reading – memoized block and all of that. The multiplexed implementation is currently missing memoized block.

          Would people be interested in another JIRA focusing on just the pooling improvements (.5ms per read or so)? I could forward-port that to trunk and then continue work on the multiplexing implementation which did seem to provide an additional performance boost.

          Jean-Daniel Cryans added a comment -

          I tested the latest full patch (not the pool one) and didn't see any issue, although it was on a single machine and with block cache ON (but lots of evictions). I need to do some more proper testing on my test cluster.

> Would people be interested in another JIRA focusing on just the pooling improvements

          I am, and I think that having smaller patches will help getting this committed.

          Edward Capriolo added a comment -

Fundamentally, using selectors is more efficient than the one-x-per-request model. It is not only the random-read case of HBase that runs into DataXCeiver issues. For example, Hive supports dynamic partitioning: a single query may output to several thousand partitions, and I have had to raise DataXCeivers and /etc/security/limits.conf to account for this. (Very frustrating to still be upping ulimits in the 21st century.)

Also, with solid-state-drive technology becoming a bigger part of the datacenter, the assumption that opening a socket per request is acceptable (because disk reads will be the bottleneck long before the number of sockets on a system is) may not be correct for much longer.

It looks to be a win for many use cases and should not noticeably affect standard map-reduce use cases. What do we have to do to get this patch to +1?

          stack added a comment -

          I applied the patch on 0.20-append branch. It applies cleanly still. Brought it up on an already loaded cluster and it seems to basically work. Running more tests now.

          Todd Lipcon added a comment -

          Just took a swing through this patch. A few thoughts:

• Ed: this is only on the read side, so the number of xceivers for outputting thousands of partitions on a single DN won't be helped here. This patch helps the situation where multiple readers access the same file concurrently, but currently open it separately for each accessor.
          • Jay: I think the approach of splitting into smaller patches is good. The hdfs-918-pool.patch is pretty manageable, but it seems it could trivially be split in two - the first half just refactoring BlockSender to use the new ReadChannel interface, and the second half actually introducing the pool.

          Regarding thoughts on the channel pooling patch itself:

• Did you do any experimentation with just adding a small LRU cache of the DataChecksum info for each block? I'm curious what percent of the latency improvements is due to avoiding open/close vs avoiding the extra "read" of the checksum header at the top of the meta file. (A sketch of one possible cache follows this list.)
• The pool holds its write lock while calling open() on blocks. It seems this lock only needs to be per-block-id rather than across the whole pool. My worry is that, if one disk is heavily loaded or broken in some way, the open() call could stall for several seconds, and then all other reads, even on other volumes, could be held up.
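
A cache along the lines of the first bullet could be as small as an access-ordered LinkedHashMap. This is only a sketch of the idea, with placeholder key/value types rather than the patch's classes, and it leaves the eviction size and any interaction with the channel pool as open questions:

{code}
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: bounded LRU cache of per-block checksum header info, keyed by block id. */
class ChecksumHeaderCache {
  private final Map<Long, byte[]> cache;

  ChecksumHeaderCache(final int maxEntries) {
    // Access-ordered LinkedHashMap evicts the least recently used entry once maxEntries is exceeded.
    this.cache = Collections.synchronizedMap(
        new LinkedHashMap<Long, byte[]>(16, 0.75f, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<Long, byte[]> eldest) {
            return size() > maxEntries;
          }
        });
  }

  byte[] get(long blockId)              { return cache.get(blockId); }
  void put(long blockId, byte[] header) { cache.put(blockId, header); }
}
{code}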

          But, generally, I think this is promising! I think we could realistically get this in for 0.23.

          Todd Lipcon added a comment -

          Another thought: it should be trivial to enable/disable the pooling behavior in BlockSender, right? If so we could treat this feature as "experimental" (off by default but easy to switch on) for one or two releases if not everyone feels comfortable with it.

          stack added a comment -

          I ran unit tests and the main difference was failure of '[junit] Test org.apache.hadoop.hdfs.TestFileConcurrentReader FAILED' (There seem to be a few failing tests already on this append branch).

          @Todd When you say we could get it into 0.23, you thinking all of it or just some pieces of this patch?

          Todd Lipcon added a comment -

> When you say we could get it into 0.23, you thinking all of it or just some pieces of this patch?

I only looked at the ReadChannel pool bit so far. Let's do one thing at a time. If we get that in, no reason we can't move on to the next bit. Like I said above, for the more "scary" changes, I think it's important that we give some config flag to enable/disable the new behavior where we can.

          stack added a comment -

          I'm game for enable/disable flag.

          I did some basic load testing over last few days. It holds up fine seemingly. I'll keep running loadings.

          stack added a comment -

          Ran some read/write loading over a ten node cluster and all seems to work basically. Did a bit of log splunking. Nothing untoward.

          Jay Booth added a comment -

          Hey all, sorry for the slow response, been swamped with the new year and all.

          RE: unit tests, at one point it was passing all tests, not sure if the tests changed or this changed but I can take a look at it.

          RE: 0.23, I can look at forward porting this again, but a lot of changes have gone in since then.

          @stack, were you testing the "only pooling" patch or the "with full multiplexing" patch?

          "Only pooling" would be much simpler to forward port, although I do think that the full multiplexing patch is pretty worthwhile. Aside from the small-but-significant performance gain, it was IMO much better factoring to have the DN-side logic all encapsulated in a Connection object which has sendPacket() repeatedly called, rather than a giant procedural loop that goes down and back up through several classes. The architecture also made keepalive pretty straightforward.. just throw that connection back into a listening pool when done, and make corresponding changes on client side. But, I guess that logic's been revised now anyways, so it'd be a significant piece of work to bring it all back up to date.

          stack added a comment -

I was testing the full patch. Seemed fine. (I like the sound of an easy-to-implement keepalive).

          Todd Lipcon added a comment -

I rebased just the pooling part of the patch and posted it at HDFS-1323.

          LiuLei added a comment -

          Hi everyone,

What is the state of this issue? Can I apply hdfs-918-branch20.2.patch to Hadoop 1.0?


People

• Assignee: Jay Booth
• Reporter: Jay Booth
• Votes: 2
• Watchers: 40