Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.8.1
    • Fix Version/s: None
    • Component/s: core
    • Labels:

      Description

      It would be nice to implement a quota system in Kafka to improve our support for highly multi-tenant usage. The goal of this system would be to prevent one naughty user from accidentally overloading the whole cluster.

      There are several quantities we would want to track:
      1. Requests per second
      2. Bytes written per second
      3. Bytes read per second

      There are two reasonable groupings we would want to aggregate and enforce these thresholds at:
      1. Topic level
      2. Client level (e.g. by client id from the request)

      When a request hits one of these limits we will simply reject it with a QUOTA_EXCEEDED exception.

      To avoid suddenly breaking things without warning, we should ideally support two thresholds: a soft threshold at which we produce some kind of warning and a hard threshold at which we give the error. The soft threshold could just be defined as 80% (or whatever) of the hard threshold.

      There are nuances to getting this right. If you measure second-by-second a single burst may exceed the threshold, so we need a sustained measurement over a period of time.

      Likewise when do we stop giving this error? To make this work right we likely need to charge against the quota for request attempts not just successful requests. Otherwise a client that is overloading the server will just flap on and off--i.e. we would disable them for a period of time but when we re-enabled them they would likely still be abusing us.
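A minimal sketch of the windowed accounting described above (all names here, like QuotaWindow and charge(), are hypothetical illustrations, not existing Kafka code): a counter that resets each window, charges every attempt rather than only successful requests, and distinguishes a soft (warn) threshold from a hard (reject) one.

```java
// Sketch of a fixed-window quota counter with soft and hard thresholds.
// All names here (QuotaWindow, Verdict, charge) are hypothetical, not Kafka APIs.
class QuotaWindow {
    enum Verdict { OK, SOFT_EXCEEDED, HARD_EXCEEDED }

    private final long windowMs;
    private final long hardLimit;
    private final long softLimit;   // warn threshold, e.g. 80% of the hard limit
    private long windowStart;
    private long count;

    QuotaWindow(long windowMs, long hardLimit) {
        this.windowMs = windowMs;
        this.hardLimit = hardLimit;
        this.softLimit = hardLimit * 80 / 100;
        this.windowStart = System.currentTimeMillis();
    }

    // Charge every attempt (not just successful requests) so a client that is
    // over quota keeps accumulating charges instead of flapping on and off.
    synchronized Verdict charge(long amount) {
        long now = System.currentTimeMillis();
        if (now - windowStart >= windowMs) {   // sustained measurement: start a new window
            windowStart = now;
            count = 0;
        }
        count += amount;
        if (count > hardLimit) return Verdict.HARD_EXCEEDED;
        if (count > softLimit) return Verdict.SOFT_EXCEEDED;
        return Verdict.OK;
    }
}
```

Measuring over a whole window (rather than second by second) is what keeps a single burst from tripping the limit, and charging attempts is what keeps a disabled-then-re-enabled client from flapping.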

      It would be good to write up a wiki design of how this would all work as a starting point for discussion.

        Activity

        Jonathan Creasy added a comment -

        Kafka is using the Yammer/Coda Hale Metrics library now right?

        Would it be sufficient to track the three quantities by topic and client ID and take action if the 1/5/15-min load for that metric exceeded the thresholds defined? That is an EWMA so it would rise and taper off over time.

        Perhaps we could use an exponential back-off, so that if you exceeded the quota once it would recover quickly, but after repeated violations it would take longer to cool off before allowing the client again.
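The back-off idea could be sketched roughly like this (purely illustrative Java; none of these names exist in Kafka or the Metrics library): each consecutive violation doubles the cool-off period up to a cap, and a clean run resets it.

```java
// Illustrative exponential back-off for quota violations: each consecutive
// violation doubles the cool-off (capped at maxMs), and staying within quota
// resets it. This is a sketch of the idea, not real Kafka code.
class QuotaBackoff {
    private final long baseMs;
    private final long maxMs;
    private int strikes = 0;

    QuotaBackoff(long baseMs, long maxMs) {
        this.baseMs = baseMs;
        this.maxMs = maxMs;
    }

    // Called on a violation: returns how long to disable the client.
    long nextCooloffMs() {
        long cooloff = Math.min(maxMs, baseMs << Math.min(strikes, 30));
        strikes++;
        return cooloff;
    }

    // Called once the client has completed a window within its quota.
    void reset() { strikes = 0; }
}
```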

        Jay Kreps added a comment -

        Yes, that is the basic idea. The concern with EWMA is just the opacity of it as an SLA. I kind of prefer picking a hard window of (say) 1-5 seconds and capping the requests/messages/etc in that window to a hard, configurable limit (e.g. 1000k) per client or topic. This is somewhat more clear about why you are in violation. EWMA also means a weighting over an infinite history of prior values, which is not intuitive. For example, a long history of zero messages/second does not justify a 5 second burst of exceptional load. The value of EWMA is to smooth the estimate when data is sparse (otherwise you end up measuring "sample variance", and for low-traffic things you get a line that jumps around erratically). However, this is okay for SLA violation because anyone approaching their SLA doesn't have a data sparsity problem--they are making tons of requests.
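To make the point about EWMA's infinite history concrete, here is a small illustrative calculation (the alpha value and rates are made up, not from any Kafka config): after a long idle stretch, five seconds of a 1000 msg/s burst still leaves the smoothed estimate well under the true rate, so an EWMA-based limiter reacts late where a hard window would see the burst immediately.

```java
// Made-up numbers to illustrate the point: an EWMA weights the entire history,
// so a long idle stretch drags the estimate down during a burst.
class EwmaDemo {
    // One EWMA update per one-second sample: est = alpha*rate + (1-alpha)*est.
    static double ewma(double[] perSecondRates, double alpha) {
        double est = 0.0;
        for (double r : perSecondRates) {
            est = alpha * r + (1 - alpha) * est;
        }
        return est;
    }

    public static void main(String[] args) {
        double[] rates = new double[65];                  // 60 s of 0 msg/s...
        for (int i = 60; i < 65; i++) rates[i] = 1000.0;  // ...then a 5 s burst
        // With alpha = 0.2 the estimate is 1000 * (1 - 0.8^5) = 672.32 msg/s,
        // well under the true 1000 msg/s a hard 5-second window would report.
        System.out.println(ewma(rates, 0.2));
    }
}
```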

        Jonathan Creasy added a comment -

        That makes sense. Thinking about this last night, we probably need to keep the quota stats in ZK in order to enforce them across the cluster, so that would make using Metrics harder also.

        Jay Kreps added a comment -

        Yeah, this is a bit of a dilemma. Doing it cluster-wide with low latency is pretty hard. Arguably the thing you want to protect is really the per-server load. That is to say, the limit is that we can't have one machine taking more than X messages/sec--though X might be fine if spread over enough servers. However, since the number of servers is in a sense an implementation detail, a per-server number makes it harder to express to the user what the speed limit is (after all, if we add more servers, from their POV the speed limit just went up). Maybe the sane way to do it is in terms of per-partition load rather than servers or the topic overall. Thoughts?

        Jonathan Creasy added a comment -

        Thinking about it that way, keeping the quotas and enforcements per-server is much easier to implement both in terms of code, and in terms of a user setting the proper values for their environment.

        Doing it per partition and per client, I think, would be better than per server or per topic. And tracking it only on a broker-by-broker basis will be sufficient because, as a user, I would set a level for what my broker could handle, and it might be different for different brokers if I have a non-homogeneous set of servers.

        Jonathan Creasy added a comment -

        I could implement the checks as meters in the Metrics class, then evaluate the meters vs. the configured limits. I could define a Healthcheck as a means to report clients/topics that are violating their quota.

        http://metrics.codahale.com/manual/core/#man-core-healthchecks

        Jay Kreps added a comment -

        Yes, it would definitely make sense to use a metric for it since that will make it easier to monitor how close you are to the limit.

        One related patch is the dynamic per-topic config patch. I have a feeling that these quotas would definitely be the kind of thing you would want to update dynamically. See KAFKA-554.

        If you want to take a stab at it, that would be fantastic and I would be happy to help however I can. It would probably be good to start with a simple wiki of how it would work and get consensus on that.

        Here is what I was thinking: we could add a class something like

        class Quotas {
          def record(client: String, topic: String, bytesToRead: Long, bytesToWrite: Long)
        }

        The record() method would record the work done, and if we are over quota for that topic or client throw a QuotaExceededException. (We might need to split the record and the check, not sure).

        This class can be integrated in KafkaApis to do the appropriate checks for each API. We should probably apply the quota to all client-facing apis, even things like metadata fetch which do no real read or write. These would just count against your total request counter and could have the bytes arguments both set to 0.
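A hedged sketch (in Java, for illustration) of how such a class might plug into the request path. Only record() and the QuotaExceededException come from the discussion above; the per-client counter map and the limit are illustrative, and a real implementation would reset counters per window as discussed earlier.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: record() and QuotaExceededException come from the
// discussion above; the per-client request counter and limit are illustrative.
class QuotaExceededException extends RuntimeException {
    QuotaExceededException(String msg) { super(msg); }
}

class Quotas {
    private final long maxRequestsPerWindow;
    private final Map<String, Long> requestCounts = new ConcurrentHashMap<>();

    Quotas(long maxRequestsPerWindow) {
        this.maxRequestsPerWindow = maxRequestsPerWindow;
    }

    // Record the work done; throw if this client is over its request quota.
    // Every client-facing API charges the request counter, so a metadata
    // fetch that reads and writes nothing just passes 0 for both byte counts.
    void record(String client, String topic, long bytesToRead, long bytesToWrite) {
        long count = requestCounts.merge(client, 1L, Long::sum);
        if (count > maxRequestsPerWindow) {
            throw new QuotaExceededException("client " + client + " exceeded request quota");
        }
        // bytesToRead / bytesToWrite would be charged against their own windows here.
    }
}
```

For example, a metadata request would call quotas.record(clientId, topic, 0, 0) and still count against the request quota.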

        Jonathan Creasy added a comment -

        I'll get started on that wiki. I see that KAFKA-554 has been closed.

        Jonathan Creasy added a comment -

        Obviously a work in progress. https://cwiki.apache.org/confluence/display/KAFKA/KAFKA-656+-+Quota+Design
        Prashanth Menon added a comment -

        Hey Jonathan, have you made any headway on this? Let me know; I'd like to give this a go if you're tied up.

        Jonathan Creasy added a comment -

        Go for it. I figured someone would likely get to this before I got back to it; I didn't really do anything on it yet.

        Swapnil Ghike added a comment -

        Hey Prashanth/Jonathan, have you had the time to run with this? If you're busy, I would be interested in taking this up.


          People

          • Assignee:
            Unassigned
          • Reporter:
            Jay Kreps
          • Votes:
            0
          • Watchers:
            6
