Hadoop HDFS / HDFS-223

Asynchronous IO Handling in Hadoop and HDFS

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      I think Hadoop needs utilities or a framework to make it simpler to deal with generic asynchronous IO.

      Example use case :

      It's been a long-standing problem that the DataNode takes too many threads for data transfers. Each write operation takes up 2 threads at each of the datanodes, and each read operation takes one, irrespective of how much activity is on the sockets. The kinds of load that HDFS serves have been expanding quite fast, and HDFS should handle these varied loads better. If there were a framework for non-blocking IO, the read and write pipeline state machines could be implemented with async events on a fixed number of threads.

      A generic utility is better since it could be used in other places like DFSClient. DFSClient currently creates 2 extra threads for each file it has open for writing.

      Initially I started writing a primitive "selector", then tried to see if such a facility already exists. Apache MINA seemed to do exactly this. My impression after looking at the interface and examples is that it does not give the kind of control we might prefer or need. The first use case I was thinking of implementing using MINA was to replace the "response handlers" in DataNode. The response handlers are simpler since they don't involve disk I/O. I asked on the MINA user list, but it looks like it can not be done, I think mainly because the sockets are already created.

      Essentially what I have in mind is similar to MINA, except that the read and write of the sockets is done by the event handlers. The lowest layer essentially invokes selectors and invokes event handlers on a single thread or on multiple threads. Each event handler is expected to do some non-blocking work. We would of course have utility handler implementations that do read, write, accept, etc., that are useful for simple processing.
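
      As a rough illustration of the kind of lowest layer described above (this is only a sketch, not code from any attached patch; the EventHandler and EventLoop names are hypothetical), a minimal dispatch loop over a plain java.nio Selector might look like this:

      import java.io.IOException;
      import java.nio.channels.SelectionKey;
      import java.nio.channels.Selector;
      import java.util.Iterator;

      // Hypothetical handler contract: each handler does a small piece of
      // non-blocking work when its channel becomes ready.
      interface EventHandler {
        void handle(SelectionKey key) throws IOException;
      }

      // Minimal dispatch loop: the lowest layer just drives the Selector and
      // invokes whatever handler was attached when the channel was registered.
      class EventLoop implements Runnable {
        private final Selector selector;

        EventLoop(Selector selector) {
          this.selector = selector;
        }

        public void run() {
          try {
            while (!Thread.currentThread().isInterrupted()) {
              selector.select();
              Iterator<SelectionKey> it = selector.selectedKeys().iterator();
              while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isValid()) {
                  ((EventHandler) key.attachment()).handle(key);
                }
              }
            }
          } catch (IOException e) {
            // A real implementation would report this to its owner and shut down cleanly.
          }
        }
      }

      Several such loops could be run on a fixed thread pool, which is what lets a bounded number of threads serve an arbitrary number of sockets.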

      Sam Pullara mentioned that xSockets is more flexible. It is under GPL.

      Are there other such implementations we should look at?

      Attachments

      1. GrizzlyEchoServer.patch (5 kB) - Raghu Angadi
      2. MinaEchoServer.patch (43 kB) - Raghu Angadi


          Activity

          Ankur added a comment -

          You can extend NIOSession to wrap the non-blocking sockets. The wrapper can then be added to a NIOProcessor that can be run via an executor service. However, taking a look at the trunk code, NIOProcessor seems to be marked final, so it looks like it can't be extended to modify/add behaviour. Instead, one can extend AbstractPollingIOProcessor to implement a custom NIOProcessor that takes care of doing non-blocking work on NIOSession.

          It doesn't look like there are other such libraries with an Apache-compatible license, so we are left with little choice.
          Apache MINA looks more complex to me than using Java NIO directly, and we might end up writing more code than ideal to work around its various limitations, which would ultimately make our code unnecessarily complex. This would offset the benefit of using an external NIO library.

          Unless we have a better option, I would suggest having something custom written that leaves all the flexibility and control with us.

          steve_l added a comment -

          There is always the possibility of extending MINA and getting those changes back into its codebase.

          Ankur added a comment -

          I would like to share more ideas on this one and possibly contribute.

          Raghu Angadi added a comment -

          Thanks Ankur.

          Until yesterday I was thinking of a custom implementation, say a leaner MINA. I actually started writing code. But it might just be better and faster to take the parts we want from MINA and make the necessary changes. I will talk to Owen today about this approach.

          Some important changes :

          • Threading model : MINA's division of work is from the pre-devpoll era, I think. It divides the connections, rather than individual pieces of work, among the threads, since that divides the polling cost as well. But the polling cost is now virtually zero. Also, since some of our handlers do disk I/O, it is better not to tie one connection to one worker thread. In DataNode's case we want to have a set of threads per disk.
          • We need another layer below IoSession. MINA's handlers deal with data already read or written by MINA's core. We will have another layer in between where another type of handler can handle events like readReady(), writeReady(), acceptReady(), etc. (a sketch follows at the end of this comment).
          • Users can register existing sockets of course.

          If this sounds good I will prepare an initial version and we can go from there. I didn't look at xSockets since I didn't want it to influence my implementation (I am not sure of the GPL implications).
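
          Purely as an illustration of the lower layer described in the second bullet above (the interface name and signatures are assumptions, not code from any attached patch), such a handler might be as small as:

          import java.io.IOException;
          import java.nio.channels.SelectionKey;

          // Hypothetical lower-level handler: unlike MINA's IoHandler, it is invoked
          // on raw readiness events and performs the socket read/write itself.
          interface ChannelEventHandler {
            // Channel has data available; the handler performs the non-blocking read.
            void readReady(SelectionKey key) throws IOException;

            // Channel can accept more output; the handler performs the non-blocking write.
            void writeReady(SelectionKey key) throws IOException;

            // Server channel has a pending connection; the handler performs the accept.
            void acceptReady(SelectionKey key) throws IOException;
          }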

          Chad Walters added a comment -

          There has been some discussion on the Thrift list about better async IO support in both C++ and Java. I believe some work was done on an NIO-based implementation, although I am not sure where that has ended up. Perhaps you'd stir up some interest if you sent a note to the Thrift mailing list.

          dhruba borthakur added a comment -

          +1 on getting async IO.

          Do folks believe that this will improve IO latency? Can DFSIOtest be used to demonstrate performance improvement?

          Raghu Angadi added a comment - edited

          > Do folks believe that this will improve IO latency? Can DFSIOtest be used to demonstrate performance improvement?

          I don't think it improves the performance of the benchmarks (our benchmarks are not resource intensive). The main motivation is to use fewer resources to do the same work and to handle a very large number of clients gracefully. The requirement w.r.t. performance should be to show no regressions in the benchmarks.

          And the motivation to have a framework rather than an individual implementation (i.e. the one in Hadoop RPC) is that we have different types of cases (with and without disk I/O, etc.) in multiple places in Hadoop.

          Raghu Angadi added a comment -

          In summary, async at the user level cannot improve latency. Maybe in the future, if the OS and JVM support some real async disk I/O, performance could improve.

          Ankur added a comment -

          > Threading model ...

          > Users can register existing sockets of course ...

          The basic issue is with IOAcceptor & IOConnector. Even though we can configure their threading model to use more IOProcessor threads, allowing 1 processor thread to serve I/O on more than 1 connection, we still cannot use an IOProcessor standalone since it is internal to the acceptor/connector.
          I think we would need to extend an existing IOProcessor or implement our own so that we can run it standalone and configure its threading model to use more than 1 thread.

          > We need another layer below IoSession....

          I don't think this is required. We can insert IOFilters to do any sort of pre-processing on data already read by the MINA core. Also, read, write and other application-specific events can be implemented via the MINA state machine: http://mina.apache.org/introduction-to-mina-statemachine.html. But there is no official release of it yet. One has to get it from trunk.

          dhruba borthakur added a comment -

          Since this is a very core piece of the datanode, it would be nice if the new code can be developed so that it can be switched on/off on demand.

          Raghu Angadi added a comment - edited

          net/MinaEchoServer.java in the attached patch is a demo for something Hadoop can use initially. Here the channel is created by user code but I/O is handled by Mina.

          The 5 files in net/mina are essentially the same as the files with the same names under org.apache.mina.transport.socket.nio. They have very minor modifications (search for "HADOOP" in the files).

          For this hack I had to modify the files rather than extend them, since these are either marked 'final' or not public. I am not sure of the policy behind the public and package-private classes in MINA. Some very useful things are public and some are not.

          I think MINA is more of a 'server framework' than an 'NIO framework'... it is so close to being both.

          The patch demos one of the features Hadoop would like. Though this is a hack, we can use it now to cut the number of threads used while writing data to HDFS by half (both at the client and at the datanode).

          To run the patch, copy mina-core-2.0.0-M2.jar from mina-2.0.0-M2.tar.gz to trunk/lib. Then apply the patch, and run 'ant' and bin/hadoop org.apache.hadoop.net.MinaEchoServer.

          Ankur, I am still wondering about the next steps. There does not seem to be much interest on the MINA side, so it might be better to make the changes first and then persuade them. But Hadoop might prefer to get the features into MINA first and then start using them here. It could be a bit of a stalemate. Also, I don't see much enthusiasm or priority for this on the Hadoop side... maybe the user base affected by the change needs to expand more, I am not sure. Meanwhile, things like HADOOP-3859 keep things going for a little longer.

          Edit : minor

          Raghu Angadi added a comment -

          I think I like Grizzly better. The attached GrizzlyEchoServer.patch implements a simple echo server. The example deliberately avoids using the available 'ReadFilter' and 'EchoFilter'.

          Some of the features (these could be good or bad from different perspectives) :

          • 'NIO Framework' is a separate part of Grizzly.
          • It lets the user handle I/O at different levels.
          • While dealing with low-level IO, the user of course needs to be more careful. For example, the attached echo server needs to enable "OP_READ" in every call to 'execute()'. The documentation does not seem extensive, but support might be pretty good.
          • Pretty much everything is public. If something is not public, then it is mostly protected.

          I need to check how to control the threads used by the framework. Sometimes we might want all the work to be done on just one thread.

          To run the example: drop the grizzly-framework jar in trunk/lib, apply the patch under 'trunk', compile, and run 'bin/hadoop org.apache.hadoop.net.GrizzlyEchoServer'.

          Ankur added a comment -

          Thanks Raghu for providing something so quickly. I will try out the GrizzlyEchoServer patch and let you know how I feel about this.

          Ankur added a comment -

          I read a bit about the Grizzly framework and tried the attached patch. Grizzly definitely looks simpler and more flexible compared to MINA. It lets you implement custom protocol handlers with ease and in a way that's easy to read and understand.

          The thing that I especially liked is its simple architecture and the fact that it does not hide the underlying NIO objects, thus providing more control and flexibility.

          It appears to be a lightweight NIO framework and their reported performance comparisons with other NIO frameworks look encouraging.

          The only flip side I see is the lack of documentation, but that shouldn't pose too much of a problem considering that the code seems to have decent javadoc comments.

          The license of our choice would be CDDL, which is compatible with the Apache License, I think. So even that's a non-issue.

          Looks like a good candidate to me to get started.

          If folks agree, then the next step I guess would be to cook up a patch replacing read/write in the datanode with async I/O using Grizzly.

          Raghu Angadi added a comment - edited

          > I guess would be to cook up a patch replacing read/write in datanode with Async I/O using Grizzly.
          I agree. Before replacing all of the datanode transfers, we could do one or both of the following :

          • Replace reads. This will help many users, including HBase, and will be a good test case without affecting the more critical write path. Bugs in the read pipeline don't corrupt HDFS data.
          • Replace the "responder thread" while writing. I think for this, we might need to find out how to make Grizzly not close the socket.
          Ankur added a comment -

          Sounds good! I'll have a patch coming soon with the suggested modifications.

          Ankur added a comment -

          Looks like work is going to keep me busy this week, so I might not be able to come up with the patch until next week.

          Raghu, if you think you can do a patch before next week, please feel free to go ahead.

          Meng Mao added a comment -

          Any news on this front?

          Our team has a vested interest in an alternate solution to the current DFS data write implementation. Couching my argument in a very naive perspective:
          Our code worked just fine in 0.15, with no apparent inability to scale, even on very large test data sets. We've been stuck trying to get those same tests to work in 0.18 for ages, and at least are now aware of the design changes that are causing failures. What it boils down to for us is that if Hadoop sticks with the 2-threads-per-write-operation model, then our code will simply outstrip machine resources when we try to scale. So our course of action would have to be to review each mapreduce operation in our pipeline and consider whether it might incur a high number of threads.

          Conversely, it'd be much easier for us to consume a version of Hadoop where #threads doesn't grow with #write operations, or at least not in a linear fashion. We'd be happy to help try out any patches toward this, including those posted in this issue. It's a matter of whether they're ready to go, I guess.

          Ankur added a comment -

          The kind of scalability issues reported with the current datanode write approach make this issue a top priority. I think it's high time work started on this. I am trying to see if I can find some time outside of my work commitments to work on this. I haven't got a chance yet to play a bit more with Grizzly.

          Doug Cutting added a comment -

          The first thing I'd try is to see if one can move DFS data access to use Hadoop's RPC. Hadoop RPC servers already use async IO, and it would be simple to add async support to the RPC client. For security and other reasons, Hadoop should not add another networking stack; we should rather consolidate on a single stack. Eventually we may move everything to a different RPC system, but until then we should try to use a single system everywhere and make that move all at once.

          Raghu Angadi added a comment -

          Another option that might be simpler : take the async network I/O handling that is currently written specifically for RPC, pull it out, and enhance/modify it so that it works for RPC, data transfers, clients, disk I/O, etc. A kind of bare-bones NIO framework.

          Doug Cutting added a comment -

          > to take async network I/O handling that is written specifically for RPC out and enhance/modify it [ ... ]

          Much of what RPC adds is also needed for data transfer: authentication, authorization, protocol versioning, etc. So the question is, does RPC have overheads that are unacceptable for data transfer? We might model data transfer as exchanges of, e.g., ~100kB buffer objects. A key concern would be extra buffer copies. We'd perhaps need to add support for a special kind of RPC parameter which is written as a part of the response. Then the client could pass down a buffer or stream to read the response data into.
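
          For illustration only (the interface and method below are hypothetical, not an existing Hadoop protocol), such a "buffer-by-buffer" call with an output parameter might be declared like:

          import java.io.IOException;
          import java.nio.ByteBuffer;

          interface BlockReadProtocol {
            // Hypothetical buffer-by-buffer read: the caller passes down the buffer that
            // the response data is written into, so the RPC layer can avoid an extra copy.
            int readChunk(long blockId, long offset, ByteBuffer out) throws IOException;
          }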

          Raghu Angadi added a comment -

          Right. Data transfers could (eventually) be handled by RPCs for the reasons you mentioned. To do that we need :

          1. new features in RPC ("streaming" RPCs, optionally avoiding copies...) and
          2. new features in async I/O.

          You are suggesting it is probably better to do both here rather than doing 2 first and then 1. Implementing both might be better, depending on time requirements and on what stage the discussion of replacing Hadoop RPC with another one is at.

          Raghu Angadi added a comment -

          Implementing both is something we should certainly consider as the first option.

          Doug Cutting added a comment -

          I'm not sure we need "streaming" RPCs or whether buffer-by-buffer calls are sufficient. My hunch would be to try buffer-by-buffer calls first. Support for output parameters is probably required to avoid extra buffer copies, but one could first try it without even that to see if we're in the ballpark.

          As for async, I don't follow your question. It would seem simpler to me to add an async call pattern to RPC than to build a new async stack. Perhaps we could signal it in a protocol by declaring a method that returns a SelectionKey. Such methods would return as soon as their parameters are written, and the return value could be used to listen for the response.

          None of the RPC systems we might switch to today yet support such features, to my knowledge. So, we could

          • wait until they do and we've switched to that RPC system.
          • add these today to our RPC system, start using it today, and then port what we've done to a different RPC system when we switch if needed
          • add these features ourselves to a different RPC system now, then port HDFS to use that system

          The second of these appeals to me, since I think we'll both learn more about what features we need and solve our immediate needs sooner. Much of the work would probably not be wasted, since I suspect that whatever RPC system we use long-term we will need to implement some low-level transport features, since our requirements are more extreme and specific than most folks.

          Raghu Angadi added a comment -

          unassigned since I am actively working on this.

          Raghu Angadi added a comment -

          I meant to say '... NOT actively working on this'.

          dhruba borthakur added a comment -

          One option that comes to my mind is to have async RPC calls where the caller does not need to wait for a response. From all other programming angles, it looks like a regular RPC call, except that there is no guarantee whether the data actually made it to the other side or not. The difference is that the caller does not incur the cost of a round-trip time to the server.

          The client can make these async RPC calls to transfer large amounts of data. The Data Transfer protocol already requires the receiver to send acks (which can be sent as async RPCs as well) back to the client.

          Raghu Angadi added a comment -

          These are all good options for fixing data transfer threads on Datanode. The options are :

          • (a) lower level : provide an async IO framework that can be used by Datanode, Clients (and even RPC).
          • (b) higher level : enhance RPC to support data transfer requirements, especially w.r.t performance.

          The main focus of this Jira is (a). Should I file another jira for (b) and link it here? These two have some overlap, but the approach, side benefits, and application-level changes required are quite different. If done well, RPC might actually be built over (a).

          I think it might be better to let these two evolve separately in different jiras. Of course only one of them might get into Hadoop.

          Doug Cutting added a comment -

          I think before we can commit to an async strategy for HDFS we'll need to perform some experiments. And before we commit async RPCs to trunk they ought to be used, to test their adequacy. So, it could be done as two separate issues, but they're best developed in parallel. For example, there may be no point in committing async RPC extensions if buffer-by-buffer access proves impractical in HDFS. We don't need features that are not used. So I think this could be done in a single Jira, or two that are closely coordinated.

          > async RPC calls where the caller does not need to wait for a response

          Yes, I agree. My suggestion above was that we might model this in an interface by declaring methods with a particular return type. On further thought, that wouldn't work, since that method could not be implemented server-side. But it would sure be nice if one didn't have to use meta-programming (e.g., RPC.asyncCall(Class.getMethod(...))) but could instead directly invoke async methods. So perhaps one could declare pairs of methods, like:

          interface FooProtocol {
            Foo getFoo();
            SelectionKey getFooAsync();
          }

          The RPC runtime would match methods whose name ends with "Async" and whose return type is SelectionKey with a method of the same name w/o "Async" and a different return type. The client could call getFooAsync() to get a SelectionKey, the server would call getFoo(), and the client would cast the result from the server to a Foo. The ugly part is that the implementation on the server would have to provide some definition of getFooAsync() in order to compile, but it would never actually be called. Perhaps to avoid this we could add a client-specific interface:

          interface FooProtocol {
            Foo getFoo();
          }
          interface FooClient extends FooProtocol {
            SelectionKey getFooAsync();
          }
          

          Then pass both classes to RPC#getProxy(). It would return a FooClient, but use FooProtocol to talk to the server. The server would only implement FooProtocol. Could that work?
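
          As an illustration only (this is not Hadoop's actual RPC#getProxy; the AsyncAwareInvoker class and its Transport interface are hypothetical stand-ins for the real client), a java.lang.reflect.Proxy-based client could match the "Async"-suffixed methods roughly like this:

          import java.lang.reflect.InvocationHandler;
          import java.lang.reflect.Method;
          import java.lang.reflect.Proxy;
          import java.nio.channels.SelectionKey;

          // Hypothetical invocation handler: methods ending in "Async" that return
          // SelectionKey are sent without waiting for the reply; all other methods
          // are forwarded as ordinary blocking calls.
          class AsyncAwareInvoker implements InvocationHandler {
            interface Transport {
              Object call(Method syncMethod, Object[] args) throws Exception;            // blocking call
              SelectionKey callAsync(Method syncMethod, Object[] args) throws Exception; // send only
            }

            private final Class<?> protocol;   // e.g. FooProtocol, what the server implements
            private final Transport transport; // assumed wire layer, not a real Hadoop class

            AsyncAwareInvoker(Class<?> protocol, Transport transport) {
              this.protocol = protocol;
              this.transport = transport;
            }

            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
              String name = method.getName();
              if (name.endsWith("Async") && method.getReturnType() == SelectionKey.class) {
                // Match getFooAsync() -> getFoo() declared on the server-side protocol.
                Method syncMethod = protocol.getMethod(
                    name.substring(0, name.length() - "Async".length()), method.getParameterTypes());
                return transport.callAsync(syncMethod, args);
              }
              return transport.call(method, args);
            }

            @SuppressWarnings("unchecked")
            static <T> T getProxy(Class<T> client, Class<?> protocol, Transport transport) {
              return (T) Proxy.newProxyInstance(client.getClassLoader(),
                  new Class<?>[] { client }, new AsyncAwareInvoker(protocol, transport));
            }
          }

          A caller would then do something like FooClient foo = AsyncAwareInvoker.getProxy(FooClient.class, FooProtocol.class, transport); getFoo() blocks, while getFooAsync() only sends the request and returns a key to listen on.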

          Raghu Angadi added a comment -

          > So, it could be done as two separate issues, but they're best developed in parallel. [...]

          +1. Yes, they are best developed in parallel. That is better than discussing them in one jira, since these two are quite independent and we would essentially have two threads in the same jira.

          While we compare code changes and performance, the users who are most affected might want to try them as soon as either of these is available.

          Raghu Angadi added a comment -

          Filed HADOOP-4386 for RPCs. Each of you can copy the relevant comments there, or I can do it if it is ok with you.

          steve_l added a comment -

          Trustin Lee has been doing some benchmarking of NIO library performance:
          http://gleamynode.net/articles/2232/performance-comparison-between-nio-frameworks

          Jonathan Gray added a comment -

          Over in HBase land, we're hitting issues with this. We've been recommending that users up their max xceivers, but now we're starting to see OOMEs of varying sorts (like "unable to create new native thread") as each xceiver takes up at least one thread.

          To be efficient with random access, we must keep connections/handles open to our files. However, this does not scale with the current DataNode (and DFSClient) implementations.

          It seems Netty would be preferable; the LGPL license should be okay.

          There are some HBase guys who are willing to spend time working on this, but it seems there are some splits about how to proceed. The idea of larger changes seems to have silenced the issue, so maybe we can try an iterative approach and solve the primary issue first: not having a thread per connection in the DataNode. And then look at optimizing DFSClient.

          Looking for some feedback so we can get started. Thanks.

          Raghu Angadi added a comment -

          > The idea of larger changes seems to have silenced the issue, so maybe we can try an iterative approach and solve the primary issue first of not having a thread per connection in the DataNode.

          I don't think there is much disagreement here. Of course it will be a large change even for just the DataNodes. That is unavoidable and it is ok. An RPC-based solution is also discussed, but we filed another jira (HADOOP-4386) for it. Whichever solution is ready first can go!

          I think a fix on the Datanodes alone is fine and extremely useful. If I were implementing this now, I would implement our own bare-bones event handler that uses a pool of threads to handle network and disk I/O for each "connection". It could always be moved to a different NIO framework, since most of them have a pretty similar structure. Other implementations are also welcome.

          For HBase, the initial patch could move only the readers to this thread pool (so that the more critical write path is not affected initially).

          Raghu Angadi added a comment -


          IOW, this jira is not waiting on a design decision. It is just waiting for a (prototype) implementation.

          Jonathan Gray added a comment -

          Thanks Raghu. Will talk to the others and we'll have at it.

          Also, seems we can't use Netty because of LGPL license: http://alek.xspaces.org/2005/01/15/using-lgpl-d-code-jakarta-wiki

          steve_l added a comment -

          This issue crops up in Hibernate use, which is why there is a special exemption for LGPL code, "the Hibernate clause", in which the copyright owner clarifies that import/use of their LGPL JAR doesn't make the LGPL apply to the rest of the codebase.

          http://www.mail-archive.com/legal-discuss@apache.org/msg00009.html

          Andrew Purtell added a comment -

          I'll look at this. (I'm on the HBase side.) If anyone else is working on this, please contact me.

          Ankur added a comment -

          What's the approach you are planning to take? I might be able to spend some time on this one.

          Andrew Purtell added a comment -

          Hi Ankur. I have already made an initial conservative refactoring pass and am testing the result.

          The plan is to use selectable channels set to blocking mode, a work queue, and a pool of worker threads to handle xceiver connections. The hope is that far fewer actual threads can do work equivalent to 2048 or 4096 dedicated per-connection xceiver threads. Meanwhile, as little of the code as possible would need to be changed. However, this is only beneficial if, once an xceiver is engaged, the transaction usually completes quickly. If the problematic load patterns (HBase) are predominantly slowly reading or writing clients, as opposed to simply a flood of requests, then there's no choice but to make the channels fully async and at that point essentially reimplement the datanode protocol handler.
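
          A minimal sketch of that "blocking channels plus a fixed worker pool" idea (the class, handleXceiver(), and the port/pool parameters are hypothetical, not code from the patch being discussed):

          import java.io.IOException;
          import java.net.InetSocketAddress;
          import java.nio.channels.ServerSocketChannel;
          import java.nio.channels.SocketChannel;
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;

          // Accepted channels stay in blocking mode; instead of spawning a dedicated
          // xceiver thread per connection, each connection is queued to a fixed pool.
          class PooledXceiverServer {
            private final ServerSocketChannel server;
            private final ExecutorService workers;

            PooledXceiverServer(int port, int poolSize) throws IOException {
              server = ServerSocketChannel.open();
              server.socket().bind(new InetSocketAddress(port));
              workers = Executors.newFixedThreadPool(poolSize);
            }

            void serve() throws IOException {
              while (server.isOpen()) {
                final SocketChannel channel = server.accept(); // blocking accept
                workers.execute(new Runnable() {
                  public void run() {
                    handleXceiver(channel); // placeholder for the per-connection protocol work
                  }
                });
              }
            }

            private void handleXceiver(SocketChannel channel) {
              // A real handler would run the datanode transfer protocol here.
              try { channel.close(); } catch (IOException ignored) { }
            }
          }

          As noted in the following comment, this only helps if connections finish quickly; long-lived, slow clients still pin one pooled thread each, which is why the fully async, readiness-driven model keeps coming up.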

          Raghu Angadi added a comment -

          Thanks for working on the patch.

          > However this is only benificial if, once an xceiver is engaged, the transaction usually completes quickly

          This is usually not the case. Many of these threads stay active on the order of tens of seconds to minutes. These threads are involved in disk I/O as well. As long as the patch makes sure that a thread is never blocked on network I/O and blocks only on disk I/O, that is essentially what is needed.

          Currently, DN threads block more often on network I/O, which is the core of the problem.

          Raghu Angadi added a comment -

          > As long as the patch makes sure that a thread is never blocked on network I/O and blocks only on disk I/O, that is essentially what is needed.

          To relax this a little bit more: for now, it is ok to block during the initial handshake (this is the main "protocol" part) to minimize structural changes. The initial handshake does not depend on the client's I/O behavior and should finish fast.

          That said, please go ahead and attach any patch that you think improves the current situation. The above requirements are not very strict.

          Andrew Purtell added a comment -

          Thanks for the feedback, Raghu. So my current patch will not be effective. The datanode makes a lot of on-demand connections for bulk transfer through several code paths that all assume blocking. This all really needs to be refactored into a state-tracking async model. Let me take a look at this to see if I have the free time required. It's approximately a 50% reimplementation of the datanode.

          Raghu Angadi added a comment -


          True. It will be a rewrite of the primary data paths. I would say it is 75-80% for the write path and the rest for the read path. I would port just the read path for the initial implementation and testing.

          Ankur added a comment - edited

          What do folks think about using Netty, which is licensed under LGPL 2.1?

          Andrew Purtell added a comment -

          Isn't ASF MINA the successor to Netty?

          Ankur added a comment -

          I had a chance to look at the code in the trunk branch, and it looks like some refactorings have already been done to make RPC support large data transfers. RPC is essentially stateless, so I am not sure how we plan to use or modify it to support the stateful semantics required by the Datanode protocol. Maybe I am missing a key design trick here.

          The simplest thing that I can think of is to use Java NIO directly to implement the DataNode protocol. Of course this would be a rewrite of DataNode and related classes, but it would avoid the use of another networking stack.

          Raghu Angadi added a comment -

          > The simplest thing that I can think of is to use Java NIO directly to implement the DataNode protocol. Of course this would be a rewrite of DataNode and related classes, but it would avoid the use of another networking stack.

          That is the proposal here. For your initial prototype, you don't need to rewrite much of DataNode. We could essentially make BlockSender.sendBlock() utilize async IO. Once that looks fine, we could port the other parts.
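
          Just to make the shape of an event-driven send concrete (this is not the real BlockSender code; the handler class and its refill strategy are assumptions), a write-ready handler could drain one packet buffer a chunk at a time instead of looping in a blocking write:

          import java.io.IOException;
          import java.nio.ByteBuffer;
          import java.nio.channels.SelectionKey;
          import java.nio.channels.SocketChannel;

          // Hypothetical write-ready handler: sends whatever fits right now and keeps
          // OP_WRITE registered until the current buffer (e.g. one block packet) drains.
          class PacketSendHandler {
            private final ByteBuffer packet; // data prepared by the (blocking) disk-read side

            PacketSendHandler(ByteBuffer packet) {
              this.packet = packet;
            }

            // Called by the event loop when the socket can accept more output.
            void writeReady(SelectionKey key) throws IOException {
              SocketChannel ch = (SocketChannel) key.channel();
              ch.write(packet); // non-blocking; may be a partial write
              if (!packet.hasRemaining()) {
                // Packet fully sent: stop watching for writability until the next
                // packet is ready (refilling the buffer is left to the caller).
                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
              }
            }
          }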

          Trustin Lee added a comment -

          MINA is the successor of Netty 2.x. Netty 3.x has been written from scratch based on my experience with MINA, so Netty 3 is not the successor of MINA. They are independent of each other.

          Regarding LGPL, the main problem with depending on an LGPL library in an ASF project is that the project is required to explicitly state that there's a dependency with an incompatible license and not to distribute the LGPL library JAR. Given that the FSF already clarified the linkage issue in Java, this is not a legal issue but an ASF policy issue, IMHO. There was an in-depth discussion about this on legal-discuss.

          To work around this issue, you need to make Hadoop build without the LGPL library by default and build the component that depends on an LGPL library only when a user specifies a certain build flag (e.g. -Dwith-lgpl-dependency). This technique was used in the MINA project, which optionally depends on RXTX (an LGPL'd library). Of course, RXTX.jar is fetched only when such a flag is specified, to satisfy the ASF policy, thanks to Maven.

          Or, you can make the LGPL'd library simply a system requirement and explicitly state that somewhere on the download page or in NOTICE.txt; you should not include the JAR of the LGPL'd library in the distribution, so that a user downloads it manually.

          Bo Shi added a comment -

          According to the Netty website, the project is now licensed under the Apache License 2.0.

          http://www.jboss.org/netty/downloads.html

          From the mail chatter, it looks like this happened only a couple of days ago.


            People

            • Assignee: Unassigned
            • Reporter: Raghu Angadi
            • Votes: 6
            • Watchers: 55
