Hadoop HDFS / HDFS-5499

Provide way to throttle per FileSystem read/write bandwidth

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      In some cases it might be worthwhile to throttle read/write bandwidth on a per-JVM basis so that clients do not spawn too many threads and start data movement that causes other JVMs to starve. The ability to throttle read/write bandwidth per FileSystem would help avoid such issues.

      The challenge seems to be how well this can fit into the FileSystem code. If one enables throttling around the FileSystem APIs, then any hidden data transfer within the cluster that uses them might also be affected, e.g. copying the job jar during job submission, localizing resources for the distributed cache, and so on.

      1. HDFS-5499.1.patch
        40 kB
        Lohit Vijayarenu

        Activity

        lohit Lohit Vijayarenu added a comment -

        We have been thinking along these lines; the attached patch can throttle HDFS reads/writes and Hftp reads. Posting it here for input from anyone who has thought about this use case and what they think of this approach.

        wheat9 Haohui Mai added a comment -

        What about building a new InputStream or OutputStream using the decorator pattern to implement throttling?
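        For illustration, here is a minimal sketch of that decorator idea; the class name ThrottledInputStream and the rate logic are mine, not from the attached patch. It wraps any InputStream and sleeps whenever reads get ahead of a configured bytes-per-second budget.

        import java.io.FilterInputStream;
        import java.io.IOException;
        import java.io.InputStream;

        public class ThrottledInputStream extends FilterInputStream {
          private final long bytesPerSecond;
          private final long startMillis = System.currentTimeMillis();
          private long totalBytes = 0;

          public ThrottledInputStream(InputStream in, long bytesPerSecond) {
            super(in);
            this.bytesPerSecond = bytesPerSecond;
          }

          @Override
          public int read() throws IOException {
            int b = super.read();
            if (b >= 0) throttle(1);
            return b;
          }

          @Override
          public int read(byte[] buf, int off, int len) throws IOException {
            int n = super.read(buf, off, len);
            if (n > 0) throttle(n);
            return n;
          }

          /** Sleep just long enough to keep the observed average rate within the budget. */
          private void throttle(long newBytes) throws IOException {
            totalBytes += newBytes;
            long expectedMillis = totalBytes * 1000 / bytesPerSecond;
            long elapsedMillis = System.currentTimeMillis() - startMillis;
            if (expectedMillis > elapsedMillis) {
              try {
                Thread.sleep(expectedMillis - elapsedMillis);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while throttling", e);
              }
            }
          }
        }

        A caller would then wrap whatever the FileSystem hands back, e.g. new ThrottledInputStream(fs.open(path), 32L * 1024 * 1024) for a roughly 32 MB/s cap; an OutputStream version would look the same around write().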

        lohit Lohit Vijayarenu added a comment -

        Haohui Mai Thanks. Yes, that is one option as well. But the concern about other hidden components getting throttled would still remain if this were set as a cluster-wide config, right?

        wheat9 Haohui Mai added a comment -

        It's problematic to do throttling using a cluster-wide configuration. For trusted programs this approach works fine, but for untrusted programs there's no way to guarantee that they will respect the configuration; they can simply take all available I/O bandwidth to make themselves run faster.

        Another concern is that you might not be able to make optimal decisions at the filesystem layer. For example, in many cases you don't want any throttling for LocalFileSystem at all. Another example is that touching ByteRangeInputStream can throttle webhdfs accidentally, since the code is shared by Hftp / Hsftp / WebHdfs, etc.

        I assume that you're running your jobs in a trusted environment. Does implementing a decorator class for FileSystem solve your problem? The class could wrap every InputStream/OutputStream it hands out with an adaptor that implements throttling. The code needs to use the new API to get throttling, but it seems to me that it is a cleaner and simpler route to take.
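        A minimal sketch of that FileSystem decorator, under stated assumptions: the class and field names and the rate logic below are mine (not from the attached patch), and only the read path is shown. The wrapped stream has to implement Seekable and PositionedReadable so that FSDataInputStream will accept it.

        import java.io.IOException;
        import java.io.InputStream;
        import org.apache.hadoop.fs.FSDataInputStream;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.FilterFileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.fs.PositionedReadable;
        import org.apache.hadoop.fs.Seekable;

        /** Hypothetical decorator: throttles the read path of every stream it opens. */
        public class ThrottledFileSystem extends FilterFileSystem {
          private final long bytesPerSecond;

          public ThrottledFileSystem(FileSystem fs, long bytesPerSecond) {
            super(fs);
            this.bytesPerSecond = bytesPerSecond;
          }

          @Override
          public FSDataInputStream open(Path f, int bufferSize) throws IOException {
            return new FSDataInputStream(
                new ThrottledStream(fs.open(f, bufferSize), bytesPerSecond));
          }

          /** Sleeps whenever the observed read rate exceeds the configured budget. */
          private static class ThrottledStream extends InputStream
              implements Seekable, PositionedReadable {
            private final FSDataInputStream in;
            private final long bytesPerSecond;
            private final long startMillis = System.currentTimeMillis();
            private long totalBytes = 0;

            ThrottledStream(FSDataInputStream in, long bytesPerSecond) {
              this.in = in;
              this.bytesPerSecond = bytesPerSecond;
            }

            private void throttle(long newBytes) {
              totalBytes += newBytes;
              long expected = totalBytes * 1000 / bytesPerSecond;
              long elapsed = System.currentTimeMillis() - startMillis;
              if (expected > elapsed) {
                try {
                  Thread.sleep(expected - elapsed);
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                }
              }
            }

            @Override public int read() throws IOException {
              int b = in.read();
              if (b >= 0) throttle(1);
              return b;
            }
            @Override public int read(byte[] buf, int off, int len) throws IOException {
              int n = in.read(buf, off, len);
              if (n > 0) throttle(n);
              return n;
            }
            @Override public void close() throws IOException { in.close(); }

            // Delegate the seek and positional-read APIs so FSDataInputStream accepts this wrapper.
            @Override public void seek(long pos) throws IOException { in.seek(pos); }
            @Override public long getPos() throws IOException { return in.getPos(); }
            @Override public boolean seekToNewSource(long target) throws IOException {
              return in.seekToNewSource(target);
            }
            @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
              int n = in.read(pos, buf, off, len);
              if (n > 0) throttle(n);
              return n;
            }
            @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
              in.readFully(pos, buf, off, len);
              throttle(len);
            }
            @Override public void readFully(long pos, byte[] buf) throws IOException {
              in.readFully(pos, buf);
              throttle(buf.length);
            }
          }
        }

        Because callers opt in by constructing the wrapper around their own FileSystem instance, cluster-internal transfers that use the raw FileSystem (copying the job jar, localizing distributed cache resources) are left untouched, which speaks to the earlier concern about hidden components.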

        stevel@apache.org Steve Loughran added a comment -

        I've looked at it a bit within the context of YARN.

        YARN containers are where this would be ideal, as then you'd be able to request IO capacity as well as CPU and RAM. For that to work, the throttling would have to be outside the app, as you are trying to limit code whether or not it wants to be limited, and because you probably want to give it more bandwidth if the system is otherwise idle. Self-throttling doesn't pick up spare IO.

        • You can use cgroups in YARN to throttle local disk IO through file:// URLs or the Java filesystem APIs, such as for MR temp data (a rough sketch of the cgroup interface follows at the end of this comment).
        • You can't cgroup-throttle HDFS per YARN container, which would be the ideal use case for it. The IO is taking place in the DN, and cgroups only limit IO in the throttled process group.
        • Implementing it in the DN would require a lot more complex code there to prioritise work based on block ID (the sole identifier that goes around everywhere) or input source (local sockets for HBase IO vs the TCP stack).
        • Once you go to a heterogeneous filesystem you need to think about IO load per storage layer as well as/alongside per-volume.
        • There's also a generic RPC request throttle to prevent DoS against the NN and other HDFS services. That would need to be server side, but once implemented in the RPC code it would be universal.

        You also need to define what load you are trying to throttle: pure RPCs/second, read bandwidth, write bandwidth, seeks or IOPS. Once a file is lined up for sequential reading, you'd almost want it to stream through the next blocks until a high-priority request came through, but an operation like a seek that involves a backwards disk head movement would be something to throttle (hence you need to be storage-type aware, as SSD seeks cost less). You also need to consider that although the cost of writes is high, writing is usually being done with the goal of preserving data, and you don't want to impact durability.
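        For the first bullet above, a rough illustration of what cgroup (v1) blkio throttling looks like from Java; this is not YARN's actual cgroup integration, and the mount point, group name, and device numbers are assumptions for the example.

        import java.io.IOException;
        import java.nio.charset.StandardCharsets;
        import java.nio.file.Files;
        import java.nio.file.Path;
        import java.nio.file.Paths;

        public class BlkioThrottleSketch {
          public static void main(String[] args) throws IOException {
            // Assumed cgroup v1 mount and container group; requires root and an existing group.
            Path group = Paths.get("/sys/fs/cgroup/blkio/hadoop-yarn/container_example");
            String device = "8:0";                 // major:minor of the disk to throttle
            long bytesPerSec = 32L * 1024 * 1024;  // 32 MB/s cap

            // Cap read and write bandwidth for that device within this cgroup.
            Files.write(group.resolve("blkio.throttle.read_bps_device"),
                (device + " " + bytesPerSec + "\n").getBytes(StandardCharsets.UTF_8));
            Files.write(group.resolve("blkio.throttle.write_bps_device"),
                (device + " " + bytesPerSec + "\n").getBytes(StandardCharsets.UTF_8));

            // Move this JVM into the group so its local (file://) IO is throttled.
            Files.write(group.resolve("cgroup.procs"),
                (ProcessHandle.current().pid() + "\n").getBytes(StandardCharsets.UTF_8));
          }
        }

        As the second bullet notes, this only constrains IO issued by processes inside the group, so HDFS reads and writes served by the DataNode process are untouched.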

        andrew.wang Andrew Wang added a comment -

        Cross-posting my comments from the mailing-list thread here just in case. This was in reply to Steve's comment above.

        ========

        My research project (Cake, published at SoCC '12) was trying to provide SLAs for mixed workloads of latency-sensitive and throughput-bound applications, e.g. HBase running alongside MR. This was challenging because seeks are a real killer. Basically, we had to strongly limit MR I/O to keep worst-case seek latency down, and did so by putting schedulers on the RPC queues in HBase and HDFS to restrict queuing in the OS and disk where we lacked preemption.

        Regarding citations of note, most academics consider throughput-sharing to be a solved problem. It's not dissimilar from normal time slicing: you try to ensure fairness over some coarse timescale. I think cgroups and ioprio_set essentially provide this.

        Mixing throughput and latency though is difficult, and my conclusion is that there isn't a really great solution for spinning disks besides physical isolation. As we all know, you can get either IOPS or bandwidth, but not both, and it's not a linear tradeoff between the two. If you're interested in this though, I can dig up some related work from my Cake paper.

        However, since it seems that we're more concerned with throughput-bound apps, we might be okay just using cgroups and ioprio_set to do time-slicing. I actually hacked up some code a while ago which passed a client-provided priority byte to the DN, which used it to set the I/O priority of the handling DataXceiver accordingly. This isn't the most outlandish idea, since we've put QoS fields in our RPC protocol for instance; this would just be another byte. Short-circuit reads are outside this paradigm, but then you can use cgroup controls instead.

        My casual conversations with Googlers indicate that there isn't any special Borg/Omega sauce either, just that they heavily prioritize DFS I/O over non-DFS. Maybe that's another approach: if we can separate block management in HDFS, MR tasks could just write their output to a raw HDFS block, thus bringing a lot of I/O back into the fold of "datanode as I/O manager" for a machine.

        Overall, I strongly agree with you that it's important to first define what our goals are regarding I/O QoS. The general case is a tarpit, so it'd be good to carve off useful things that can be done now (like Lohit's direction of per-stream/FS throughput throttling with trusted clients) and then carefully grow the scope as we find more use cases we can confidently solve.

        haosdent@gmail.com haosdent added a comment -

        QoS for Hadoop may be necessary, so I have also been focusing on this recently. I found that cgroups can only limit the disk bandwidth of a thread or process (they do not cover I/O that happens in the cache). I am also developing a library (jcgroup) which makes it more convenient to use cgroups from the JVM.


          People

          • Assignee: Unassigned
          • Reporter: lohit Lohit Vijayarenu
          • Votes: 1
          • Watchers: 12
