Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.8.1
    • Fix Version/s: None
    • Component/s: core
    • Labels:

      Description

      It would be nice to implement a quota system in Kafka to improve our support for highly multi-tenant usage. The goal of this system would be to prevent one naughty user from accidentally overloading the whole cluster.

      There are several quantities we would want to track:
      1. Requests per second
      2. Bytes written per second
      3. Bytes read per second

      There are two reasonable groupings we would want to aggregate and enforce these thresholds at:
      1. Topic level
      2. Client level (e.g. by client id from the request)

      When a request hits one of these limits we will simply reject it with a QUOTA_EXCEEDED exception.

      To avoid suddenly breaking things without warning, we should ideally support two thresholds: a soft threshold at which we produce some kind of warning and a hard threshold at which we give the error. The soft threshold could just be defined as 80% (or whatever) of the hard threshold.

      There are nuances to getting this right. If you measure second-by-second a single burst may exceed the threshold, so we need a sustained measurement over a period of time.

      Likewise when do we stop giving this error? To make this work right we likely need to charge against the quota for request attempts not just successful requests. Otherwise a client that is overloading the server will just flap on and off--i.e. we would disable them for a period of time but when we re-enabled them they would likely still be abusing us.
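
      As a rough sketch of what this implies (all names below are illustrative, not an existing Kafka API): keep a fixed measurement window per topic or client, charge every attempt against it, warn at the soft threshold, and reject at the hard one.

          class WindowedQuota(hardLimit: Long, windowMs: Long, softFraction: Double = 0.8) {
            private var windowStart = System.currentTimeMillis()
            private var observed = 0L

            /** Charge `amount` (even for failed attempts); returns true if the hard limit is exceeded. */
            def charge(amount: Long): Boolean = synchronized {
              val now = System.currentTimeMillis()
              if (now - windowStart >= windowMs) { windowStart = now; observed = 0L } // roll to a new window
              observed += amount
              if (observed > softFraction * hardLimit && observed <= hardLimit)
                println(s"WARN: usage at ${100 * observed / hardLimit}% of quota")    // soft threshold
              observed > hardLimit                                                    // hard threshold
            }
          }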

      It would be good to write up a wiki design on how this would all work as a starting point for discussion.

        Activity

        Jay Kreps added a comment -

        Another alternative which might make things simpler would be to make the quota configuration per partition. This would avoid having to adapt it if the partition count changed. This is not ideal, since I think people naturally think about data size at the topic level, but at the moment all our size-based retention is done at the partition level, so you could argue that this is more consistent.

        I think the other alternative which is a little bit more work is to store the per-topic quota in ZK and calculate quota/part-count to get the per partition limit. This per-partition limit would have to be updated when either the quota changed or the partition count changed. This might require a small bit of refactoring which I would be happy to walk you through if you end up going that route.
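
        For concreteness, the per-partition math is just the following (a sketch only, not code from any patch):

            // hypothetical helper: recompute whenever the topic quota or the partition count changes
            def perPartitionLimit(topicQuotaBytesPerSec: Long, partitionCount: Int): Long =
              math.max(1L, topicQuotaBytesPerSec / partitionCount)

            // e.g. a 10 MB/sec topic quota spread over 10 partitions => 1 MB/sec per partition
            val limit = perPartitionLimit(10L * 1024 * 1024, 10)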

        Abhinav Anand added a comment -

        Hi Jay Kreps,
        That's great. I have a basic POC set up for the new producer with ack=1, and with the performance on par with fire-and-forget producers this should handle most of our use cases.

        I had one more query regarding configuration management for the quotas. When the number of partitions is updated, we can either update the quota configuration in ZooKeeper or, if the partition count is maintained as a state variable on all brokers, use it to dynamically calculate the quota. I like the idea of updating the quota configuration every time partitions are added. Let me know your thoughts on this.

        Regards,
        Abhinav

        Jay Kreps added a comment -

        Hey Abhinav Anand, that is right--in general for a fire and forget producer we can only close the connection as no acknowledgement is returned. I think that is the right behavior here, although as in other error cases it is somewhat confusing as to why that happened.

        Starting with the new java producer we have implemented, though, there isn't much reason to use ack=0 vs ack=1 as the performance difference is extremely small since we improved the way we do I/O to eliminate any blocking.

        Abhinav Anand added a comment -

        Hi Jay Kreps,

        I am trying to create a POC for this, but I am not able to handle the scenario with producers where ack is set to 0 (fire-and-forget producers). The only approach I see in the APIs is to close the connection to the producer, but this doesn't convey the reason for the connection close. I am not sure if I am missing anything; I have dug around the codebase but couldn't find anything interesting. It would be great if you could shed some light on this.

        Regards,
        Abhinav

        Abhinav Anand added a comment -

        Hi Jay Kreps,
        Thanks for the input. I am planning to start with the basic use case of a per-topic bytes-written quota. A few questions that would help me make the code conform to the project's standards:
        1. As the number of partitions changes, is the partition count maintained as a state variable on all brokers, or should we figure this out from ZooKeeper?
        2. The Kafka server uses the Yammer metrics library, while the clients have their own metrics library (with SampledStat etc.). Is there a specific reason for this? I would prefer to avoid changing the metrics design on the broker side.

        Regards,
        Abhinav

        Jay Kreps added a comment - edited

        Hey Abhinav Anand, a couple of suggestions:
        1. You could imagine a fairly complete quota system would involve all kinds of granularities at which you could enforce the quota (at the IP level, at the user level, etc). There are also all kinds of things we can quota: requests, bytes in, bytes out, etc. However for now let's just keep it simple. Let's just start with a per-topic bytes-written quota. I think this gives you 70% of what you want and will give us a chance to learn about it operationally before attempting something more complicated.
        2. The quota should be specified at the topic level but enforced at the partition level. I.e. if you specify 10MB/sec on a topic with 10 partitions then what we will enforce would be 1MB/sec per partition.
        3. We should make use of the topic-level configs to implement this. I.e. add a new configuration in LogConfig that defaults to an infinite quota.
        4. One piece of work that was done in anticipation of quotas was to combine the metrics and quota systems. This metrics package is in use on the clients now, but not yet on the server (it is under clients/src/main/org/apache/kafka/common/metrics I think). At a high-level the idea is to be able to enforce quotas on exactly the same things we monitor with metrics to make the reporting side of things easier. This code may actually do most of what the QuotaManager would have done, i.e. it will maintain all the metrics and each metric can have an optional quota associated, if the metric exceeds the quota it will throw an exception. Check this out and see if it makes sense in the way you were thinking of using it.
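
        To make point 4 concrete, here is a rough sketch of how that metrics package could be used for a per-topic bytes-written quota. The class names are from org.apache.kafka.common.metrics as I understand them, but the exact signatures may differ in the version currently in the tree, so treat this as an assumption rather than working code:

            import java.util.Collections
            import org.apache.kafka.common.MetricName
            import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota, QuotaViolationException}
            import org.apache.kafka.common.metrics.stats.Rate

            val metrics = new Metrics()
            val sensor  = metrics.sensor("topic.my-topic.bytes-in")
            val name    = new MetricName("bytes-in-rate", "topic-quotas", "Bytes written per second",
                                         Collections.emptyMap[String, String]())
            // attach a rate stat whose config carries an upper-bound quota of 10 MB/sec
            sensor.add(name, new Rate(), new MetricConfig().quota(Quota.upperBound(10.0 * 1024 * 1024)))

            try {
              sensor.record(4096.0)              // record the bytes written by a request
            } catch {
              case _: QuotaViolationException => // reject the request with the quota error
            }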

        Abhinav Anand added a comment - edited

        Hi Jay Kreps,

        We also want to avoid using ZooKeeper as a dependency. With the number of producers being pretty high, I am planning to follow the approach that has been proposed in this thread. I can write up a design to get my approach reviewed. I am creating a QuotaManager utility to decide the quota limits for each client at the topic/partition level, saving all the client topic/partition-level information as metrics. It will throw a QuotaExceeded exception, which will be caught on the producer itself.
        A few questions: are there any known issues with a high number of metrics? Anything I should be aware of to avoid affecting producer performance?
        Let me know any suggestions you have and I will try to incorporate them in the design.

        Regards,
        Abhinav

        Jay Kreps added a comment -

        Hey Abhinav Anand, I would rather not do explicit leases. The problem is that ZooKeeper is a very scarce resource, so you then have to quota the lease updates themselves--basically it just seems complex. In any case, whether the mechanism is leases or errors, the client needs some way to throttle itself when it reaches the maximum of its leased capacity or gets an error. I think the errors can do this naturally. Our new Java producer actually already has a notion of a retry backoff, and I think we could generalize this to handle this case fairly easily. This will cause the client to apply backpressure when it hits its maximum.
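
        A hypothetical sketch of what that client-side behaviour could look like (QuotaExceededException is the error proposed in this ticket; none of this exists in the producer today):

            // placeholder for the proposed server error
            class QuotaExceededException(msg: String) extends RuntimeException(msg)

            // generalize the retry-backoff idea: back off exponentially when the quota error is returned
            def sendWithBackoff(send: () => Unit, baseBackoffMs: Long = 100, maxBackoffMs: Long = 30000): Unit = {
              var backoff = baseBackoffMs
              var done = false
              while (!done) {
                try { send(); done = true }
                catch {
                  case _: QuotaExceededException =>
                    Thread.sleep(backoff)                          // throttles the client (backpressure)
                    backoff = math.min(backoff * 2, maxBackoffMs)  // exponential growth, capped
                }
              }
            }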

        Abhinav Anand added a comment -

        Hi Jay Kreps, Jonathan Creasy,
        I don't see much activity on this thread. We are looking into solving this problem at our company and trying to figure out the best way to do it. I looked into the suggestions proposed in this thread, and I believe maintaining state about the observed rate of flow on each broker makes a very clear case.
        Are we only looking at solving this from the broker side by throwing a quota-exceeded exception? In that scenario we have no control over the producers, and they can keep sending data to the Kafka brokers and bombard the network even after getting the quota-exceeded warnings and exceptions.
        We have a few ideas for a lease-based service, where the producer requests a lease and only sends data within the allotted quota. To send higher traffic, it would be required to renew its lease with a new bandwidth request (we are trying to achieve this through a watcher node in ZooKeeper). This solution still has issues when a rogue producer sends messages to the system. We are in the design phase of our solution and it would be great to get input from you, ideally so that we can design something we can contribute back.

        Regards,
        Abhinav

        Swapnil Ghike added a comment -

        Hey Prashanth/Jonathan, have you had the time to run with this? If you're busy, I would be interested in taking this up.

        Jonathan Creasy added a comment -

        Go for it. I knew someone would likely get to this before I got back to it; I haven't really done anything on it yet.

        Prashanth Menon added a comment -

        Hey Jonathan, have you made any headway on this? Let me know; I'd like to give this a go if you're tied up.

        Jonathan Creasy added a comment -

        Obviously a work in progress. https://cwiki.apache.org/confluence/display/KAFKA/KAFKA-656+-+Quota+Design
        Jonathan Creasy added a comment -

        I'll get started on that wiki. I see that KAFKA-554 has been closed.

        Jay Kreps added a comment -

        Yes, it would definitely make sense to use a metric for it since that will make it easier to monitor how close you are to the limit.

        One related patch is the dynamic per-topic config patch. I have a feeling that these quotas would definitely be the kind of thing you would want to update dynamically. See KAFKA-554.

        If you want to take a stab at it, that would be fantastic and I would be happy to help however I can. It would probably be good to start with a simple wiki of how it would work and get consensus on that.

        Here is what I was thinking, we could add a class something like:

            class Quotas {
              def record(client: String, topic: String, bytesToRead: Long, bytesToWrite: Long)
            }

        The record() method would record the work done, and if we are over quota for that topic or client throw a QuotaExceededException. (We might need to split the record and the check, not sure).
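
        Purely as an illustration of that shape (only the record() signature above comes from the proposal; everything else here, including handling only the per-topic write side, is an assumption), it might look like:

            class QuotaExceededException(msg: String) extends RuntimeException(msg)

            class Quotas(bytesWrittenPerSecPerTopic: Long, windowMs: Long = 1000) {
              // topic -> (current window start, bytes written in that window)
              private val written = collection.mutable.Map[String, (Long, Long)]()

              def record(client: String, topic: String, bytesToRead: Long, bytesToWrite: Long): Unit = synchronized {
                val now = System.currentTimeMillis()
                val (start, bytes) = written.getOrElse(topic, (now, 0L))
                val (winStart, total) =
                  if (now - start >= windowMs) (now, bytesToWrite) else (start, bytes + bytesToWrite)
                written(topic) = (winStart, total)
                if (total > bytesWrittenPerSecPerTopic * windowMs / 1000)
                  throw new QuotaExceededException(s"Topic $topic exceeded its bytes-written quota")
              }
            }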

        This class can be integrated in KafkaApis to do the appropriate checks for each API. We should probably apply the quota to all client-facing apis, even things like metadata fetch which do no real read or write. These would just count against your total request counter and could have the bytes arguments both set to 0.

        Jonathan Creasy added a comment -

        I could implement the checks as meters in the Metrics class, then evaluate the meters vs. the configured limits. I could define a Healthcheck as a means to report clients/topics that are violating their quota.

        http://metrics.codahale.com/manual/core/#man-core-healthchecks
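
        Something along these lines, assuming the Metrics 3.x API from that link (the registry and metric names here are made up, and the broker is still on the older Yammer 2.x API, so this is only a sketch of the idea):

            import com.codahale.metrics.{Meter, MetricRegistry}
            import com.codahale.metrics.health.HealthCheck

            val registry = new MetricRegistry()
            val bytesIn: Meter = registry.meter("topic.my-topic.bytes-in")
            bytesIn.mark(4096)   // mark bytes written as requests are handled

            // report topics/clients that are violating their configured limit
            class TopicQuotaHealthCheck(meter: Meter, limitBytesPerSec: Double) extends HealthCheck {
              override protected def check(): HealthCheck.Result =
                if (meter.getOneMinuteRate <= limitBytesPerSec)
                  HealthCheck.Result.healthy()
                else
                  HealthCheck.Result.unhealthy(s"rate ${meter.getOneMinuteRate} B/s exceeds quota $limitBytesPerSec B/s")
            }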

        Jonathan Creasy added a comment -

        Thinking about it that way, keeping the quotas and enforcements per-server is much easier to implement both in terms of code, and in terms of a user setting the proper values for their environment.

        Doing it per partition and per client would, I think, be better than per server or per topic. And tracking it only on a broker-by-broker basis will be sufficient because, as a user, I would set a level for what my broker can handle, and it might differ between brokers if I have a non-homogeneous set of servers.

        Jay Kreps added a comment -

        Yeah, this is a bit of a dilemma. Doing it cluster-wide with low latency is pretty hard. Arguably the thing you want to protect is really the per-server load. That is to say, the limit is that we can't have one machine taking more than X messages/sec--though X might be fine if spread over enough servers. However, since the number of servers is in a sense an implementation detail, it makes it harder to express to the user what the speed limit is (after all, if we add more servers then from their point of view the speed limit just went up if it is a per-server number). Maybe the sane way to do it is in terms of per-partition load rather than servers or the topic overall. Thoughts?

        Jonathan Creasy added a comment -

        That makes sense. Thinking about this last night, we probably need to keep the quota stats in ZK in order to enforce them across the cluster, so that would make using Metrics harder also.

        Jay Kreps added a comment -

        Yes, that is the basic idea. The concern with EWMA is just the opacity of it as an SLA. I kind of prefer picking a hard window of (say) 1-5 seconds and capping the requests/messages/etc in that window to a hard, configurable limit (e.g. 1000k) per client or topic. This is somewhat more clear about why you are in violation. EWMA also means a weighting over an infinite history of prior values, which is not intuitive. For example, a long history of zero messages/second does not justify a 5 second burst of exceptional load. The value of EWMA is to smooth the estimate when data is sparse (otherwise you end up measuring "sample variance" and for low-traffic things you get a line that jumps around erratically). However this is okay for SLA violation because anyone approaching their SLA doesn't have a data sparsity problem--they are making tons of requests.
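
        To illustrate the point about history (purely illustrative, not Kafka code):

            // The EWMA update blends every past sample into the estimate: older values never
            // fully disappear, they just carry geometrically decaying weight.
            def ewma(prev: Double, sample: Double, alpha: Double): Double =
              alpha * sample + (1 - alpha) * prev

            // e.g. after a long idle history (estimate ~0 msg/sec), a burst of 10,000 msg/sec with
            // alpha = 0.1 is reported as only 1,000 msg/sec after one sample and ~1,900 after two,
            // so the burst sails under the threshold, whereas a hard per-window cap rejects it immediately.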

        Jonathan Creasy added a comment -

        Kafka is using the Yammer/Coda Hale Metrics library now right?

        Would it be sufficient to track the three quantities by topic and client ID and take action if the 1/5/15-min load for that metric exceeded the thresholds defined? That is an EWMA so it would rise and taper off over time.

        Perhaps we could use an exponential back-off, so that if you exceeded it once it would recover quickly, and then after that take longer to cool off before allowing the client again.


          People

          • Assignee:
            Unassigned
            Reporter:
            Jay Kreps
          • Votes:
            2
            Watchers:
            11

            Dates

            • Created:
              Updated:

              Development