Hadoop HDFS / HDFS-249

RPC support for large data transfers.

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      Currently HDFS has a socket-level protocol for serving HDFS data to clients. Clients do not use RPCs to read or write data. Fundamentally, there is no reason why this data transfer cannot use RPCs.

      This jira is a placeholder for porting Datanode transfers to RPC. This topic has been discussed in varying detail many times, most recently in the context of HADOOP-3856. There are quite a few issues to be resolved, both at the API level and at the implementation level.

      We should probably copy some of the comments from HADOOP-3856 to here.


          Activity

          Doug Cutting added a comment -

          I think there are a couple of things here. Perhaps they should become sub-issues.

          • async RPC calls. One should be able to invoke an RPC but not block waiting for the response, instead using a 'select'-style API to receive responses asynchronously.
          • non-copied parameters and values. One should be able to have results from an RPC read directly from the socket into a user-provided object (a parameter?), avoiding an extra copy. On the server side, one should also be able to transfer data directly from a file to the socket.
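
          As an illustration of the first bullet, here is a minimal sketch of what a 'select'-style asynchronous client API could look like. None of these interfaces exist in Hadoop; the names and signatures are hypothetical.

            import java.io.IOException;
            import java.util.Set;
            import org.apache.hadoop.io.Writable;

            /** Hypothetical sketch of a 'select'-style async RPC client. */
            public interface AsyncRpcClient {

              /** Send a request without blocking for the response; returns an id used to match the reply. */
              long invokeAsync(String method, Writable... params) throws IOException;

              /**
               * Wait at most timeoutMillis and return the ids of calls whose responses
               * have arrived, similar in spirit to java.nio.channels.Selector#select().
               */
              Set<Long> select(long timeoutMillis) throws IOException;

              /** Retrieve the response of a completed call; throws if the call failed remotely. */
              Writable response(long callId) throws IOException;
            }
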
          Raghu Angadi added a comment -

          I think so too.

          Some of the issues I can think of regarding implementing 'non-copied' parameters:

          • How is the user's copier code run? Will it hold up the RPC handler during the call?
          • What happens when data cannot be written or read without blocking? Will the connection be locked by this call, or will a new connection be created for other RPCs?
          Devaraj Das added a comment -

          We should experiment with this feature for MapReduce shuffle as well (maybe as a follow-up jira). Shuffle has more or less the same sort of requirements.

          Locking the connection seems inefficient to me, except during initiation/termination. Connections to the same server should be serialized. Did you mean some other type of blocking?

          Raghu Angadi added a comment -

          Ideally no RPC should be blocked because of another 'zero copy RPC' (where the RPC layer does not have control over how fast the data is written or read).

          Doug Cutting added a comment -

          Raghu> what happens when data can not be written or read without blocking.

          That's indeed the rub. RPC multiplexes all traffic between two JVMs over a shared connection. But if we want to avoid a buffer copy with FileChannel#transferTo, we'd have to lock that connection for an unbounded time.

          TransferTo and transferFrom are not async operations, but blocking operations. We cannot use them without a thread per connection, which is also one of the things we're trying to avoid. So, unless I'm missing something, use of transferTo and transferFrom mandates both a socket and a thread per open file. I don't yet see a happy middle ground. It seems that in order to eliminate extra sockets and threads we're forced to do at least one buffer copy. Am I missing something?
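
          For reference, a self-contained (non-Hadoop) illustration of the blocking behaviour under discussion; the host, port, and file name below are made up. The thread that calls transferTo() is tied up until the bytes have been pushed to the socket:

            import java.io.IOException;
            import java.io.RandomAccessFile;
            import java.net.InetSocketAddress;
            import java.nio.channels.FileChannel;
            import java.nio.channels.SocketChannel;

            public class TransferToExample {
              public static void main(String[] args) throws IOException {
                try (RandomAccessFile file = new RandomAccessFile("block.data", "r");
                     SocketChannel socket = SocketChannel.open(
                         new InetSocketAddress("datanode.example", 50010))) {
                  FileChannel in = file.getChannel();
                  long pos = 0, remaining = in.size();
                  while (remaining > 0) {
                    // Zero-copy from file to socket, but the calling thread is
                    // occupied here until the socket accepts the data.
                    long sent = in.transferTo(pos, remaining, socket);
                    pos += sent;
                    remaining -= sent;
                  }
                }
              }
            }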

          Devaraj> We should experiment with this feature for MapReduce shuffle as well (maybe as a follow-up jira).

          +1 That's indeed the other non-RPC-based protocol that would be great to replace with RPC.

          Raghu Angadi added a comment -

          > TransferTo and transferFrom are not async operations, but blocking operations.

          It is mixed. Disk I/O is blocking, but socket I/O obeys the blocking setting of the socket. So if you are transferring from a file to a socket, the read from the file is blocking (but not readFully()), and the write to the socket is non-blocking.

          > It seems that in order to eliminate extra sockets and threads we're forced to do at least one buffer copy. Am I missing something?

          Not necessarily. The main intention in HADOOP-3856 (which could apply here, if not initially) is that the Datanode will have a fixed number of threads per partition, say 5. These threads invoke transferTo(). As long as 5 threads can keep the disks busy, this does essentially as well as thread-per-connection could do. We will of course make '5' configurable with a good default.
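
          A rough sketch of that idea (not actual Datanode code; the class name and pool size are illustrative): a small fixed pool of sender threads per partition, each pushing a block with transferTo().

            import java.io.IOException;
            import java.io.RandomAccessFile;
            import java.nio.channels.FileChannel;
            import java.nio.channels.SocketChannel;
            import java.util.concurrent.ExecutorService;
            import java.util.concurrent.Executors;

            public class PartitionSenderPool {
              // '5' stands in for the configurable per-partition thread count mentioned above.
              private final ExecutorService senders = Executors.newFixedThreadPool(5);

              public void sendBlock(final String path, final SocketChannel client) {
                senders.submit(() -> {
                  try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
                    FileChannel in = file.getChannel();
                    long pos = 0, remaining = in.size();
                    while (remaining > 0) {
                      long sent = in.transferTo(pos, remaining, client);
                      pos += sent;
                      remaining -= sent;
                    }
                  } catch (IOException e) {
                    // A real implementation would report the failure back to the RPC layer.
                  }
                });
              }
            }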

          I will check the MR shuffle protocol to see how it works now.

          Raghu Angadi added a comment -

          I should add that using transferTo or not does not actually change the blocking dynamics. Inside the RPC, user code still needs to do a blocking read or write to disk.

          dhruba borthakur added a comment -

          > async RPC calls. One should be able to invoke an RPC but not block waiting for the response, instead using a 'select'-style API to receive responses asynchronously.

          Isn't it enough to require that an asyncRPC is more like a sendMsg, in the sense that it sends a message without expecting any response? Such a design would satisfy the requirements of doing reads/writes of user data. My assumption is that such an RPC design would have less code complexity than a design that requires responses to be accumulated asynchronously and delivered to the client application.

          In this sense, asyncRPC is not completely non-blocking: it can experience blocking while accessing sockets and files; but the "async" nature is that it is not waiting for an application-level response from the remote server.
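
          A hypothetical signature for such a one-way call (not an existing Hadoop interface), just to make the distinction concrete:

            import java.io.IOException;
            import org.apache.hadoop.io.Writable;

            /** Hypothetical fire-and-forget RPC: no application-level response is returned. */
            public interface OneWayRpc {
              /** May still block briefly on the socket, but never waits for a reply. */
              void sendMsg(String method, Writable... params) throws IOException;
            }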

          Doug Cutting added a comment -

          > Isn't it enough to require that an asyncRPC is more like a sendMsg, in the sense that it sends a message without expecting any response?

          If you send an async request to read some data, you want the data, but you don't want to block. For example, during the shuffle, a tasktracker might send requests to many other tasktrackers to read map outputs, but it does not want to block waiting for that data to be returned. Rather, it could send requests in one thread and receive response data in another.

          For writes, the client typically wants an ack response that data was successfully written.

          Sends w/o response might be useful for status updates, where an error would be ignored, but in most other cases I think we do care about the response.

          Raghu Angadi added a comment -

          >[...] but in most other cases I think we do care about the response.

          I think so. Currently, even while DFSClient is writing data, the pipeline does need to know whether there was an error while sending data or receiving acks, so it can break the pipeline appropriately. Once we have an API for async RPC calls, it is not much more work to return a 'Future' that can be queried for success or failure.
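
          For example, an async write call could have a signature along these lines (hypothetical, not an existing Hadoop API), where Future.get() either returns the ack or throws the remote failure:

            import java.io.IOException;
            import java.util.concurrent.Future;
            import org.apache.hadoop.io.Writable;

            public interface AsyncDataTransferProtocol {
              /** Send a packet without blocking; the Future reports success or failure of the ack. */
              Future<Writable> writePacketAsync(Writable packet) throws IOException;
            }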

          Ankur added a comment -

          Is anyone working on this? If yes, are we going ahead with the approach of modifying Hadoop RPC to add an async method to the protocol, as Doug described in HADOOP-3856?


    People

    • Assignee: Unassigned
    • Reporter: Raghu Angadi
    • Votes: 0
    • Watchers: 20
