
CASSANDRA-1035: Implement User/Keyspace throughput Scheduler

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 0.7 beta 1
    • Component/s: None
    • Labels: None

      Description

      To support multiple applications on top of a single Cassandra cluster (and to protect against badly behaving clients), a very simple scheduler for client operations would be very beneficial.

      Since all tasks are short-lived, a sufficient scheduler would probably only need to manage the queue of incoming requests and weight them based on an assigned ID. The ID could be determined dynamically from the IP, user id, or keyspace, for instance, and each Runnable would then be assigned an ID.
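
      As a rough illustration of the idea (all names here are hypothetical, not taken from any attached patch), a sketch in Java of per-ID queues drained in weighted round-robin order:

      import java.util.Map;
      import java.util.concurrent.BlockingQueue;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.ConcurrentMap;
      import java.util.concurrent.LinkedBlockingQueue;

      // Sketch only: tag each incoming request with an ID (ip, userid or
      // keyspace) and drain the per-ID queues in weighted round-robin order.
      class WeightedRequestQueues
      {
          private final ConcurrentMap<String, BlockingQueue<Runnable>> queues =
              new ConcurrentHashMap<String, BlockingQueue<Runnable>>();
          private final Map<String, Integer> weights; // requests per ID per pass

          WeightedRequestQueues(Map<String, Integer> weights)
          {
              this.weights = weights;
          }

          void submit(String id, Runnable request)
          {
              queues.putIfAbsent(id, new LinkedBlockingQueue<Runnable>());
              queues.get(id).add(request);
          }

          // One scheduler pass: each ID may run up to its weight (default 1).
          void drainOnePass()
          {
              for (Map.Entry<String, BlockingQueue<Runnable>> e : queues.entrySet())
              {
                  Integer w = weights.get(e.getKey());
                  for (int i = 0; i < (w == null ? 1 : w); i++)
                  {
                      Runnable r = e.getValue().poll();
                      if (r == null)
                          break;
                      r.run();
                  }
              }
          }
      }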

      Attachments

      1. 1035-v5.patch
        35 kB
        Nirmal Ranganathan
      2. 1035-v4.txt
        33 kB
        Jonathan Ellis
      3. 1035-v3.patch
        35 kB
        Nirmal Ranganathan
      4. 1035-v2.txt
        36 kB
        Jonathan Ellis
      5. Cassandra-1035.patch
        40 kB
        Nirmal Ranganathan
      6. 0005-Add-options-for-throttling.patch
        23 kB
        Nirmal Ranganathan
      7. 0003-Avro-related-changes-for-RequestScheduler.patch
        7 kB
        Nirmal Ranganathan
      8. 0002-Thrift-related-changes-for-RequestScheduler-added-a-.patch
        4 kB
        Nirmal Ranganathan
      9. 0001-Adding-the-RequestScheduler-abstraction-and-a-simple.patch
        16 kB
        Nirmal Ranganathan
      10. 0004-Test-case-for-RoundRobinScheduler.patch
        6 kB
        Nirmal Ranganathan

        Activity

        Jonathan Ellis added a comment -

        I don't think scheduling of MUTATION-STAGE can solve this; if a user or app is pouring in enough ops to affect the QoS of others, it's going to cause backpressure, which is of necessity per-node, not per-user-or-app.

        I think there would need to be some kind of rate-limiting at the coordinator node instead, since that's the only place you can backpressure a user or app instead of entire peer nodes.

        Ryan King added a comment -

        Agreed that this should be done at the coordinator node.

        Stu Hood added a comment -

        Throttling on the coordinator makes a lot more sense, good point.

        I began exploring plugging alternative ThreadPools into Thrift and Avro, but the difficulty of making the implementation consistent while avoiding patches to those projects is reason enough to drop that idea.

        Instead, how about converting StorageProxy into a non-static class, and pooling StorageProxies? To get a StorageProxy, you would check one out from an "IRequestScheduler" (name tbd). Since StorageProxy is stateless, the default impl of IRS could always return the same StorageProxy, without any queueing. An alternative IRS could implement the weighted scheduling described above.

        Thoughts?
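
        To make the proposal concrete, here is a rough sketch of the checkout API (the interface name is "tbd" per above; the method names and the PassThroughScheduler class are invented for illustration, and the StorageProxy stub stands in for the real class once made non-static):

        // Sketch only; not from the attached patches.
        class StorageProxy { /* stateless; stands in for the real class, made non-static */ }

        interface IRequestScheduler
        {
            // Queues/blocks the calling client thread according to the
            // scheduling policy, then hands back a StorageProxy to service
            // the request with.
            StorageProxy checkout(String id) throws InterruptedException;

            void release(StorageProxy proxy);
        }

        // Default impl: StorageProxy is stateless, so the same instance can
        // always be returned with no queueing at all.
        class PassThroughScheduler implements IRequestScheduler
        {
            private final StorageProxy proxy = new StorageProxy();

            public StorageProxy checkout(String id) { return proxy; }
            public void release(StorageProxy proxy) { }
        }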

        Ryan King added a comment -

        I've always thought it was a little weird (or that I misunderstood something) that there wasn't a separate stage there. I would think it'd make sense to have a stage just for protocol handling and a separate one for the storage proxy stuff.

        Stu Hood added a comment -

        The reason there isn't a stage is that Thrift and Avro independently manage their own client threads/selectors inside of a black box. They will attempt to get copies of StorageProxy from within their thread pool, so we can block their threads.

        Jonathan Ellis added a comment -

        So you would have some fairly large number of SP instances, say 1000, where app X gets 800 and apps Y and Z get 100 each, to tune QoS? That seems reasonable.

        Jonathan Ellis added a comment -

        Of course that assumes each app is doing the same amount of work per SP call, which is dubious in the presence of multiget + batch_mutate.

        Jeremy Hanna added a comment -

        For multi-tenant clusters this will be very important, so we're hoping we can help get this done prior to the 0.7 release.

        Nirmal Ranganathan added a comment -

        Having researched the options, the only viable one seems to be scheduling based on keyspace. User-based scheduling is possible, but it requires some changes to the Auth API/classes, specifically providers to expose the username.

        There would be a configuration option to schedule based on user/keyspace/none (this would be the identifier).

        Open question (considering a token-based approach):
        1 - The scheduler returns TimedOutException if there are no available tokens for the user/keyspace.
        2 - The scheduler blocks the thread/request until a token is available and services the request, or times out, whichever happens first.

        Since we don't control the threads/thread-scheduling, the scheduler will have to maintain some sort of bucketing system and perform wait/notify for a round-robin approach. Ideas welcome.

        Since this is node-based, there's nothing stopping a client from hitting another coordinator node and the request being re-routed back to the initial node if that one has the data. CASSANDRA-685 would solve that, but I'm not sure of its status.

        Jonathan Ellis added a comment -

        The auth API is super alpha. If keyspace-based scheduling can be adequate for you, then I strongly suggest going with that for 0.7.

        I don't see any benefit to throwing TOE: if the client retries immediately, it makes things worse. Simply blocking seems more natural imo.

        Nirmal Ranganathan added a comment -

        0001 contains the scheduler-related new classes.
        0002 has the configuration-related changes.
        0003 has the CassandraServer-related changes.

        Stu Hood added a comment -

        Awesome work: this is a great first cut!

        • Should FairShareScheduler be renamed to RoundRobinScheduler, since it doesn't deal with time slices? (sorry, nitpick)
        • FairShareScheduler.getQueue is racy between contains/put/get: the 'queues' object should probably be replaced with NonBlockingHashMap, and putIfAbsent should be used instead of contains/put (see the sketch below this list)
        • Comments in conf/cassandra.yaml should indicate that this is specifically for client requests

        Also, similar changes need to be made to avro.CassandraDaemon and avro.CassandraServer: you may be able to separate RequestScheduler initialization into a method of service.AbstractCassandraDaemon for use in both subclasses.
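
        To spell out the race and the fix (a sketch; the patch would use NonBlockingHashMap from high-scale-lib, which implements the same ConcurrentMap interface as the ConcurrentHashMap used here):

        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.ConcurrentHashMap;
        import java.util.concurrent.ConcurrentMap;
        import java.util.concurrent.SynchronousQueue;

        class Queues
        {
            // Racy version: two threads can both see !containsKey(id) and both
            // put(), so one thread ends up holding a queue nobody else sees:
            //
            //     if (!queues.containsKey(id))
            //         queues.put(id, new SynchronousQueue<Runnable>());
            //     return queues.get(id);

            private final ConcurrentMap<String, BlockingQueue<Runnable>> queues =
                new ConcurrentHashMap<String, BlockingQueue<Runnable>>();

            // Race-free version: putIfAbsent is atomic, and returns the value
            // already mapped (or null if our candidate won).
            BlockingQueue<Runnable> getQueue(String id)
            {
                BlockingQueue<Runnable> q = queues.get(id);
                if (q == null)
                {
                    BlockingQueue<Runnable> candidate = new SynchronousQueue<Runnable>();
                    q = queues.putIfAbsent(id, candidate);
                    if (q == null)
                        q = candidate;
                }
                return q;
            }
        }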

        Nirmal Ranganathan added a comment -

        Renamed FairShare to RoundRobin and added the NonBlockingHashMap.

        0001 - Scheduler addition + configuration changes
        0002 - Thrift changes
        0003 - Avro Changes
        0004 - Test case for RoundRobinScheduler

        Stu Hood added a comment - edited

        Adding 0005, which fixes two small problems. Nirmal, can you double-check the fixes?

        EDIT: Shoot... apparently I was so busy bikeshedding that I missed the big problem: the schedule() method doesn't limit the number of threads that it lets run simultaneously per id, so it won't actually perform any throttling. This will probably require a bit of refactoring. Sorry!

        Nirmal Ranganathan added a comment -

        Yeah, the problem with the throttling is that there are no existing hooks to find out when a request has completed. We could, however, set a hard limit with some timing built in.

        Nirmal Ranganathan added a comment -

        Updating with the throttling options. It's ready for review now.
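
        For context, the knobs being added end up in conf/cassandra.yaml looking roughly like this (a sketch; treat the exact option names, and the default of 80 mentioned below, as approximate for this stage of the patch):

        # Scheduler for incoming client requests. NoScheduler passes requests
        # straight through; RoundRobinScheduler round-robins them per
        # request_scheduler_id.
        request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler

        # The identifier requests are grouped by: keyspace or none.
        request_scheduler_id: keyspace

        request_scheduler_options:
            throttle_limit: 80    # max requests scheduled concurrently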

        Stu Hood added a comment -

        +1 from me: throughput isn't affected with NoScheduler, and drops off by around 8% with unbounded throttling.

        stress.py default reads (1mm rows, 5 cols, 50 threads)

        nosched
        154 secs

        roundrobin (throttle_limit = 25)
        172 secs

        roundrobin (throttle_limit = 80/default)
        166 secs

        roundrobin (throttle_limit = 1000)
        167 secs

        Jonathan Ellis added a comment -

        Can you rebase to latest trunk, please?

        Nirmal Ranganathan added a comment -

        Rebased against trunk and squashed the various commits into one.

        Nirmal Ranganathan added a comment -

        There was another conflict with changes made to trunk late yesterday/today, so I'm rebasing to the latest trunk. It looks like it was mostly import statements causing the merge conflicts.

        Jonathan Ellis added a comment -

        Can't we simplify the wait/notify business in RRS with a blocking queue?

        (patch with some cosmetic cleanup attached.)

        Jonathan Ellis added a comment -

        Also: having both a constructor and an initialize() method is clunky. Let's stick to one or the other.

        Jonathan Ellis added a comment -

        Finally, I note that I've removed comments that just say "see the interface where this is defined", like:

        + /* (non-Javadoc)
        + * @see org.apache.cassandra.scheduler.RequestScheduler#queue(java.lang.Thread, java.lang.Integer)
        + *
        + * Queue nothing as we don't want any scheduling to happen
        + */

        since they are basically the equivalent of

        i++; // increment i

        i.e., they don't add anything to our understanding of what's going on.

        Nirmal Ranganathan added a comment -

        The wait/notify is controlled by a counting semaphore, which essentially acts like a blocking queue. A blocking queue would not fit this scenario, because we queue requests in one queue per user/keyspace and then round-robin through the queues, taking an element from each to process, or skipping it if there's none. If we had to use a blocking queue, we would need to add those items into the queue; here the semaphore acts as a lock/count tracker.

        The reason behind having both a constructor and initialize() was that the class needs to be loaded during configuration, because the specific instance is provided via configuration. I just felt starting a thread at that point would change the purpose of the DatabaseDescriptor, and hence had initialize() called in the thrift/avro server. If it's OK to go into the DatabaseDescriptor, we can move the whole initialize section into the constructor.

        Jonathan Ellis added a comment -

        Right, it looks like what you want is just a SynchronousQueue per keyspace. Manual wait/notify is more code and error-prone (see the javadoc for wait() – "A thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup...")

        Go ahead and just initialize the scheduler in DD, we already do this with e.g. the partitioner.
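
        A rough sketch of the hand-off being suggested (hypothetical helper names): the client thread parks in the SynchronousQueue until the scheduler thread rendezvouses with it, so no manual wait/notify or spurious-wakeup handling is needed.

        import java.util.List;
        import java.util.concurrent.SynchronousQueue;
        import java.util.concurrent.TimeUnit;

        class SyncQueueHandoff
        {
            // Client thread: park in offer() until the scheduler take()s the
            // request, or give up after the timeout.
            static boolean submit(SynchronousQueue<Runnable> queue, Runnable request,
                                  long timeoutMillis) throws InterruptedException
            {
                return queue.offer(request, timeoutMillis, TimeUnit.MILLISECONDS);
            }

            // Scheduler thread: round-robin across the per-keyspace queues,
            // skipping the ones with no waiting client.
            static void schedulerPass(List<SynchronousQueue<Runnable>> queues)
            {
                for (SynchronousQueue<Runnable> q : queues)
                {
                    Runnable next = q.poll();   // non-blocking; null if nobody is waiting
                    if (next != null)
                        next.run();
                }
            }
        }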

        Nirmal Ranganathan added a comment -

        Ah OK, that works. Thanks, that's made it much cleaner. I've removed the wait/notify and used a SynchronousQueue. I've also moved the initialization to DD and removed initialize().

        Jonathan Ellis added a comment -

        v4 removes the second while (true) loop and makes schedule() private as an implementation detail.

        Aren't queuesize, taskcount, and release unnecessary now? (What is the point of blocking on the taskcount semaphore instead of the synchronousqueue?)

        Nirmal Ranganathan added a comment -

        The taskcount is still needed: that's what sets the throttling. We can't do that with the syncqueue alone, because of the multiple queues per user/keyspace.

        Ex: keyspace A may have incoming requests while B has none; in that case, if we block on the syncqueue, we would block on B and not be able to process A's requests until a B request arrives.
        Moreover, when there's a flood of requests, the taskcount blocks until a release is called, whereas the syncqueue would just process everything. Note, however, that this won't be effective for async requests; but either way, it throttles the incoming requests.
        Also, the RR is fair in the sense that 100 requests for A (throttled at, say, 40) wouldn't necessarily block a request from B, assuming all the requests arrive simultaneously.

        The queuesize was added to avoid a busy-wait scenario when there are no requests.
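
        Putting the pieces together, a simplified sketch of the mechanism described above (field and method names are approximate, not copied from the patch): taskcount caps in-flight requests at throttle_limit, and queuesize keeps the scheduler thread asleep when nothing is queued.

        import java.util.concurrent.ConcurrentHashMap;
        import java.util.concurrent.ConcurrentMap;
        import java.util.concurrent.Semaphore;
        import java.util.concurrent.SynchronousQueue;

        class ThrottledRoundRobin
        {
            private final ConcurrentMap<String, SynchronousQueue<Thread>> queues =
                new ConcurrentHashMap<String, SynchronousQueue<Thread>>();
            private final Semaphore taskCount;                    // caps in-flight requests
            private final Semaphore queueSize = new Semaphore(0); // counts queued requests

            ThrottledRoundRobin(int throttleLimit)
            {
                taskCount = new Semaphore(throttleLimit);
            }

            // Client thread: announce the request, then park until the scheduler
            // thread rendezvouses with this keyspace's queue.
            public void queue(String id) throws InterruptedException
            {
                SynchronousQueue<Thread> q = queues.get(id);
                if (q == null)
                {
                    SynchronousQueue<Thread> candidate = new SynchronousQueue<Thread>();
                    q = queues.putIfAbsent(id, candidate);
                    if (q == null)
                        q = candidate;
                }
                queueSize.release();
                q.put(Thread.currentThread());
            }

            // Called when the request completes, freeing a throttle slot.
            public void release()
            {
                taskCount.release();
            }

            // Scheduler thread: dispatch one queued request per iteration.
            void schedulerLoop() throws InterruptedException
            {
                while (true)
                {
                    queueSize.acquire();  // sleep while nothing is queued (no busy wait)
                    taskCount.acquire();  // block once throttle_limit requests are in flight
                    while (!dispatchOne())
                        ;                 // brief spin: the matching put() may still be in flight
                }
            }

            private boolean dispatchOne()
            {
                for (SynchronousQueue<Thread> q : queues.values())
                    if (q.poll() != null) // poll() unparks the client blocked in put()
                        return true;
                return false;
            }
        }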

        Nirmal Ranganathan added a comment -

        Reordered the semaphores in RR.schedule() a little in v5.

        Stu Hood added a comment -

        v5 is ever so slightly slower than the wait/notify approach, but correctness and simplicity triumph. +1 from me!

        stress.py default reads (1mm rows, 5 cols, 50 threads)

        nosched
        139 secs

        roundrobin (throttle_limit = 80/default)
        153 secs

        Jonathan Ellis added a comment -

        Rebased and committed.

        Hudson added a comment -

        Integrated in Cassandra #491 (See http://hudson.zones.apache.org/hudson/job/Cassandra/491/ )

          People

          • Assignee: Nirmal Ranganathan
          • Reporter: Stu Hood
          • Votes: 0
          • Watchers: 5
