Cassandra
CASSANDRA-4718

More-efficient ExecutorService for improved throughput

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Fix Version/s: 2.1 rc1
    • Component/s: None
    • Labels: performance

      Description

      Currently all our execution stages dequeue tasks one at a time. This can result in contention between producers and consumers (although we do our best to minimize this by using LinkedBlockingQueue).

      One approach to mitigating this would be to make consumer threads do more work in "bulk" instead of just one task per dequeue. (Producer threads tend to be single-task oriented by nature, so I don't see an equivalent opportunity there.)

      BlockingQueue has a drainTo(collection, int) method that would be perfect for this. However, no ExecutorService in the JDK supports using drainTo, nor could I google one.

      What I would like to do here is create just such a beast and wire it into (at least) the write and read stages. (Other possible candidates for such an optimization, such as the CommitLog and OutboundTCPConnection, are not ExecutorService-based and will need to be one-offs.)

      AbstractExecutorService may be useful. The implementations of ICommitLogExecutorService may also be useful. (Despite the name these are not actual ExecutorServices, although they share the most important properties of one.)
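
      To make the idea concrete, here is a minimal sketch of a drainTo-based worker loop (an illustration only, not the committed implementation; the class name, batch limit and poll timeout are invented):

      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.BlockingQueue;
      import java.util.concurrent.TimeUnit;

      // Rough sketch only: a worker loop that dequeues tasks in bulk via drainTo
      // instead of one take() per task, reducing producer/consumer contention.
      public class DrainingWorker implements Runnable
      {
          private static final int MAX_DRAIN = 64; // illustrative batch limit

          private final BlockingQueue<Runnable> queue;
          private volatile boolean shutdown;

          public DrainingWorker(BlockingQueue<Runnable> queue)
          {
              this.queue = queue;
          }

          public void run()
          {
              List<Runnable> batch = new ArrayList<Runnable>(MAX_DRAIN);
              while (!shutdown)
              {
                  try
                  {
                      // block for the first task, then opportunistically grab any backlog
                      Runnable first = queue.poll(100, TimeUnit.MILLISECONDS);
                      if (first == null)
                          continue;
                      batch.add(first);
                      queue.drainTo(batch, MAX_DRAIN - 1);
                      for (Runnable task : batch)
                          task.run();
                      batch.clear();
                  }
                  catch (InterruptedException e)
                  {
                      Thread.currentThread().interrupt();
                      return;
                  }
              }
          }
      }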

      Attachments

      1. E600M_summary_key_s.svg
        8 kB
        Benedict
      2. E100M_summary_key_s.svg
        8 kB
        Benedict
      3. E10M_summary_key_s.svg
        7 kB
        Benedict
      4. jason_run3.svg
        7 kB
        Benedict
      5. jason_run2.svg
        7 kB
        Benedict
      6. jason_run1.svg
        7 kB
        Benedict
      7. austin_diskbound_read.svg
        12 kB
        Benedict
      8. stress_2014May16.txt
        10 kB
        Jason Brown
      9. stress_2014May15.txt
        11 kB
        Jason Brown
      10. aws_read.svg
        8 kB
        Benedict
      11. jason_write.svg
        8 kB
        Benedict
      12. jason_read_latency.svg
        11 kB
        Benedict
      13. jason_read.svg
        8 kB
        Benedict
      14. aws.svg
        9 kB
        Benedict
      15. belliotsmith_branches-stress.out.txt
        10 kB
        Jason Brown
      16. backpressure-stress.out.txt
        13 kB
        Jason Brown
      17. 4718-v1.patch
        23 kB
        Jason Brown
      18. v1-stress.out
        11 kB
        Jason Brown
      19. stress op rate with various queues.ods
        37 kB
        Benedict
      20. op costs of various queues.ods
        28 kB
        Benedict
      21. baq vs trunk.png
        44 kB
        Ryan McGuire
      22. PerThreadQueue.java
        3 kB
        Vijay

        Issue Links

          Activity

          Jonathan Ellis created issue -
          Jonathan Ellis added a comment -

          (See CASSANDRA-1632 for the genesis of this idea.)

          Michaël Figuière added a comment -

          I'd suggest to make the maxElements to "drain" somewhat proportional to the size of the queue, using maybe some thresholds. This would mostly be about taking only one element per thread from the queue when it's almost empty to avoid serializing tasks processing, and draining several when it's more full. The expected behavior would be to keep latency low at light/nominal load and increasing throughput at heavy load.

          Jonathan Ellis added a comment -

          That's reasonable, but it's worth noting that Queue.size() is not always constant time (notably in JDK7 LinkedTransferQueue, which we will want to evaluate vs LBQ).

          Michaël Figuière added a comment -

          Right, an Atomic counter should then be more appropriate to track the current amount of tasks in queue. It won't be updated atomically with the actual size of the queue but no big deal for this purpose...
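
           (As a rough sketch of how that could look — assuming an AtomicInteger backlog counter maintained alongside the queue; the thresholds below are made up for illustration:)

           import java.util.concurrent.atomic.AtomicInteger;

           // Illustration only: choose a drain batch size from an approximate backlog
           // counter (incremented on submit, decremented as tasks are drained), so a
           // near-empty queue keeps latency low and a full queue favours throughput.
           final class AdaptiveDrainSize
           {
               private final AtomicInteger pending = new AtomicInteger();

               void onSubmit()           { pending.incrementAndGet(); }
               void onDrained(int count) { pending.addAndGet(-count); }

               int nextBatchSize()
               {
                   int backlog = pending.get();
                   if (backlog <= 4)
                       return 1;            // lightly loaded: one task at a time
                   if (backlog <= 128)
                       return backlog / 4;  // moderate load: scale with the backlog
                   return 32;               // heavy load: cap the batch size
               }
           }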

          Jonathan Ellis added a comment -

          I posted a quick translation of LBQ to LTQ at https://github.com/jbellis/cassandra/branches/ltq.

          Brandon Williams added a comment - edited

          Performance tests conclude that LTQ is significantly (15%+) slower for both reads and writes

          Nate McCall added a comment -

           Sorta related - Guava's Futures class has some interesting grouping functionality: http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/Futures.html#allAsList(java.lang.Iterable)
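
           (For reference, a toy illustration of grouping results with that API; the pool size and task values here are invented:)

           import java.util.List;
           import java.util.concurrent.Callable;
           import java.util.concurrent.Executors;

           import com.google.common.util.concurrent.Futures;
           import com.google.common.util.concurrent.ListenableFuture;
           import com.google.common.util.concurrent.ListeningExecutorService;
           import com.google.common.util.concurrent.MoreExecutors;

           // Toy example: Futures.allAsList groups several async results into a single
           // future that completes when all of them have completed.
           public class GuavaGroupingExample
           {
               public static void main(String[] args) throws Exception
               {
                   ListeningExecutorService pool =
                           MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));

                   ListenableFuture<String> a = pool.submit(new Callable<String>() {
                       public String call() { return "read-1"; }
                   });
                   ListenableFuture<String> b = pool.submit(new Callable<String>() {
                       public String call() { return "read-2"; }
                   });

                   // completes once both a and b complete; fails if either of them fails
                   ListenableFuture<List<String>> all = Futures.allAsList(a, b);
                   System.out.println(all.get());

                   pool.shutdown();
               }
           }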
          Vijay added a comment - edited

           How about a per-thread queue? I was curious about this ticket and ran a test.
           I should also note that the stages contend when we have a lot of CPUs (I was testing with 64-core machines).
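
           (For anyone skimming, the general shape of the per-thread-queue idea is sketched below. This is a rough hypothetical illustration, not the attached PerThreadQueue.java.)

           import java.util.concurrent.ArrayBlockingQueue;
           import java.util.concurrent.BlockingQueue;
           import java.util.concurrent.atomic.AtomicInteger;

           // Rough sketch: each worker owns its own queue and producers spread tasks
           // across workers round-robin, so consumers never contend with each other
           // on a single shared queue head.
           public class PerThreadQueueExecutor
           {
               private final BlockingQueue<Runnable>[] queues;
               private final AtomicInteger next = new AtomicInteger();

               @SuppressWarnings("unchecked")
               public PerThreadQueueExecutor(int workers, int capacity)
               {
                   queues = new BlockingQueue[workers];
                   for (int i = 0; i < workers; i++)
                   {
                       final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(capacity);
                       queues[i] = q;
                       Thread t = new Thread(new Runnable()
                       {
                           public void run()
                           {
                               try
                               {
                                   while (true)
                                       q.take().run(); // each worker drains only its own queue
                               }
                               catch (InterruptedException e)
                               {
                                   // exit on interrupt
                               }
                           }
                       });
                       t.setDaemon(true);
                       t.start();
                   }
               }

               public void execute(Runnable task) throws InterruptedException
               {
                   // round-robin placement; a smarter version could pick the shortest queue
                   queues[(next.getAndIncrement() & Integer.MAX_VALUE) % queues.length].put(task);
               }
           }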

          Vijay made changes -
          Attachment PerThreadQueue.java [ 12559762 ]
          Vijay added a comment -

           Sorry, I take it back... it is not good at smaller numbers... I had a bug in my previous run.

          Making JIT happy
          TimeTaken: 230
          Real test starts!
          running per thread exec Test!
          TimeTaken: 165
          running Plain old exec Test!
          TimeTaken: 54
          Test with multiple of 10!
          TimeTaken: 1519
          running Plain old exec Test!
          TimeTaken: 690
          Test with multiple of 100!
          TimeTaken: 15357
          running Plain old exec Test!
          TimeTaken: 30131
          
          Vijay made changes -
          Attachment PerThreadQueue.java [ 12559762 ]
          Vijay made changes -
          Attachment PerThreadQueue.java [ 12559767 ]
          Jonathan Ellis added a comment - edited

          Another quick translation, this time to Jetty's BlockingArrayQueue: https://github.com/jbellis/cassandra/tree/baq

          (Edit: fixed branch url)

          Ryan McGuire added a comment - edited

           I ran a performance test comparing Jonathan's baq branch with trunk and saw a substantial increase in read performance. See http://goo.gl/ZeUS6 or see attached png.

          Ryan McGuire made changes -
          Attachment baq vs trunk.png [ 12569008 ]
          T Jake Luciani added a comment -

           Stu Hood mentions http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html
           which is only in Java 7, but that's required for 2.0, no?
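
           (For context, ForkJoinPool implements ExecutorService, so in principle a stage could submit plain Runnables to it just like to a ThreadPoolExecutor. A trivial illustration, not a benchmark:)

           import java.util.concurrent.ExecutorService;
           import java.util.concurrent.ForkJoinPool;

           // Trivial illustration: ForkJoinPool (Java 7+) used as a plain ExecutorService.
           public class FjpAsStagePool
           {
               public static void main(String[] args) throws Exception
               {
                   ExecutorService stage = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
                   stage.submit(new Runnable()
                   {
                       public void run()
                       {
                           System.out.println("task ran on " + Thread.currentThread().getName());
                       }
                   }).get();
                   stage.shutdown();
               }
           }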

          Nitsan Wakart added a comment -

          OK, off the bat without knowing anything about nothing (following on from twitter chat with Jake Luciani ‏@tjake):
          I understand your process to be a pipeline of work being performed in stages such that tasks have to complete one stage to go to the next. You need some fast queue mechanism to deliver tasks between stages with minimum contention. Two approaches come to mind:
          1. To build on existing frameworks you will probably want to look at the Disruptor (https://github.com/LMAX-Exchange/disruptor) and thread affinity (https://github.com/peter-lawrey/Java-Thread-Affinity) this is going to work well for rigid processes with an abundance of CPUs to play with, but with planning can also work well for more restrictive environments.
           2. Go wild and build something new!!! The typical work pool model has all the workers contending for work, which creates contention over the head of the queue. We could try a new algorithm that removes this contention in some way, for example by having a work arbitration thread deliver work to the least busy worker, or some other crazy scheme. I plan to give it a go and see what happens.
           Option 1 is sure to deliver improved performance over your JDK variety of collections. Thread affinity will help deliver more predictable results but will require some layout algorithm for the available resources (i.e. make a different plan for a dual-socket 8-core machine than for a 4-socket 4-core one). Affinity is likely to prove important in any case, but I am not aware of any public framework making use of it.
          An overlooked aspect of the Disruptor is the fact that it is a queue and an object pool merged into one. To make the most of it you will need to have your pipelined events form some super structure which will cycle through the process and then get reused.
          In any case, here's an initial brain dump, happy to get involved in the solution if this sounds interesting.

          Jonathan Ellis added a comment -

          Let me give a little more color as to what our existing stages are. Most of these are ThreadPoolExecutors connected by LinkedBlockingQueue.

          A client sends each request to a node in the cluster called the Coordinator. The coordinator stages are

          1. Request: either Thrift or Netty reads the request from the client
          2. StorageProxy: the coordinator validates the request and decides which replicas need to be contacted
          3. MessagingService (out): the coordinator sends the requests to the appropriate replicas
          4. MessagingService (in): the coordinator reads the reply
          5. Response: the coordinator processes callbacks for the reply
          6. StorageProxy: this thread will have been waiting on a Future or a Condition for the callbacks, and can now reply to the client

          When a replica receives a message, it also goes through a few stages:

          1. MessagingService (in): the replica reads the coordinator's request
          2. Read or Write: fetch or append the data specified by the request
          3. MessagingService (out): the replica sends the result to the coordinator

          So the obstacles I see to incorporating Disruptor are

          • MessagingService. This is an exception to the rule in that it is not actually a ThreadPoolExecutor; we have a custom thread pool per replica that does some gymnastics to keep its queue from growing indefinitely when a replica gets behind (CASSANDRA-3005). MS uses blocking sockets; long ago, we observed this to give better performance than NIO. I'd be willing to evaluate redoing this on e.g. Netty, but:
          • More generally, requests are not constant-size, which makes disruptor Entry re-use difficult
          • The read stage is basically forced to be a separate thread pool because of blocking i/o from disk
          • StorageProxy is not yet asynchronous

          Addressing the last of these is straightforward, but the others give me pause.

          What I'd like to do is pick part of the system and see if converting that to Disruptor gives a big enough win to be worth pursuing with a full-scale conversion, but given how Disruptor wants to manage everything I'm not sure how to do that either!

          Darach Ennis added a comment -

          +1 to Nitsan Wakart's observations w.r.t. disruptor and thread affinity, although YMMV on Mac OS X w.r.t. affinity.
           I would take Java-Thread-Affinity with a pinch of salt on OS X w.r.t. claims of something equivalent to true affinity. Sounds like the Disruptor is a good candidate here, though, given Jonathan Ellis's added color.

          An aside, LMAX have recently open sourced a coalescing ring buffer that uses similar underlying techniques to
          the disruptor. It's designed for event streams that tend to benefit from coalescing or combining in some way. Perhaps this may suit some use cases. If not it may still serve as inspiration / color.

          https://github.com/LMAX-Exchange/LMAXCollections/tree/master/CoalescingRingBuffer/src/com/lmax/collections/coalescing/ring/buffer

          Jonathan Ellis added a comment -

          LMAX have recently open sourced a coalescing ring buffer that uses similar underlying techniques to the disruptor

          More for the curious: http://nickzeeb.wordpress.com/2013/03/07/the-coalescing-ring-buffer/

          "It is a component that we have written in Java to efficiently buffer messages between a producer and a consumer thread where only the latest value for a given topic is of interest. All other messages can be discarded immediately."

          Nitsan Wakart added a comment -

          I'd suggest this is a less mature offering than the Disruptor at this point. Still, worth a read.

          Piotr Kołaczkowski added a comment -

           Another thing to consider might be using a high-performance actor library, e.g. Akka.

           I did a quick microbenchmark to measure the latency of just passing a single message through several stages, in 3 variants:

          1. Sync: one threadpool per stage, where some coordinator thread just moves message from one ExecutorService to another, after the stage finished processing
          2. Async: one threadpool per stage, where every stage directly asynchronously pushes its result into the next stage
          3. Akka: one Akka actor per stage, where every stage directly asynchronously pushes its result into the next stage

          The clear winner is Akka:

          2 stages: 
          Sync:     38717 ns
          Async:    36159 ns
          Akka:     12969 ns
          
          4 stages: 
          Sync:     65793 ns
          Async:    49964 ns
          Akka:     18516 ns
          
          8 stages: 
          Sync:    162256 ns
          Async:   100009 ns
          Akka:      9237 ns
          
          16 stages: 
          Sync:    296951 ns
          Async:   183588 ns
          Akka:     13574 ns
          
          32 stages: 
          Sync:    572605 ns
          Async:   361959 ns
          Akka:     23344 ns
          

          Code of the benchmark:

          package pl.pk.messaging
          
          import java.util.concurrent.{CountDownLatch, Executors}
          import akka.actor.{Props, ActorSystem, Actor, ActorRef}
          
          
          class Message {
            var counter = 0
            val latch = new CountDownLatch(1)
          }
          
          abstract class MultistageThreadPoolProcessor(stageCount: Int) {
          
            val stages =
              for (i <- 1 to stageCount) yield Executors.newCachedThreadPool()
          
            def shutdown() {
              stages.foreach(_.shutdown())
            }
          
          }
          
          /** Synchronously processes a message through the stages.
            * The message is passed stage-to-stage by the coordinator thread. */
          class SyncThreadPoolProcessor(stageCount: Int) extends MultistageThreadPoolProcessor(stageCount) {
          
            def process() {
          
              val message = new Message
          
              val task = new Runnable() {
                def run() { message.counter += 1 }
              }
          
              for (executor <- stages)
                executor.submit(task).get()
            }
          }
          
          /** Asynchronously processes a message through the stages.
            * Every stage after finishing its processing of the message
            * passes the message directly to the next stage, without bothering the coordinator thread. */
          class AsyncThreadPoolProcessor(stageCount: Int) extends MultistageThreadPoolProcessor(stageCount) {
          
            def process() {
          
              val message = new Message
          
              val task = new Runnable() {
                def run() {
                  message.counter += 1
                  if (message.counter >= stages.size)
                    message.latch.countDown()
                  else
                    stages(message.counter).submit(this)
                }
              }
          
              stages(0).submit(task)
              message.latch.await()
            }
          }
          
          /** Similar to AsyncThreadPoolProcessor but it uses Akka actor system instead of thread pools and queues.
            * Every stage after finishing its processing of the message
            * passes the message directly to the next stage, without bothering the coordinator thread. */
          class AkkaProcessor(stageCount: Int) {
          
            val system = ActorSystem()
          
            val stages: IndexedSeq[ActorRef] = {
              for (i <- 1 to stageCount) yield system.actorOf(Props(new Actor {
                def receive = {
                  case m: Message =>
                    m.counter += 1
                    if (m.counter >= stages.size)
                      m.latch.countDown()
                    else
                      stages(m.counter) ! m
                }
              }))
            }
          
            def process() {
              val message = new Message
              stages(0) ! message
              message.latch.await()
            }
          
            def shutdown() {
              system.shutdown()
            }
          
          }
          
          
          
          object MessagingBenchmark extends App {
          
            def measureLatency(count: Int, f: () => Any): Double = {
              val start = System.nanoTime()
              for (i <- 1 to count)
                f()
              val end = System.nanoTime()
              (end - start).toDouble / count
            }
          
            val messageCount = 200000
            for (stageCount <- List(2,4,8,16,32))
            {
              printf("\n%d stages: \n", stageCount)
              val syncProcessor = new SyncThreadPoolProcessor(stageCount)
              val asyncProcessor = new AsyncThreadPoolProcessor(stageCount)
              val akkaProcessor = new AkkaProcessor(stageCount)
          
              printf("Sync:  %8.0f ns\n", measureLatency(messageCount, syncProcessor.process))
              printf("Async: %8.0f ns\n", measureLatency(messageCount, asyncProcessor.process))
              printf("Akka:  %8.0f ns\n", measureLatency(messageCount, akkaProcessor.process))
          
              syncProcessor.shutdown()
              asyncProcessor.shutdown()
              akkaProcessor.shutdown()
            }
          }
          
          Piotr Kołaczkowski added a comment - edited

           Interestingly, after increasing the number of threads that invoke the process() method from 1 to 16, Akka gets slower, while the thread-pool-per-stage approach gets faster.

           16 user threads invoking process(), 4 core i7 with HT (8 virtual cores).

           Huh, Sergio Bossa found a bug in my multithreaded benchmark...

          Piotr Kołaczkowski added a comment - edited

           I made another version of the benchmark, following Sergio's suggestions. Now it uses the following message processing graph:

                           
             /------ stage 0 processor 0  ----\         /----           ----\                  /---           ---\
             +------ stage 0 processor 1  ----+         +----           ----+                  +---           ---+
          >--+------ stage 0 processor 2  ----+---->----+----  STAGE 1  ----+------>- ... --->-+---  STAGE m  ---+----->
             +------ ...                  ----+         +----           ----+                  +---           ---+
             \------ stage 0 processor n  ----/         \----           ----/                  \---           ---/
          

          128 threads are concurrently trying to get messages through all the stages and measure average latency, including the time required for the message to enter stage 0.
          Thread-pool stages are built from fixed size thread pools with n=8, because there are 8 cores.
           Actor-based stages are built from 128 actors each, with a RoundRobinRouter in front of every stage.

          Average latencies:

          3 stages: 
          Sync:    364687 ns
          Async:   210766 ns
          Akka:    201842 ns
          
          4 stages: 
          Sync:    492581 ns
          Async:   221118 ns
          Akka:    239407 ns
          
          5 stages: 
          Sync:    671733 ns
          Async:   245370 ns
          Akka:    283798 ns
          
          6 stages: 
          Sync:    781759 ns
          Async:   262742 ns
          Akka:    309384 ns
          

           So Akka comes out slightly slower than async thread pools.

          If someone wants to play with my code, here is the up-to-date version:

          import java.util.concurrent.{CountDownLatch, Executors}
          import akka.actor.{Props, ActorSystem, Actor, ActorRef}
          import akka.routing.{SmallestMailboxRouter, RoundRobinRouter}
          
          
          class Message {
            var counter = 0
            val latch = new CountDownLatch(1)
          }
          
          abstract class MultistageThreadPoolProcessor(stageCount: Int) {
          
            val stages =
              for (i <- 1 to stageCount) yield Executors.newFixedThreadPool(8)
          
            def shutdown() {
              stages.foreach(_.shutdown())
            }
          
          }
          
          /** Synchronously processes a message through the stages.
            * The message is passed stage-to-stage by the coordinator thread. */
          class SyncThreadPoolProcessor(stageCount: Int) extends MultistageThreadPoolProcessor(stageCount) {
          
            def process() {
          
              val message = new Message
          
              val task = new Runnable() {
                def run() { message.counter += 1 }
              }
          
              for (executor <- stages)
                executor.submit(task).get()
            }
          }
          
          /** Asynchronously processes a message through the stages.
            * Every stage after finishing its processing of the message
            * passes the message directly to the next stage, without bothering the coordinator thread. */
          class AsyncThreadPoolProcessor(stageCount: Int) extends MultistageThreadPoolProcessor(stageCount) {
          
            def process() {
          
              val message = new Message
          
              val task = new Runnable() {
                def run() {
                  message.counter += 1
                  if (message.counter >= stages.size)
                    message.latch.countDown()
                  else
                    stages(message.counter).submit(this)
                }
              }
          
              stages(0).submit(task)
              message.latch.await()
            }
          }
          
          /** Similar to AsyncThreadPoolProcessor but it uses Akka actor system instead of thread pools and queues.
            * Every stage after finishing its processing of the message
            * passes the message directly to the next stage, without bothering the coordinator thread. */
          class AkkaProcessor(stageCount: Int) {
          
            val system = ActorSystem()
          
            val stages: IndexedSeq[ActorRef] = {
              for (i <- 1 to stageCount) yield
                system.actorOf(Props(createActor()).withRouter(RoundRobinRouter(nrOfInstances = 128)))
            }
          
            def createActor(): Actor = {
              new Actor {
          
                def receive = {
                  case m: Message =>
                    m.counter += 1
                    if (m.counter >= stages.size)
                      m.latch.countDown()
                    else
                      stages(m.counter) ! m
                }
              }
            }
          
            def process() {
              val message = new Message
              stages(0) ! message
              message.latch.await()
            }
          
            def shutdown() {
              system.shutdown()
            }
          
          }
          
          
          
          object MessagingBenchmark extends App {
          
            def measureLatency(count: Int, f: () => Any): Double = {
              val start = System.nanoTime()
              for (i <- 1 to count)
                f()
              val end = System.nanoTime()
              (end - start).toDouble / count
            }
          
            def measureLatency(threadCount: Int, messageCount: Int, f: () => Any): Double = {
          
              class RequestThread extends Thread {
                var latency: Double = 0.0
                override def run() { latency = measureLatency(messageCount, f) }
              }
          
              val threads =
                for (i <- 1 to threadCount) yield new RequestThread()
          
              threads.foreach(_.start())
              threads.foreach(_.join())
          
              threads.map(_.latency).sum / threads.size
            }
          
          
            val messageCount = 50000
            for (stageCount <- List(3,3,4,5,6,7,8,16,32))
            {
              printf("\n%d stages: \n", stageCount)
              val syncProcessor = new SyncThreadPoolProcessor(stageCount)
              val asyncProcessor = new AsyncThreadPoolProcessor(stageCount)
              val akkaProcessor = new AkkaProcessor(stageCount)
          
              printf("Sync:  %8.0f ns\n", measureLatency(128, messageCount, syncProcessor.process))
              printf("Async: %8.0f ns\n", measureLatency(128, messageCount, asyncProcessor.process))
              printf("Akka:  %8.0f ns\n", measureLatency(128, messageCount, akkaProcessor.process))
          
              syncProcessor.shutdown()
              asyncProcessor.shutdown()
              akkaProcessor.shutdown()
            }
          }
          
          
          Marcus Eriksson added a comment -

           FTR, I'm very much -1 on using Scala in Cassandra (I don't know if you are even suggesting that).

           I know it is supposed to interface nicely with Java code, but it generally becomes a huge hairy part of the code base that no one wants to touch.

          Piotr Kołaczkowski added a comment - edited

           I'm not suggesting using Scala in C* or anywhere else. It was just quicker for me to write a throw-away benchmark, and Akka was already bundled. I hope it is understandable even to those who don't know Scala. BTW, Akka has a native Java API, too. Although, looking at the numbers, I'm not necessarily convinced it would help with this ticket now.

          Marcus Eriksson added a comment -

           Unless the Java API has improved a lot in the last year or so, the code will be horrible.

          Jonathan Ellis added a comment -

           Pavel Yaskevich Thoughts on my comment above? https://issues.apache.org/jira/browse/CASSANDRA-4718?focusedCommentId=13629447&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13629447
          Pavel Yaskevich added a comment -

           I think that even though variable-size RMs keep us from the most cache-friendly behavior, we can still get better dispatch behavior with low-cost synchronization. We can't do anything about blocking I/O operations requiring a separate thread, but I think it's time to re-evaluate NIO async sockets vs. having a thread per in/out connection.

          Jonathan Ellis added a comment -

          Sounds like a reasonable place to start.

          Jonathan Ellis made changes -
          Labels performance
          Benedict made changes -
          Attachment op costs of various queues.ods [ 12606276 ]
          Jason Brown made changes -
          Assignee Jason Brown [ jasobrown ]
          darion yaphets added a comment -

           LMAX Disruptor's RingBuffer may be a good idea for a lock-free component.
           But you may need to set a bigger ring buffer size to hold the structures, to avoid them being overwritten by new entries before they are consumed.
           And that means using more memory...

          Benedict made changes -
          Attachment stress op rate with various queues.ods [ 12607210 ]
          Benedict added a comment -

          Disruptors are very difficult to use as a drop in replacement for the executor service, so I tried to knock up some queues that could provide similar performance without ripping apart the whole application. The resulting queues I benchmarked under high load, in isolation, against LinkedBlockingQueue, BlockingArrayQueue and the Disruptor, and plotted the average op costs in the "op costs of various queues" attachment*. As can be seen, these queues and the Disruptor are substantially faster under high load than LinkedBlockingQueue, however it can also be seen that:

          • The average op cost for LinkedBlockingQueue is still very low, in fact only around 300ns at worst
          • BlockingArrayQueue is considerably worse than LinkedBlockingQueue under all conditions

          These suggest both that the overhead attributed to LinkedBlockingQueue for a 1Mop workload (as run above) should be at most a few seconds of the overall cost (probably much less); and that BlockingArrayQueue is unlikely to make any cost incurred by LinkedBlockingQueue substantially better. This made me suspect the previous result might be attributable to random variance, but to be sure I ran a number of ccm -stress tests with the different queues, and plotted the results in "stress op rate with various queues.ods", which show the following:

          1) No meaningful difference between BAQ, LBQ and SlowQueue (though the latter has a clear ~1% slow down)
          2) UltraSlow (~10x slow down, or 2000ns spinning each op) is approximately 5% slower
          3) The faster queue actually slows down the process, by about 9% - more than the queue supposedly much slower than it!

          Anyway, I've been concurrently looking at where I might be able to improve performance independent of this, and have found the following:

          A) Raw performance of local reads is ~6-7x faster than through Stress
          B) Raw performance of local reads run asynchronously is ~4x faster
          C) Raw performance of local reads run asynchronously using the fast queue is ~4.7x faster
          D) Performance of local reads from the Thrift server-side methods is ~3x faster
          E) Performance of remote (i.e. local non-optimised) reads is ~1.5x faster

          In particular (C) is interesting, as it demonstrates the queue really is faster in use, but I've yet to absolutely determine why that translates into an overall decline in throughput. It looks as though it's possible it causes greater congestion in LockSupport.unpark(), but this is a new piece of information, derived from YourKit. As these sorts of methods are difficult to meter accurately I don't necessarily trust it, and haven't had a chance to figure out what I can do with the information. If it is accurate, and I can figure out how to reduce the overhead, we might get a modest speed boost, which will accumulate as we find other places to improve.

          As to the overall problem of improving throughput, it seems to me that there are two big avenues to explore:

          1) the networking (software) overhead is large;
          2) possibly the cost of managing thread liveness (e.g. park/unpark/scheduler costs); though the evidence for this is as yet inconclusive... given the op rate and other evidence it doesn't seem to be synchronization overhead. I'm still trying to pin this down.

          Once the costs here are nailed down as tight as they can go, I'm pretty confident we can get some noticeable improvements to the actual work being done, but since that currently accounts for only a fraction of the time spent (probably less than 20%), I'd rather wait until it was a higher percentage so any improvement is multiplied.

           * These can be replicated by running org.apache.cassandra.concurrent.test.bench.Benchmark on any of the linked branches on github.

          https://github.com/belliottsmith/cassandra/tree/4718-lbq [using LinkedBlockingQueue]
          https://github.com/belliottsmith/cassandra/tree/4718-baq [using BlockingArrayQueue]
          https://github.com/belliottsmith/cassandra/tree/4718-lpbq [using a new high performance queue]
          https://github.com/belliottsmith/cassandra/tree/4718-slow [using a LinkedBlockingQueue with 200ns spinning each op]
          https://github.com/belliottsmith/cassandra/tree/4718-ultraslow [using a LinkedBlockingQueue with 2000ns spinning each op]
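
           (For anyone who wants a feel for how such an isolated queue micro-benchmark can be structured, here is a rough illustration — not the harness used for the attached numbers; thread and op counts are arbitrary:)

           import java.util.concurrent.BlockingQueue;
           import java.util.concurrent.CountDownLatch;
           import java.util.concurrent.LinkedBlockingQueue;

           // Rough illustration: N producers and N consumers hammer one BlockingQueue
           // and we report the average time per offer/take pair.
           public class QueueOpCostBench
           {
               static final int OPS_PER_THREAD = 1000000;

               public static void main(String[] args) throws Exception
               {
                   final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
                   final int producers = 4, consumers = 4;
                   final CountDownLatch done = new CountDownLatch(producers + consumers);

                   long start = System.nanoTime();
                   for (int i = 0; i < producers; i++)
                   {
                       new Thread(new Runnable()
                       {
                           public void run()
                           {
                               for (int op = 0; op < OPS_PER_THREAD; op++)
                                   queue.offer(op);
                               done.countDown();
                           }
                       }).start();
                   }
                   for (int i = 0; i < consumers; i++)
                   {
                       new Thread(new Runnable()
                       {
                           public void run()
                           {
                               try
                               {
                                   for (int op = 0; op < OPS_PER_THREAD; op++)
                                       queue.take();
                               }
                               catch (InterruptedException ignored) { }
                               done.countDown();
                           }
                       }).start();
                   }
                   done.await();
                   long perPair = (System.nanoTime() - start) / ((long) producers * OPS_PER_THREAD);
                   System.out.println("~" + perPair + " ns per offer/take pair");
               }
           }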

          Jonathan Ellis added a comment -

          The faster queue actually slows down the process, by about 9% - more than the queue supposedly much slower than it

          So this actually confirms Ryan's original measurement of C*/BAQ [slow queue] faster than C*/LBQ [fast queue]?

          Benedict added a comment -

          Not necessarily. I still think that was most likely variance:

          • I have BAQ at same speed as LBQ in application
          • a 2x slow down of LBQ -> 0.01x slow down of application
          • a 10x slow down of LBQ -> 0.05x slow down of application

          => the queue speed is currently only ~1% of application cost. It's possible the faster queue is causing greater contention at a sync point, but this wouldn't work in the opposite direction if the contention at the sync point is low. Either way, if this were true we'd see the artificially slow queues also improve stress performance.

Ryan also ran some of my tests and found no difference. I wouldn't absolutely rule out the possibility his test was valid, though, as I did not swap out the queues in OutboundTcpConnection for these tests because, at the time, I was concerned about the calls to size(), which are expensive for my test queues, and I wanted the queue swap to be on equal terms across the board. I realise now these are only called via JMX, so that shouldn't stop me swapping them in.

I've just tried a quick test of directly (in-process) stressing through the MessagingService and found no measurable difference from putting BAQ in the OutboundTcpConnection, though if I swap it out across the board it is about 25% slower, which itself is interesting as this is close to a full stress, minus thrift.

          Hide
          Jonathan Ellis added a comment -

          What's the latest on this? I think Jason had some work with FJP he was testing out...

          Hide
          Jason Brown added a comment -

          Ha, just this week I was untangling my branch for CASSANDRA-1632, which included the FJP work. Should be able to get to this one next week after more performance testing.

          Jason Brown made changes -
          Link This issue is related to CASSANDRA-1632 [ CASSANDRA-1632 ]
          Hide
          Jason Brown added a comment -

OK, looks like my initial stab at switching over to FJP netted about a 10-15% throughput increase, and mixed results on the latency scores (sometimes better, sometimes on par with trunk). I'm going to run some more perf tests this weekend, and will decide how to proceed early next week - but the initial results do look promising. I've only tested the thrift endpoints so far, but when I retest this weekend, I'll throw in the cql3/native protocol as well.

          Here's my current working branch: https://github.com/jasobrown/cassandra/tree/4718_fjp . Note, it's very hacked up/WIP as I wanted to confirm the performance benefits before making everything happy (read: metrics pools). Also, I modified Pavel Yaskevich's thrift-disruptor lib, for this: https://github.com/jasobrown/disruptor_thrift_server/tree/4718_fjp.

          Hide
          Jason Brown added a comment -

After a several-month hiatus, I'm digging into this again. This time around, though, I have real hardware to test on, and thus the results are more consistent across executions (no cloud provider intermittently throttling me).

Testing the thrift interface (both sync and hsha), the short story is throughput is up ~20% vs. TPE, and 95% / 99%iles are down 40-60%. The 99.9%ile, however, is a bit trickier. In some of my tests it is down almost 80%, and sometimes it is up 40-50%. I need to dig in further to understand what is going on (not sure if it’s because of a shared env, reading across numa cores, and so on). perf and likwid are my friends in this investigation.

          As to testing the native protocol interface, I’ve only tested writes (new 2.1 stress seems broken on reads) and I get double the throughput and 40-50% lower latencies across the board.

          My test cluster consists of three machines, 32 cores each, 2 sockets (2 numa cores), 132G memory, 2.6.39 kernel, plus a similar box that generates the load.

          A couple of notes about this patch:

• RequestThreadPoolExecutor now decorates a FJP. Previously we had a TPE which contains, of course, a (bounded) queue. The bounded queue helped with back pressure from incoming requests. By using a FJP, there is no queue to help with back pressure as the FJP always enqueues a task (without blocking). Not sure if we still want/need that back pressure here (a rough sketch of one way to reimpose it follows at the end of this comment).
• As ForkJoinPool doesn’t expose much in terms of usage metrics (like total completed) compared to ThreadPoolExecutor, the ForkJoinPoolMetrics is similarly barren. Not sure if we want to capture this on our own in DFJP or something else.
          • I have made similar FJP changes to the disruptor-thrift library, and once this patch is committed, I’ll work with Pavel to make the changes over there and pull in the updated jar.

          As a side note, looks like the quasar project (http://docs.paralleluniverse.co/quasar/) indicates the jsr166e jar has some optimizations (http://blog.paralleluniverse.co/2013/05/02/quasar-pulsar/) over the jdk7 implementation (that are included in jdk8). I pulled in those changes and stress tested, but didn’t see much of a difference for our use case. I can, however, pull them in again if any one feels strongly.
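On the back-pressure point in the first bullet above: one minimal way to reimpose a bound when fronting a ForkJoinPool is to gate submissions with a Semaphore sized to the maximum number of in-flight tasks. This is only a sketch of the general idea, not the attached patch; the class and parameter names are made up.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Semaphore;

public class BoundedForkJoinExecutor
{
    private final ForkJoinPool pool;
    private final Semaphore permits;

    public BoundedForkJoinExecutor(int parallelism, int maxPendingTasks)
    {
        this.pool = new ForkJoinPool(parallelism);
        this.permits = new Semaphore(maxPendingTasks);
    }

    public void execute(final Runnable task)
    {
        permits.acquireUninterruptibly();           // block the producer once maxPendingTasks are in flight
        try
        {
            pool.execute(new Runnable()
            {
                public void run()
                {
                    try { task.run(); }
                    finally { permits.release(); }  // free the permit whether the task succeeds or fails
                }
            });
        }
        catch (RuntimeException e)
        {
            permits.release();                      // submission itself failed; don't leak the permit
            throw e;
        }
    }
}

Acquiring a permit is a single CAS in the uncontended case, so the extra cost only really shows up once the pool is genuinely saturated - which is exactly when you want producers to slow down.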

          Jason Brown made changes -
          Attachment v1-stress.out [ 12640111 ]
          Attachment 4718-v1.patch [ 12640112 ]
          Jason Brown made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Reviewer Pavel Yaskevich [ xedin ]
          Fix Version/s 2.1 [ 12324159 ]
          Hide
          Benedict added a comment - - edited

Nice! I wonder if the impact is much bigger on multi-cpu machines, as I did not see anything like this dramatic an improvement. But this is great. Do you have some stress dumps we can look at? Nevermind, I see that you do

          new 2.1 stress seems broken on reads

          Shouldn't be - what problem are you seeing?

          Hide
          Benedict added a comment -

          Just to check: you were tearing down and trashing the data directories between write runs? Because the last result is a bit weird: double the throughput, but it ran for 4 times as long, indicating there was a high variance in throughput (usually means compaction / heavy flushing is taking effect) - but the same workload under thrift had no such spikes...

          Hide
          Jason Brown added a comment -

As to multi-cpu machines, I spent a lot of time thinking about the effects of NUMA systems on CAS operations/algs (esp. wrt FJP, obviously). As I mentioned, I'm using systems with two sockets (two NUMA cores). As you get more sockets (and thus more numa cores) a thread on one core will be reaching across to more cores to do work stealing, thus adding contention to that memory address. Imagine four threads on four sockets all contending for work on a fifth thread. The memory values for that portion of the queue for that fifth thread are now pulled into all four sockets, thus becoming more of a contention point, as well as impacting latency (due to the CAS operation). However, this could be (and hopefully is) less of a cost than bothering with queues, blocking, posix threads, OS interrupts, and everything else that makes standard thread pool executors work.

Thinking even crazier about optimizing the FJP sharing across NUMA cores: this is when I start thinking about digging up the thread affinity work again, and binding threads of similar types (probably by Stage) to sockets, not just an individual CPU (I think that was my problem before). But then I wonder how much is to be gained on non-NUMA systems or systems where you can't determine whether it's got NUMA or not (hello, cloud!) - and at that point I'm happy to realize the gains we have and move forward.

          what problem are you seeing?

          Will ping you offline - too unexciting for this space

          Hide
          Jason Brown added a comment -

          you were tearing down and trashing the data directories between write runs

          Yes, was also clearing the page cache, as well.

          Hide
          Jason Brown added a comment -

          but it ran for 4 times as long, indicating there was a high variance in throughput

          Huh, yeah, you are right, it did run longer. Admittedly my eyes have been ignoring that column (shame on me). Let me run the native protocol test again (and try to figure out the read situation, as well).

          Hide
          Benedict added a comment -

          I think it is unlikely that the CAS has any meaningful impact. The QPI is very quick. I think this kind of speedup is more likely down to reduced signalling costs (unpark costs ~ 10micros, and work stealing means you have to go to the main queue far less frequently); possibly also the signalling of threads has been directly optimised in FJP. I knocked up a "low-switch" executor but found fairly little benefit on my box, as I can saturate the CPUs very easily (at which point the unpark cost is never incurred). On a many-CPU box, saturating all the cores is difficult, and so it is much more likely you'll be introducing bottlenecks on producers adding to the queue.
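For reference, the ~10us figure is easy to ballpark with a few lines of code. The following is my own rough sketch (not from any of the patches, and with no pinning or serious benchmark hygiene): it measures how long a parked thread takes to observe an unpark().

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

public class UnparkCost
{
    static final int ITERATIONS = 2000;
    static final long[] latencyNs = new long[ITERATIONS];
    static final AtomicLong signalAt = new AtomicLong();        // nanoTime taken just before unpark()

    public static void main(String[] args) throws InterruptedException
    {
        Thread waiter = new Thread(new Runnable()
        {
            public void run()
            {
                for (int i = 0; i < ITERATIONS; i++)
                {
                    while (signalAt.get() == 0)
                        LockSupport.park();                     // loop to tolerate spurious wake-ups
                    latencyNs[i] = System.nanoTime() - signalAt.getAndSet(0);
                }
            }
        });
        waiter.start();

        for (int i = 0; i < ITERATIONS; i++)
        {
            Thread.sleep(1);                                    // give the waiter time to park again
            while (!signalAt.compareAndSet(0, System.nanoTime()))
                Thread.yield();                                 // wait for the previous signal to be consumed
            LockSupport.unpark(waiter);
        }
        waiter.join();

        Arrays.sort(latencyNs);
        System.out.println("median wake-up: " + latencyNs[ITERATIONS / 2] + "ns, p99: "
                           + latencyNs[(int) (ITERATIONS * 0.99)] + "ns");
    }
}

On a mostly idle box the median tends to land in the single-digit microseconds, which is consistent with the figure quoted above; under load, with the scheduler busy, the tail stretches considerably.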

          Hide
          Benedict added a comment -

          Note that given the way C* works, the work stealing crossing CPU boundaries is really not important - the data is completely randomly hodgepodgedly allocated across the CPUs. Nothing we can do in FJP can fix that

          Hide
          Jason Brown added a comment -

Yeah, I think the extra cost across the QPI bus and such is easily masked by any disk I/O the actual op may have to do.

          the data is completely randomly hodgepodgedly allocated across the CPUs

Correct, given the current structure of the app. I can imagine something more CPU-cache-friendly, but it's a huge change and I suspect that I/O still dwarfs those latency reductions. Nice hack day project ....

          Hide
          Benedict added a comment -

          Maybe you're one of the few people I haven't told of my dream of internally hashing C* so that each sub-hash is run by its own core. Guaranteed NUMA behaviour, and we can stop performing any CASes and make all kinds of single threaded optimisations in various places (e.g. no need to do CoW updates to memtables, so massive garbage reduction).

          Bit of a pipedream for the moment though
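Purely as a toy rendering of that idea (nothing proposed for the codebase; the names here are invented): the "internally hashed" model amounts to routing all work for a given partition to one single-threaded executor, so per-shard state never needs a CAS or copy-on-write.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShardedExecutor
{
    private final ExecutorService[] shards;

    public ShardedExecutor(int cores)
    {
        shards = new ExecutorService[cores];
        for (int i = 0; i < cores; i++)
            shards[i] = Executors.newSingleThreadExecutor();    // one worker per "sub-hash"
    }

    public void execute(Object partitionKey, Runnable task)
    {
        int shard = (partitionKey.hashCode() & Integer.MAX_VALUE) % shards.length;
        shards[shard].execute(task);                            // all work for this key stays on one thread
    }
}

The hard part, of course, is everything this sketch ignores: requests that touch multiple partitions, background work such as flush and compaction, and keeping the shards balanced.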

          Hide
          Benedict added a comment -

          Jason Brown: Could you upload the full stress outputs for these runs? And also try running a separate stress run with a fixed high threadcount and op count?

          In particular for CQL, the results in the file are a little bit weird. That said, given their consistency for thrift I don't doubt the result is meaningful, but it would be good to understand what we're incorporating a bit better before committing.

          Hide
          Jason Brown added a comment -

          OK, will give it a shot today. Also, just noticed I did not tune native_transport_max_threads at all (so I have the default of 128). Might play with that a bit, as well.

          Benedict made changes -
          Reviewer Pavel Yaskevich [ xedin ] Benedict [ benedict ]
          Hide
          Pavel Yaskevich added a comment -

I still want to review this, why are you re-assigning, Benedict?

          Hide
          Aleksey Yeschenko added a comment -

          We were just doing some house cleaning today on IRC

          [16:49:06] jbellis: belliottsmith: is 4718 really patch available?
          [16:49:14] jbellis: #4718
          [16:49:14] CassBotJr: https://issues.apache.org/jira/browse/CASSANDRA-4718 (Unresolved; 2.1): "More-efficient ExecutorService for improved throughput"
          [16:50:15] belliottsmith: so jasobrown says - he's marked pavel as reviewer so i've kept out of it beyond asking for a little more stress info
          [16:51:28] belliottsmith: honestly though, from last time i looked at it (a while back), it was a pretty simple change

          Hide
          Jason Brown added a comment -

          Guys, guys, there's plenty of my patch to criticize - you'll each have your fun, I'm sure

          Hide
          Benedict added a comment -

          I've uploaded a new version of the patch here

          I've refactored the DebuggableForkJoinPool a little to support a limited queue (so that our native transport queue doesn't get too long), and to support the metrics that users may have gotten used to.

I've tested the branch out very minimally and do see a very modest performance benefit on my box for reads, but that's far from conclusive - it's quite likely any benefit is more visible on machines with more cores going spare, as the single queue lock for a standard executor could easily become a point of contention.

One slight concern I have with this approach is that, in order to make enqueueing tasks less contentious, we will need to either fork ForkJoinPool, or see if it is possible to implement an EventLoopGroup backed by a FJP, and use the same FJP to manage the connections as we do the execution of our tasks (as enqueuing tasks from a FJ-worker is contention-free). Given how FJP is intended to be used it is not optimised for enqueueing tasks, and is no more efficient (probably slightly less) than a standard executor. That's a future problem, however.

          Hide
          Benedict added a comment -

I've uploaded a slight variant of the patch here - this introduces a special FJP for processing native transport work, which avoids blocking on enqueue to the pool unless the configured limit has been reached. Instead we schedule a ForkJoinTask that sleeps for 5us, forking any work that has been queued in the interval (and going to sleep only if no work has been seen in the past 5ms). This permits the connection worker threads to return to servicing their connections more promptly.

          It has only a modest effect on my box, but it does give a 5-10% bump in native transport performance.
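To make the shape of that concrete, here is a much-simplified sketch of the aggregation idea - mine, not the linked patch, and using a plain ExecutorService rather than forking ForkJoinTasks. Producers enqueue without signalling; a single drainer wakes every ~5us to hand accumulated work to the pool, and only parks after a longer idle period, so the unpark() cost is paid rarely.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class BatchingSubmitter
{
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<Runnable>();
    private final AtomicBoolean drainerParked = new AtomicBoolean();
    private final ExecutorService pool;
    private final Thread drainer;

    public BatchingSubmitter(ExecutorService pool)
    {
        this.pool = pool;
        this.drainer = new Thread(new Runnable() { public void run() { drainLoop(); } }, "batching-drainer");
        this.drainer.setDaemon(true);
        this.drainer.start();
    }

    public void submit(Runnable task)
    {
        pending.add(task);
        // only pay the unpark() cost if the drainer has actually gone to sleep
        if (drainerParked.compareAndSet(true, false))
            LockSupport.unpark(drainer);
    }

    private void drainLoop()
    {
        long lastWork = System.nanoTime();
        while (true)
        {
            boolean any = false;
            Runnable task;
            while ((task = pending.poll()) != null)
            {
                pool.execute(task);                     // hand the accumulated batch to the pool
                any = true;
            }
            if (any)
                lastWork = System.nanoTime();
            else if (System.nanoTime() - lastWork > 5_000_000L)
            {
                drainerParked.set(true);                // idle for >5ms: sleep until a submit wakes us
                if (pending.isEmpty())
                    LockSupport.park();
                drainerParked.set(false);
            }
            LockSupport.parkNanos(5_000L);              // aggregate ~5us of submissions per pass
        }
    }
}

The trade-off is a few microseconds of added latency per request in exchange for far fewer wake-ups of pool threads while the system is busy.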

          Hide
          Jason Brown added a comment -

          Benedict I agree that FJP does not have a native enqueueing mechanism, but I'm not sure adding a Semaphore right in the middle of the class is the right solution. The Semaphore is blocking (by design) and will be contended for across (numa) cores. As an alternative, at least for the native protocol, does it make sense to move the back pressure point earlier in the processing chain? I'm unfamiliar with EventLoopGroup, but any solution is better than forking FJP. Also, I have an idea I'd like to try out .. give me a day or two.

          ... <FJP> is no more efficient (probably slightly less) than a standard executor. That's a future problem, however

          I literally have no idea what this means.

          support the metrics that users may have gotten used to.

          I can argue from running cassandra in production for almost four years that these metrics are not very helpful. At best they indicate that 'something' is amiss ("hey, pending tasks are getting higher"), but cannot give you a real clue as to what is wrong (gc, I/O contention, cpu throttling). As we got these data points largely for free from TPE, I guess it made sense to expose them, but if we have to go out of our way to fabricate a subset of them for FJP, I propose we drop them going forward (for FJP, at least).

          Hide
          Benedict added a comment - - edited

          . The Semaphore is blocking (by design)

It's non-blocking until you run out of permits, at which point it must block. We have many more shared counters than this semaphore, so I highly doubt it will be an issue (if doing nothing but spinning on updating it we could push probably several thousand times our current op-rate, and in reality we will be doing a lot in between, so contention is highly unlikely to be an issue, although it will incur a slight QPI penalty - nothing we don't incur all over the place though). That said, I have nothing against only conditionally creating the Semaphore, which would eliminate it as a cost anywhere it isn't necessary.

          but any solution is better than forking FJP

          It isn't forked - this is all in the same extension class that you introduced...?

          I literally have no idea what this means.

          FJP uses an exclusive lock for enqueueing work onto the pool, but does more whilst owning the lock, so is likely to take longer within the critical section. The second patch I uploaded attempts to mitigate this for native transport threads as those micros are actually a pretty big deal when dealing with a flood of tiny messages.

          As we got these data points largely for free from TPE, I guess it made sense to expose them, but if we have to go out of our way to fabricate a subset of them for FJP, I propose we drop them going forward (for FJP, at least).

          I don't really mind, but I think you're overestimating the penalty for maintaining these counters.
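FWIW, the cheap version of keeping a "total completed" style metric on top of a FJP (a sketch only, not part of any attached patch) is just a counter bumped in a wrapper around each submitted task, plus whatever the pool already exposes for queue depth:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicLong;

public class MeteredForkJoinPool
{
    private final ForkJoinPool pool;
    private final AtomicLong completed = new AtomicLong();  // a striped counter (jdk8 / jsr166e LongAdder) would shave this further if it ever showed up in profiles

    public MeteredForkJoinPool(int parallelism)
    {
        this.pool = new ForkJoinPool(parallelism);
    }

    public void execute(final Runnable task)
    {
        pool.execute(new Runnable()
        {
            public void run()
            {
                try { task.run(); }
                finally { completed.incrementAndGet(); }     // counted whether the task succeeds or fails
            }
        });
    }

    public long getCompletedTaskCount() { return completed.get(); }
    public long getPendingTaskCount()   { return pool.getQueuedSubmissionCount() + pool.getQueuedTaskCount(); }
}

That keeps the "pending tasks are getting higher" signal people monitor today, at the cost of one atomic increment per task.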

          Hide
          Jeremiah Jordan added a comment -

          I can argue from running cassandra in production for almost four years that these metrics are not very helpful. At best they indicate that 'something' is amiss ("hey, pending tasks are getting higher"), but cannot give you a real clue as to what is wrong (gc, I/O contention, cpu throttling). As we got these data points largely for free from TPE, I guess it made sense to expose them, but if we have to go out of our way to fabricate a subset of them for FJP, I propose we drop them going forward (for FJP, at least).

          I would argue they are very useful because they give you that high level "something is wrong", so if its easy to keep them, I am very +1 on that.

          Hide
          Jason Brown added a comment -

          The Semaphore is blocking

After reading the correct jdk source class this time, I see I was mistaken about the blocking (i.e. park) aspect of Semaphore (I got caught up in the rest of AbstractQueuedSynchronizer, which Semaphore uses internally). Thus, I'll test out your branch as is this afternoon.

          It isn't forked - this is all in the same extension class that you introduced

          I didn't consider this a 'fork' as we're not mucking about with the internals of the FJP itself.

          FJP uses an exclusive lock for enqueueing work onto the pool, but does more whilst owning the lock, so is likely to take longer within the critical section

          Perhaps, but the overall improvement in performance (should we attribute that to the work stealing?) seems compelling enough.

          Hide
          Benedict added a comment -

          Perhaps, but the overall improvement in performance (should we attribute that to the work stealing?) seems compelling enough.

          I'm not suggesting we forego the patch because of this concern, I'm raising it as something to bear in mind for the future. As I said, though, to some extent I have addressed this concern with the lowsignal patch I uploaded, although it's debatable how elegant that approach is.

          I didn't consider this a 'fork' as we're not mucking about with the internals of the FJP itself.

          Perhaps we're getting crossed wires and mixing up the patches I have uploaded (no fork), with the suggestion that we may want to investigate forking in future in order to address these issues in a more elegant manner.

          Hide
          Jason Brown added a comment -

          Benedict Can you clarify what you mean by "when dealing with a flood of tiny messages"?

          Hide
          Benedict added a comment - - edited

Sure. My box can push around 60Kop/s - this translates to around 60us/op (core time); when unpark() clocks in around 10us, you want to avoid it, especially when you have network streams blocked on the consumer that is handing off the work.

          Hide
          Jason Brown added a comment -

          translates to around 60us/up

          Did you mean 60us/op ? Also, are these just requests from (new) cassandra stress? You've described the time to process each 'tiny message' but not what a tiny message is . How are you measuring the time for each request (more for my own curiosity)?

          Hide
          Benedict added a comment -

          Yes, typo - corrected. Yeah, just straight up read or write requests (I can push the same for both, but they tank for writes when we start hitting compaction/flush etc). I'm being as conservative as possible and assuming every core is spending every moment working on one of the ops (in reality it's more like half of that time). I don't have CASSANDRA-7061 yet to have any really accurate numbers to play with.

          As regards costs for unpark(), I've timed them in the past and that's in the ball park of what you'd expect given the literature and general OS behaviour (10us is probably a bit heavier than they often clock in, but a good figure to work with)

          Hide
          Jason Brown added a comment -

          I've run both of Benedict's patches on my test cluster, and the results are quite hopeful (see backpressure-stress.out). Short story is anything we do for the native protocol will at least double the throughput and halve the latency, but that's the low end of the improvements.

          Benedict's second patch is about 5-10% faster than his first, and both are slightly slower than my original patch. As I think it's important to have back pressure for any pool, at a minimum I think we should go with Benedict's first patch. I like the ideas in the second patch, but would like another day to digest the implementation.

          Jason Brown made changes -
          Attachment backpressure-stress.out.txt [ 12642830 ]
          Hide
          Benedict added a comment -

          For comparison, a graph of Jason's results: https://docs.google.com/spreadsheets/d/1mLxyY9syaAlDb1ALGQ-oF7Qo0tQffbcNgFMVPktde88/edit?usp=sharing

          I'd like to do a couple of things here:

          1. Tweak the Low Signal patch to potentially signal more intelligently rather than just always aggregating the last 5us of requests
          2. Try increasing the queue length
          3. Try these tests for a standardized load - the stress functionality we're using is great for giving a good ballpark idea of performance, but it varies the number of ops with each run, so running with a fixed 10M ops per run might be useful (stress could maybe do with an "ops per thread" option, as for the low thread counts this is a lot of work, but for high counts not very much)

          The lowsignal patch looks to outperform at certain thresholds, but underperform at others, and I'm hoping 1 and 2 might help us make it better overall. At high thread counts the difference is almost 20% for writes, which is non-trivial.

          Hide
          Chris Burroughs added a comment -

I would argue they are very useful because they give you that high level "something is wrong", so if its easy to keep them, I am very +1 on that.

When working with any other jvm system with thread pools, I miss the good queue/backpressure metrics that Cassandra exposes.

          Hide
          Benedict added a comment -

I have a few branches to test out, and I want to test them out on a variety of hardware. Ryan McGuire can you run them on our internal multi-cpu boxes, and on a 4-node AWS c3.8xlarge cluster, to the following spec:

          For each branch run: 20M inserts over 1M unique keys with 30, 90, 270 and 810 threads, then wipe each cluster and perform a single 1M key insert, and then run 20M reads over 1M unique keys with the same thread counts. All told that should take around 3hrs for -mode cql3 native prepared; I'd then like to repeat the tests for -mode thrift smart.

          The branches are:
          https://github.com/belliottsmith/cassandra/tree/4718-lse
          https://github.com/belliottsmith/cassandra/tree/4718-lse-batchnetty
          https://github.com/belliottsmith/cassandra/tree/4718-fjp
          https://github.com/belliottsmith/cassandra/tree/4718-lowsignal
          https://github.com/belliottsmith/cassandra/tree/cassandra-2.1

          Make sure you use my cassandra-2.1 so we're testing like-to-like (they're all rebased to the same version).

          I'll elaborate on the contents of these branches later, but suffice it to say the 4718-lse branch contains a new executor which attempts to reduce signalling costs to near zero by scheduling the correct number of threads to deal with the level of throughput the executor has been dealing with over the previous (short) adjustment window. -batchnetty includes some simple batching of netty messages. 4718-lowsignal is an enhanced version of the patch I uploaded previously to this ticket, and 4718-fjp is largely unchanged.

On my own box, and on our Austin test cluster, I see -lse faster than both -fjp and -lowsignal; however, on the Austin cluster (which is a not-super-modern 4-cpu, no-hyperthreading setup) both of them are slower than stock 2.1 - -lse only slightly, whereas -fjp is around 30% slower. I'll post polished numbers a little later.
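For anyone curious what "some simple batching of netty messages" can look like in principle, a toy version follows - this is not the -batchnetty code, and the class and parameter names are made up. The idea is an outbound handler that swallows most flush() calls so several responses reach the socket in one syscall; a real implementation also needs a timeout or idle flush so a quiet connection isn't left waiting (newer Netty releases ship a FlushConsolidationHandler built on the same idea).

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;

public class CoalescingFlushHandler extends ChannelOutboundHandlerAdapter
{
    private final int batchSize;
    private int pendingFlushes;       // only touched from the channel's event loop thread

    public CoalescingFlushHandler(int batchSize)
    {
        this.batchSize = batchSize;
    }

    @Override
    public void flush(ChannelHandlerContext ctx)
    {
        if (++pendingFlushes >= batchSize)
        {
            pendingFlushes = 0;
            ctx.flush();              // push the accumulated responses to the socket in one go
        }
        // otherwise keep buffering: writes stay in Netty's outbound buffer until the next real flush
    }
}

The trade is a small amount of per-response latency for a large reduction in write syscalls when many small messages are in flight.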

          Hide
          Ryan McGuire added a comment - - edited

Benedict Here's benchmarks from bdplab (linked graphs for 30, 90, 270 and 810 threads - check out that read for lse-batchnetty!):

I'm still working on getting some EC2 stats.

          Hide
          Ryan McGuire added a comment -

Above is with 4 nodes, one of which was the one hosting stress. Here's a 3 node variety, with stress on a separate host (linked graphs for 30, 90, 270 and 810 threads):

          Hide
          Benedict added a comment -

FTR, there are new branches: 4718-fjp-batchnetty, 4718-lse-batchnetty and cassandra-2.1-batchnetty.

          These are the three real contenders, and I've included the netty batching for all of them so we can get a like-for-like comparison going.

          Hide
          Ryan McGuire added a comment - - edited

Benchmarks from bdplab for the new branches (linked graphs for 810, 270, 90 and 30 threads). 3 nodes, separate stress host.

          Hide
          Benedict added a comment -

          Hmm. Frustrating - tweaks that made lse-batchnetty faster on my box (by 20-30%) make it slower on bdplab. Would be good to get some other numbers from different rigs involved to see if we can pin down the sweet spot, and maybe figure out what the cause of the discrepancy is.

          Hide
          Jason Brown added a comment - - edited

The attached file (belliotsmith_branches.out) contains my results from running the latest branches on the same hardware I described above. The lse-batchnetty branch is definitely the best performer thus far.

          Jason Brown made changes -
          Attachment belliotsmith_branches-stress.out.txt [ 12644190 ]
          Hide
          Benedict added a comment -

I've attached read throughput graphs for a run on a 2-node AWS c3.8xlarge with 1 load-generating node (I've ignored write throughputs for now as the local disks are atrocious, so the numbers are pretty meaningless and all over the place), and graphs for Jason's read/write runs, including the read latencies (most representative of networking, as they should all be in-memory operations).

          It looks like on modern/patched kernels and/or hyperthreaded modern CPUs the lse-batchnetty patch wins hands down - latency and throughput are all substantially better. I'll now try to see if a little bit of tuning can yield better results for all systems, as the prior version of the LSE branch was hitting the same throughput as 2.1-batchnetty does on our internal no-hyperthreading 2.6 clusters. So if we can bring that back up the case should be pretty compelling.

          Benedict made changes -
          Attachment aws.svg [ 12644225 ]
          Attachment jason_read.svg [ 12644226 ]
          Attachment jason_read_latency.svg [ 12644227 ]
          Attachment jason_write.svg [ 12644228 ]
          Hide
          Ryan McGuire added a comment - - edited

Benedict, fwiw here's EC2 benchmarks, c3.8xlarge - 4 C* nodes - 1 separate stress node hitting those 4 nodes (linked graphs for 810, 270 and 90 threads):

          Hide
          Benedict added a comment -

          I have uploaded a complete rewrite here

On my tests this is another 10%+ faster than lse-batchnetty, making it roughly on par with 2.1-batchnetty on our old hardware, but thoroughly outstripping it on EC2 and my laptop. I still need to introduce some thorough tests for it, and to comment the code thoroughly, but the basic principle is the same, only now all executors share the same pool of worker threads so that scheduling is easier, and work can be passed more easily between them.

          I will revisit this work again sometime in the next year to see if we can squeeze anything more out of this, especially as we add more optimisations elsewhere - but for now we're reaching diminishing returns.

          Ryan McGuire can you run a comparison of this and just cassandra-2.1-batchnetty on bdplab and EC2, so we can get a final comparison? Jason Brown if you feel like kicking off a run of this latest branch on your hardware so we have as many final data points to compare against that would also be really helpful. I'll get the code commented early tomorrow so we can get this reviewed ASAP.
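To make "all executors share the same pool of worker threads" a little more concrete for anyone skimming, the shape - in heavily simplified, illustrative form; this is not the rewrite itself, which also handles parking, scheduling and per-executor concurrency limits - is roughly: each logical executor is just a queue, and one fixed set of workers sweeps all of them, so spare capacity in one stage is automatically available to the rest.

import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

public class SharedWorkerPool
{
    private final List<Queue<Runnable>> executors = new CopyOnWriteArrayList<Queue<Runnable>>();

    public SharedWorkerPool(int workers)
    {
        for (int i = 0; i < workers; i++)
        {
            Thread t = new Thread(new Runnable() { public void run() { workLoop(); } }, "shared-worker-" + i);
            t.setDaemon(true);
            t.start();
        }
    }

    // each logical executor ("stage") is just a queue registered with the shared workers
    public Queue<Runnable> newExecutor()
    {
        Queue<Runnable> q = new ConcurrentLinkedQueue<Runnable>();
        executors.add(q);
        return q;
    }

    private void workLoop()
    {
        while (true)
        {
            boolean idle = true;
            for (Queue<Runnable> q : executors)         // sweep every logical executor for work
            {
                Runnable task = q.poll();
                if (task != null)
                {
                    task.run();
                    idle = false;
                }
            }
            if (idle)
                Thread.yield();                         // the real code parks idle workers; a sketch just yields
        }
    }
}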

          Hide
          Benedict added a comment -

          Ryan McGuire it would be great to see equivalent runs for a regular out-of-memory workload as well, just to make sure there aren't any weird results. Thanks!

          Jason Brown added a comment - - edited

Ryan McGuire Also, is it possible for you to fill up the disks with more sstables than available memory? I think we should check how going to disk plays into the performance mix, rather than just reading from the page cache for the entire read test. This should introduce another modality into the way the algorithm behaves, one that is probably more realistic for the real world (a mix of page cache hits and disk seeks).

          Benedict This rewrite (4718-sep) is quite extensive wrt prior branches. As this branch is quite complex with many new additions, I will need a good chunk of time tomorrow to review this.

          Benedict added a comment -

          Also, is it possible for you to fill up the disks with more sstables than available memory?

          +1 ("regular out-of-memory workload" was a dreadfully worded attempt to express this)

          As this branch is quite complex with many new additions, I will need a good chunk of time tomorrow to review this.

          Let me know if there's anything specific that needs explaining. I will be commenting it before breakfast your time.

          Benedict added a comment -

          Jason Brown I've updated the repository with a number of minor tweaks/refactors, and improved comments. Let me know if there's anything still unclear.

          Benedict made changes -
          Assignee Jason Brown [ jasobrown ] Benedict [ benedict ]
          Benedict made changes -
          Reviewer Benedict [ benedict ] Jason Brown [ jasobrown ]
          Jason Brown added a comment -

          Also, it looks like the period of time that you are running the tests for is very short (about 1 or 2 minutes). Can you let it run for at least 30 minutes or so (if not an hour or more), so we can see the burn in? Everything can look rosy in a 90 second test, but fall apart spectacularly under (closer to) real world conditions.

          Ryan McGuire added a comment - - edited

          I scaled the test up by a factor of 10. I'll update here as the tests complete:

          5 c3.8xlarge EC2 cluster:

          • 810 threads - cassandra-2.1 timed out in this test, I'll investigate it, but it wasn't one of the branches you asked for anyway.
          • 270 threads

          3 bdplab cluster:

• 810 threads - had some failures with the second write population here. I'll revisit this when I get a chance (I'm in meetings today and have limited debugging time.)
          Benedict added a comment -

          Can you let it run for at least 30 minutes or so

          Let's hold off on that until we have some comparison numbers - agreed it's a good idea, but just want to get some idea of behaviour first

          Benedict added a comment -

          For those interested, comparison of the latest version versus 2.1-batchnetty (dropping other branches for comparison now).

          Benedict made changes -
          Attachment aws_read.svg [ 12644741 ]
          Pavel Yaskevich added a comment -

The graphs Ryan posted in his previous comment (especially the bdplab tests) look pretty close to what I would expect without running the tests, just from looking at the 4718-sep code. The more latency the tasks themselves introduce, the less effect spinning has - in other words, the need to spin is eliminated, because the longer execution takes, the more likely it is that the next task is already waiting in the queue, which makes thread parking/unparking no longer a dominant factor in latencies. So I would be very interested to see even longer-running tests (especially reads), because that is much closer to real behaviour, dominated by network/disk latencies.

          Jason Brown added a comment -

          Benedict WRT to the 2.1-batchnetty comparison, what did the latencies look like?

          Benedict added a comment - - edited

          Benedict WRT to the 2.1-batchnetty comparison, what did the latencies look like?

          Ryan's graphs are a much better way to view latencies; on the whole they seem universally as good or better (generally much better)

          The more latency is introduced to the tasks the less effect would spinning have or in other words there is need to spin is eliminated

Yes; also, even more important is that the unpark() cost, when amortized over a long-running operation, becomes insignificant regardless of whether it is incurred; and producers cannot make forward progress anyway because the native-transport queue is full, so avoiding the unpark cost on the network thread really doesn't gain us anything. I fully expect there to be very little effect on workloads as the dataset exceeds memory and the row size climbs.

          Pavel Yaskevich added a comment -

Yes; also, even more important is that the unpark() cost, when amortized over a long-running operation, becomes insignificant regardless of whether it is incurred; and producers cannot make forward progress anyway because the native-transport queue is full, so avoiding the unpark cost on the network thread really doesn't gain us anything. I fully expect there to be very little effect on workloads as the dataset exceeds memory and the row size climbs.

Exactly my point, which starts right after the part you quoted. Also, most of the use cases are exactly that - a data set which exceeds available memory - so I am not really sure it is worth committing all this code without any perf improvement for most of the usage scenarios.

          Lior Golan added a comment -

          But there are use cases where the full working set is memory resident or close to that. Improving performance in these use cases would reduce the need for caching in front of Cassandra

          T Jake Luciani added a comment -

so I am not really sure it is worth committing all this code without any perf improvement for most of the usage scenarios.

          What about writes, that's a pretty big scenario this helps improve

          Benedict added a comment - - edited

Also, most of the use cases are exactly that - a data set which exceeds available memory

          Well, except that we expect in general for recent data to be accessed most often, or data to be accessed according to a zipf distribution, and in both of these cases caching helps to keep a significant portion of the data we're accessing in memory. Also, more users are getting incredibly performant SSDs that can respond to queries in time horizons measured in microseconds, and as this becomes the norm the distinction also becomes less important.

          Benedict added a comment -

          I've force pushed an updated branch to the repository which is simpler and has some nicer properties (though is ~1% slower at lower thread counts). I'm pretty happy with its current state, although still need to create some thorough executor stress tests.

In the latest version workers self-coordinate descheduling through a simpler scheme than the separate descheduler. The new scheme also permits thread over-provisioning to be corrected incredibly promptly (i.e. almost instantly), eliminating my one concern about this approach (that it could be slightly resource-unfriendly under variable workloads when sharing the underlying platform with another service).

          The latest version also delivers more consistency in its throughput rate by using a ConcurrentSkipListMap to order the spinning threads in order of expected schedule time, and by force-scheduling a new worker if a producer encounters a full task queue when not all workers are yet scheduled.
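
As an illustration of the ConcurrentSkipListMap idea (hypothetical code and names, not the 4718-sep branch itself): spinning workers register themselves keyed by the absolute time at which they expect to wake anyway, so a producer that needs a consumer can prefer the soonest-to-wake spinner and only pay for unpark() when that wake-up is too far away.

// Hypothetical sketch only -- illustrative, not the actual patch code
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.LockSupport;

final class SpinningWorkers
{
    // spinning workers, ordered by the nanoTime at which each expects to wake
    // of its own accord (key collisions are ignored for brevity)
    private final ConcurrentSkipListMap<Long, Thread> spinning = new ConcurrentSkipListMap<>();

    // a worker registers itself just before parking for a bounded, randomised interval
    void registerSpin(long expectedWakeAtNanos)
    {
        spinning.put(expectedWakeAtNanos, Thread.currentThread());
    }

    // a producer that has queued work picks the spinner that will wake earliest,
    // and only pays for an unpark() if that wake-up is further away than it can tolerate
    void requestWorker(long toleranceNanos)
    {
        Map.Entry<Long, Thread> soonest = spinning.pollFirstEntry();
        if (soonest == null)
            return; // no spinners: the caller would wake or start a worker by other means
        if (soonest.getKey() - System.nanoTime() > toleranceNanos)
            LockSupport.unpark(soonest.getValue());
    }
}

The same ordered structure also makes the "force-schedule a worker when the task queue is full" case cheap: the producer can simply take the first entry and unpark it unconditionally.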

          Jonathan Ellis added a comment -

most of the use cases are exactly that - a data set which exceeds available memory

Right, but we've always targeted "total data larger than memory, hot data more or less fits." So I absolutely think this ticket is relevant for a lot of use cases.

          Benedict added a comment - - edited

          A brief outline of the approach taken by the executor service I've submitted:

          It's premised on the idea that unpark() is a relatively expensive operation, and can block progress on the thread calling it (often it results in transfer of the physical execution to the signalled thread). So we want to avoid performing the operation as much as possible, so long as we do not incur any other penalties as a result of doing so.

          The approach I've taken to avoiding calling unpark() essentially amounts to trying to ensure the correct number of threads are running for servicing the current workload, without either delay of service or any waiting on any of the workers. We achieve this by essentially letting workers schedule themselves, except when, on producing work for the queue, we cannot guarantee that they will do so promptly (in which rare instance we spin up a worker directly) or the queue is full, in which case it costs us little to contribute to firing up workers. This can be roughly described as:

          1. If all workers are currently either sleeping indefinitely or occupied with work, we wake one (or start a new) worker
          2. Before starting any given task, a worker checks if any more work is available on the queue it's processing and tries to hand it off to another unoccupied worker (preferring those that are scheduled to wake up of their own accord in the near future, to avoid signalling it, but waking/starting one if necessary)
          3. Once we finish a task, we either:
            • take another task from the queue we just processed, if any available, and loop back to (2);
            • reassign ourselves to another executor that has work and go to (2);
            • finally, if that fails, we enter a "yield"-spin loop
4. Each loop we spin, we sleep a random interval scaled by the number of threads in this loop, so that the average rate of wake-up is constant regardless of the number of spinning threads (see the sketch after this list). When we wake up we:
            • Check if we should deschedule ourselves (based on the total time spent sleeping by all threads recently - if it exceeds the real time elapsed, we put a worker to sleep indefinitely, preferably ourselves)
            • Try to assign ourselves an executor with work outstanding, and go to (2)
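
As a rough sketch of step (4) above (hypothetical names and an invented base interval; the branch tracks recent sleep time rather than a cumulative total, so treat this purely as an illustration of the constant-aggregate-wake-up-rate and self-descheduling ideas):

// Hypothetical sketch only -- illustrative, not the actual patch code
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

final class SpinLoop
{
    private static final long BASE_SPIN_NANOS = 10_000; // invented constant
    private final AtomicInteger spinnerCount = new AtomicInteger();
    private final AtomicLong totalSleptNanos = new AtomicLong();
    private final long startNanos = System.nanoTime();

    // one iteration of the yield/spin loop; returns true if the caller should
    // deschedule itself (park indefinitely) because workers are over-provisioned
    boolean spinOnce()
    {
        int spinners = spinnerCount.incrementAndGet();
        try
        {
            // sleep a random interval scaled by the number of spinning threads,
            // so the average wake-up rate stays constant as spinners accumulate
            long sleep = ThreadLocalRandom.current().nextLong(1, BASE_SPIN_NANOS * spinners + 1);
            LockSupport.parkNanos(sleep);
            long slept = totalSleptNanos.addAndGet(sleep);
            // if spinners have collectively slept longer than the wall-clock time
            // that has elapsed, one of them (preferably this one) should deschedule
            return slept > System.nanoTime() - startNanos;
        }
        finally
        {
            spinnerCount.decrementAndGet();
        }
    }
}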

          The actual assignment and queueing of work is itself a little interesting as well: to minimise signalling we have a ConcurrentLinkedQueue which is, by definition, unbounded. We then have a separate synchronisation state which maintains an atomic count of work permits (threads working the pool) and task permits (items on the queue). When we start a worker as a producer we actually don't touch this queue at all, we just start a worker in a spinning state and let it assign itself some work. We do this to avoid signalling any other producers that may be blocked on the queue being full. When as a worker we take work from the queue to either assign to ourselves or another worker we always atomically take both a worker permit and a task permit (or only the latter if we already own a task permit). This allows us to ensure we only wake up threads when they definitely have work to do.
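
As a rough illustration of the combined permit state described above (again, hypothetical names and a simplified layout rather than the branch's actual encoding): task permits and work permits packed into a single atomic long, so that claiming "a task, plus a worker slot if we don't already hold one" is a single CAS.

// Hypothetical sketch only -- illustrative, not the actual patch code
import java.util.concurrent.atomic.AtomicLong;

final class PoolState
{
    // low 32 bits: task permits (items on the queue); high 32 bits: work permits (worker slots free)
    private final AtomicLong state;

    PoolState(int maxWorkers)
    {
        state = new AtomicLong(((long) maxWorkers) << 32);
    }

    private static int taskPermits(long s) { return (int) s; }
    private static int workPermits(long s) { return (int) (s >>> 32); }

    // producer: a task was appended to the (unbounded) ConcurrentLinkedQueue
    void addTaskPermit()
    {
        state.incrementAndGet();
    }

    // worker: atomically claim a task permit, and a work permit too unless we already
    // hold one; returns false if there is no task (or no free worker slot) to claim
    boolean takeTaskPermit(boolean alsoTakeWorkPermit)
    {
        while (true)
        {
            long s = state.get();
            if (taskPermits(s) == 0 || (alsoTakeWorkPermit && workPermits(s) == 0))
                return false;
            long next = s - 1L - (alsoTakeWorkPermit ? (1L << 32) : 0L);
            if (state.compareAndSet(s, next))
                return true;
        }
    }

    // worker: release its work permit when it stops working this pool
    void returnWorkPermit()
    {
        state.addAndGet(1L << 32);
    }
}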

          Pavel Yaskevich added a comment -

          What about writes, that's a pretty big scenario this helps improve

Ryan's latest numbers are from a write workload.

          Well, except that we expect in general for recent data to be accessed most often, or data to be accessed according to a zipf distribution, and in both of these cases caching helps to keep a significant portion of the data we're accessing in memory. Also, more users are getting incredibly performant SSDs that can respond to queries in time horizons measured in microseconds, and as this becomes the norm the distinction also becomes less important.

I always thought that Zipf's law applies to scientific data, does it not? SSDs can be performant, but you can't get their full speed yet - the closest you can currently get is a 3.13+ kernel with multiqueue support enabled.

Right, but we've always targeted "total data larger than memory, hot data more or less fits." So I absolutely think this ticket is relevant for a lot of use cases.

          Exactly, "hot data more or less fits" so the problem is that once you get into page page reclaim and disk reads (even SSDs), improvements maid here are no longer doing anything helpful, I think that would be clearly visible on the benchmarks to come.

          Jonathan Ellis added a comment -

          Exactly, "hot data more or less fits" so the problem is that once you get into page page reclaim and disk reads (even SSDs), improvements maid here are no longer doing anything helpful

          I don't follow you at all. If 90% of reads are already in-cache, this is going to help even if 10% are going to disk.

          Ryan McGuire added a comment - - edited

          Benedict more "short" tests. Updated here as they complete:

EC2 c3.8xlarge, cql native: 810 threads, 270 threads, 90 threads

bdplab, cql native: 810 threads, 270 threads, 90 threads

EC2 c3.8xlarge, thrift smart: 810 threads, 270 threads, 90 threads

bdplab, thrift smart: 810 threads, 270 threads, 90 threads

          Benedict added a comment -

I always thought that Zipf's law applies to scientific data, does it not?

          As far as I'm aware a zipf-like distribution is considered a good approximation for many data access patterns. A quick google yields an article showing that much web traffic follows a zipf distribution, but some follows a slightly different exponential distribution: http://www.cs.gmu.edu/~sqchen/publications/sigmetrics07-poster.pdf

          Benedict added a comment - - edited

          Thanks Ryan McGuire!

          Those graphs all look pretty good to me. Think it's time to run some of the longer tests to see that performance is still good for other workloads. Let's drop thrift from the equation now.

          I'd suggest something like

• write n=2400000000 -key populate=1..2400000000
• force major compaction
• for each thread count/branch: read n=1000000000 -key dist=extr(1..2400000000,2)
• warm up with one (any) read test run before the rest, so that they all start from a roughly level page cache

This should create a dataset in the region of 440GB, but around 75% of requests will be to ~160GB of it, which should be in the region of the amount of page cache available to the EC2 systems after bloom filters etc. are accounted for.

          NB: if you want to play with different distributions, cassandra-stress print lets you see what a spec would yield

          Pavel Yaskevich added a comment -

          I don't follow you at all. If 90% of reads are already in-cache, this is going to help even if 10% are going to disk.

But even 10% could be the factor which throws it off, as those requests may be occupying the read threads (going to disk) so that the requests for in-memory data get stuck behind them. Further, there are problems when memtable flushes and compactions occur (as we all know) that will fill up the IO bandwidth, which may negate the advantages of the parking algorithms here.

Not to mention that with compression every read results in a syscall which forces the thread to be parked anyway; since we have a fixed number of threads (now globally, earlier per stage), most of the threads are almost always guaranteed to be parked, which renders the spinning useless, as per my previous comment. Based on the graphs Ryan has posted it seems like there is little to no advantage for Thrift, and for Netty the biggest win comes from batching, not the new thread pool implementation.

          As far as I'm aware a zipf-like distribution is considered a good approximation for many data access patterns. A quick google yields an article showing that much web traffic follows a zipf distribution, but some follows a slightly different exponential distribution: http://www.cs.gmu.edu/~sqchen/publications/sigmetrics07-poster.pdf

If by "many" you mean scientific usage then I would agree, but I'm not sure how web traffic is relevant to us.

          Jonathan Ellis added a comment -

          Granted that a new executorservice won't help i/o bound workloads, but I knew that when I created the ticket and "must be significantly better for all workloads" is an unrealistically high bar for optimization work. This gives us a pretty huge benefit on at least some workloads (1, 2) and a smaller benefit on others, which I'm quite happy with. Unless the longer benchmarks Ryan is running show dramatically different results, I'm +1.

          I also note that the work here is almost entirely self contained, with the major exception being some new code in Message.Dispatcher. So while it's not as simple as dropping in LTQ or BAQ or FJP, the results are absolutely good enough to be worth a new Executor implementation.

          Pavel Yaskevich added a comment - - edited

What I'm saying is that so far it gives a benefit only for a really small working set, e.g. a stress run that lasts 1 minute. For the longer-running tests there is very little to no difference (latest bdplab tests), so we are doing a longer-running test right now in parallel with Ryan; currently the only valuable change I see here is batching for Netty.

          Jason Brown added a comment - - edited

Ryan McGuire How many threads are you running thrift with? If you aren't setting it explicitly, (iirc) it gets set to the number of processors, which is far below what anything sane should run with. For our machines, I've been using 512 for writes, and 128 for reads (mirroring what we run with in prod, which is the same hardware as the machines I'm testing on, more or less). I think this may explain why we do not see the vast discrepancy between thrift and native protocol ops/second - native protocol defaults to 128 threads.

          Also, are you using sync or hsha for thrift?

          Sylvain Lebresne added a comment -

          currently the only valuable change I see here is batching for Netty

I'm getting lost in all the benchmark graphs and what they include, I'll admit. We've now committed the batching separately with CASSANDRA-5663. Can someone sum up the graphs for "current tip of 2.1 (with CASSANDRA-5663)" versus "the same + Benedict's last patch"? Since we're talking about benchmarks, it shouldn't be too hard to remove batching from the equation and check what value remains.

Not to mention that with compression every read results in a syscall which forces the thread to be parked anyway

Can we bench with compression maybe? Both to see if any benefit is indeed lost when compression is on, and whether compression generally out-performs no-compression or not.

          Benedict added a comment - - edited

          (latest bdplab tests)

          Which latest bdplab tests? The longer bdplab test from before (not the latest tests) had some issues (unrelated to this ticket) so we didn't get any read results, but showed increased write throughput.

          The latest tests have all been short runs. I am actually very pleased we are at all faster on bdplab for any workload, as the first versions of these patches did not seem to benefit older hardware/kernels (we don't have enough hardware configurations to say which was the deciding factor), and actually incurred a slight penalty. The fact that the gap is very narrow for bdplab is not really important, nor are the thrift numbers. In both of those instances the interesting thing is only that we do not perform any worse; performing slightly better even here is just a bonus.

It's worth pointing out that with only one exception out of all the graphs above (EC2 thrift 870tc), latencies are also appreciably reduced right up to the 99.9th percentile, and both latency and throughput are also more consistent and predictable. These are pretty decent results in themselves; but thrown in with a 20-30% throughput bump for in-memory native workloads on modern OS/hardware, it seems pretty compelling to me.

          Benedict added a comment -

          Since we're talking about benchmarks, it shouldn't be too hard to remove batching from the equation and check what value remains.

          The batching has already been committed to tip, so the 2.1-batchnetty is essentially this.

          Can we bench with compression maybe?

          We probably should bench to get a comparison. It is quite likely the benefit will be lost, given we decompress 64K chunks at a time and currently have no uncompressed page cache.

          Sylvain Lebresne added a comment -

          The batching has already been committed to tip, so the 2.1-batchnetty is essentially this.

          And 4718-sep is essentially 2.1-batchnetty + the patch for this, right?

          Benedict added a comment -

          And 4718-sep is essentially 2.1-batchnetty + the patch for this, right?

          Correct

          Jason Brown added a comment - - edited

Adding more fuel to the testing fire, I took a first pass at having a large amount of data on disk (~2x the memory size of each box), and running the read tests - see attached file: stress_2014May15.txt. I cleared the page cache before switching to each branch for the reads, and then performed 3 rounds of stress. The goal here was to see how the sep branch compared with cassandra-2.1 when doing most of the reads from disk (with a cold page cache, or where the cache is constantly churning due to new blocks being pulled in).

The short story is the sep branch performs slightly worse than the current cassandra-2.1 (which includes CASSANDRA-5663) on both ops/s and latencies.

I'm going to do one more test where I preload a good chunk of the data into the page cache, then run the stress - hopefully to emulate the case where most reads come from the page cache and some go to disk. I'll try to use a less naive key distribution algorithm (one of those provided by stress), to ensure that we hit the hot keys.

          Jason Brown made changes -
          Attachment stress_2014May15.txt [ 12645055 ]
          Jason Brown added a comment -

          Next attachment (stress_2014May16). After I created a new set of data (about 300GB per node), generated with populate=x (rather than uniform=0..x), I dropped the page cache, then I attempted to pre-populate the page cache by running a warm up read round, like this:

          ./tools/bin/cassandra-stress read n=180664790 -key dist=extr\(1..600000000,2\) -rate threads=75 -mode native prepared cql3 -port native=9043 thrift=9161  -node .... 

I used the value "180664790" from stress's print function, which can give you a reasonable dist count to hit a certain percentage of coverage for the different models (note: I may be completely incorrect on this point, so feel free to correct my understanding).

Then I ran the same 3 stress tests I ran in yesterday's run (one where I load all 21 cols with the row, one that loads the default count (5), and one that only iterates over the default key count (1mil)). I then cleared the page cache and performed the same testing for the other branch (using the existing data, but warming up the cache).

          The results look like the sep branch had lower latencies vs. cassandra 2.1 (same as yesterday), but less ops/sec.

tbh, I'm not sure I proved a whole lot since yesterday's tests, as dstat showed I was loading 200-300MB of data per second, so I was certainly giving the page cache a good workout in any case.

          Jason Brown made changes -
          Attachment stress_2014May16.txt [ 12645275 ]
          Jonathan Ellis added a comment -

          Yeah, 200+MB/s sounds pretty disk bound to me. I vote that we move to the actual code review; we can certainly make further improvements later.

          Benedict added a comment -

          For reference, a plot of reads from a much larger than memory dataset against the austin cluster. There's a lot of variability, but no appreciable difference between the two, which is what I would expect (but good to confirm)

          Benedict made changes -
          Attachment austin_diskbound_read.svg [ 12645339 ]
          Pavel Yaskevich added a comment -

          Yeah, 200+MB/s sounds pretty disk bound to me. I vote that we move to the actual code review; we can certainly make further improvements later.

I think what Jason meant is that when he started doing reads the system was pulling a lot of data into memory at first. The ~300GB he loaded was at RF=2, and we have 128GB of RAM (apart from kernel memory) on those machines, so essentially it's ~150GB for the primary replica, which is not much bigger than the total memory available to the page cache - pretty much accounting for the 10% you were talking about. As a summary, we made two benchmarks: one where the amount of data was bigger than the memory available for the page cache, and one where most of the data fits into memory; in both cases the sep branch was performing worse than cassandra-2.1.

          Benedict added a comment -

Pavel Yaskevich why are you only counting the primary replica data? Requests will hit both replicas by default? If you look at the results there is a reasonable amount of variability for both runs, so it's not clear that one is slower or faster - there are a number of points where 4718-sep is faster than 2.1, and vice versa, and given it is disk bound I am inclined to suggest this is not the patch making it perform worse. In fact, a majority of data points show higher throughput for 4718-sep, not for 2.1. In your first test, every thread count below 271 is faster; 271 seems to be a blip due to a small number of very slow reads affecting the very last measurement (there's a "race" in stress' auto mode where some measurements are still accepted after it's decided enough have been taken, as can be seen by the final stderr being above the acceptability point); 2.1 showed a similar effect at this tc, but smaller, so this seems likely to be random chance. In the last test it is faster for all thread counts despite some weird max latencies. It's only the middle test where it appears to be marginally slower, and given that this test performs effectively exactly the same amount of work as the first test, I'm not sure this demonstrates a great deal other than the variability.

It's also worth asking what your max read concurrency is, as I'm surprised to see thread counts > 180 causing dramatic spikes in latency (on both branches) when I'd expect them to be saturating the read stage well before then.

          Benedict added a comment -

Whilst on this topic, I've been thinking about disk/memory testing protocols in general, and it seems we really need to think through a good strategy for creating a consistent test bed that is representative. The test I have asked Ryan McGuire to run is not going to be fair, as major compaction will cause all of the data points to be randomly distributed (by hash) across a single sstable, and given the records are small, selecting from a smallish random subset of this data will pretty much necessarily involve touching every page on disk with equal probability. However, disabling compaction entirely is equally unfair, as we leave many sstables to check from bloom filter false positives (there are around 800+ sstables in Jason's test, for 300GB of data, as a finger-in-the-air estimate), so most of the cache will be going to index files, with almost every data item lookup probably going to disk, because the reduced memory causes the same effect as the major compaction.

It seems to me we need to 1) get the exponential distribution to select the last keys in preference to the first keys (i.e. most recently written, most commonly accessed); and 2) create a compaction strategy for testing purposes that is designed to create a sort of "in-flight snapshot" of a real STCS workload, by compacting older data into exponentially larger files. These two together should give us something much closer to a real live system that is using STCS, with a consistent, reproducible baseline behaviour.

          Pavel Yaskevich added a comment -

Benedict Isn't CQL trying to be smart about request routing, like Thrift? Anyhow, read concurrency was the default 128 (can you confirm, Jason Brown?) and we do remove all of the files, drop the cache, restart, and disable compaction to remove jitter as much as possible, so the only difference between the two runs is that one has the sep patch and the other doesn't. If there are slow reads they should be happening in both runs, because keys are read uniformly, and although there is a large number of sstables in the system, for every read there is only one hit, which pretty much simulates the behaviour of systems where data accumulates over time. Also, I can tell you that the I/O setup we have is able to handle a mere 300GB without a problem.

          Benedict added a comment - - edited

          But the sep branch was actually faster more often than it was slower? And yes it routes intelligently, but to both replicas...?

          I've attached three graphs to visualise the output from Jason's test runs, which I hope express better what I was trying to get across in my previous comment: that the sep branch is actually faster in the workload that operates over a smaller domain (run3), that it is also more often faster for the disk-bound workloads, but that I expect the difference is most likely random variation.

          The evidence is that:

          1. run2 shows the performance of both branches crossing each other at different points;
          2. run1 is faster universally for sep (and both run1 and run2 perform the same amount of IO per operation); and
          3. unless there is a bug, it should be very difficult for either patch to demonstrate a major performance difference on disk-bound workloads - so long as all read workers are scheduled, the disk is exclusively what should define our throughput.

          This workload is clearly disk-bound, as the same hardware was pushing 250k/s with similar record sizes when exclusively in memory - we're seeing only 5% of that now - unless in-memory index scans are possibly occupying all of the time (though according to Jason, CPU utilisation was around 30% from a random, non-scientific poll).

          I'm doing my best to produce a workload on the hardware I have available to me to absolutely, 100%, rule out any such issue as (3), but my point is that it is very hard to get accurate, consistent numbers with which to draw strong conclusions when the difference we're measuring is smaller than the measurement noise.

          Benedict made changes -
          Attachment jason_run1.svg [ 12645411 ]
          Attachment jason_run2.svg [ 12645412 ]
          Attachment jason_run3.svg [ 12645413 ]
          Pavel Yaskevich added a comment -

          I think you have plotted numbers from May 16. I'm not sure what you mean by "often"; the problem with the numbers is that they apparently cut off for both branches. We have to redo the test, I think - Jason Brown, is there a way to guarantee that both branches are going to do the same number of runs with stress? I disagree with #2, because sep shows a sudden drop at the end, as do runs 1 and 3, so we don't really know what is going to happen with sep at high stress concurrency in those runs.

          This work is clearly disk bound as the same hardware was pushing 250k/s with similar record sizes when exclusively in memory - we're seeing only 5% of that now. Unless possibly in-memory index scans are occupying all of the time (but according to Jason CPU utilisation was around 30% from a random non-scientific poll).

          I'm not sure if we can count 250K/s as a disk bound workload, which is only 3 buffer reads per second.

          Benedict added a comment - - edited

          I meant 250Kop/s. We're now pushing 6Kop/s. The numbers from 16th May are the latest posted, to my knowledge, and the ones we're discussing?

          You can make stress do a fixed number of ops per run, but not a fixed set of thread counts currently - its auto mode (which these results are from) ramps up thread counts until it detects a plateau; in these tests it seems that sep reached a higher throughput rate earlier, and so when it normalised down again stress considered it to have plateaued earlier. As to #2, run1, even truncated at a lower thread count, is as fast as stock is at its peak. However, you're right that it is possible it would have tanked further - in that case this would be indicative of a bug rather than a fundamental flaw in its design, but it is almost certainly down to the natural tendency to dip slightly below peak throughput after the real plateau.

          I can patch stress briefly to force it to run all thread counts in the requested range, instead of stopping when it hits a plateau, but the auto mode isn't really designed to be a canonical test. If we want accurate like-for-like comparisons, we want to graph each thread count separately for its whole run, ensure each run is long enough to spot the general behavioural pattern (i.e. at least a few minutes for IO-bound work), and interleave the two branches to try to avoid any weird page caching or other utilisation interference.

          That said, I don't think we're going to see a great deal by doing any of that. But I'm always pleased to see more data points (I will be continuing to run more tests, and burn in tests on the executor service).
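          For concreteness, a like-for-like schedule of the kind described above might be scripted along these lines. This is purely an illustration: the flag names are those of the rewritten stress tool and the thread counts are arbitrary, so the exact syntax may differ from the build used in these tests, and the two branches would be alternated between whole schedules rather than within one.

              # hypothetical: one fixed-length run per thread count, graphed separately per run
              for threads in 16 64 121 256 512
              do
                  tools/bin/cassandra-stress read n=10000000 -rate threads=$threads
              done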

          Pavel Yaskevich added a comment -

          250K op/s was achieved when the test was running with almost no data; total duration was around a minute or so. We are, on the other hand, trying to make it more realistic in terms of data volume. I'm not sure about the tests from May 16, but when we did May 15 (which is a completely different test, btw, addressing your point from the previous comment) there was almost no disk activity after the original page cache warm-up. If you can, please patch the test to do runs with all of the thread counts, and once we re-run I will also check disk activity, but I'm pretty sure it would be minimal: reading from the page cache is not as efficient as reading from an anonymous area, plus it does a syscall, with compression (which is used by default) on top, so I'm not surprised that op/s degraded.

          Benedict added a comment -

          Compression is disabled by stress by default.

          Jonathan Ellis added a comment -

          I can patch stress briefly to force it to run all thread counts in the requested range, instead of stopping when it hits a plateau

          That sounds like a good option to have.

          when we did May 15 (which is completely different test btw, addressing your point from previous comment) there was almost no disk activity after original page cache warm up

          That doesn't sound right to me; all the numbers from May 15 are 7k-14k ops/s, which is disk-bound territory.

          Benedict added a comment -

          Attached are some graphs of running reads with exponential key distributions over different key ranges (10M, 100M, 600M) - though over the same dataset (600M keys), on a 2-node c3.8xlarge cluster, with 1 c3.8xlarge load generator. All of the thread counts were run for 1M operations. These are the last of a sequence of test runs because, initially, by far the biggest determining factor was page cache behaviour - with each iteration the page cache's retention algorithm got progressively better at keeping the best subset of pages to service the requests. Note also the scales: in particular, in the 600M test the two branches are within 1% of each other performance-wise, the variability was greater than 1%, and in prior runs the positions were reversed.
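          For reference, runs of this shape can be approximated with the rewritten stress tool's -pop option. The command below is an illustration only - the flags and the exact ranges are assumptions, not a record of the command actually used:

              # hypothetical: 1M reads per thread count, keys drawn from an exponential distribution over the first 10M of the 600M keys
              tools/bin/cassandra-stress read n=1000000 -pop 'dist=exp(1..10000000)' -rate threads=121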

          Benedict made changes -
          Attachment E10M_summary_key_s.svg [ 12645678 ]
          Attachment E100M_summary_key_s.svg [ 12645679 ]
          Attachment E600M_summary_key_s.svg [ 12645680 ]
          Jonathan Ellis added a comment -

          This looks exactly like I would have predicted: the more disk-bound the workload is, the less the ExecutorService matters. But when our hot dataset is cacheable (10M, and to a lesser degree 100M), sep is a clear win. This is the scenario that we ought to be optimizing for.

          Benedict added a comment -

          One thing worth mentioning is that the size of the dataset over which this is effective is not necessarily represented accurately by the test, as it was run over a fully-compacted dataset, so the 10M keys would have been randomly distributed across all pages (we select from a prefix of the key range, but the murmur hash will get evenly distributed across the entire dataset once fully compacted). Were this run on a real dataset, with the most recent data being compacted separately to the older data, and the most recent data being hit primarily, there would be greater locality of data access and so any gains should be effective over a larger quantity of data.

          Jason Brown added a comment -

          +1 on the current 4718-sep branch

          FTR, Benedict and I have worked closely on the code for the last several weeks, and I've provided direct feedback to him about problems/concerns.

          Benedict added a comment - - edited

          Thanks! I just need to tidy up the uber minor outstanding bug you spotted (ClassCastException on the logging thread hitting the MBean), and I'm adding a long test as well so it has some isolated testing for future work. Should be ready to commit this evening.

          Benedict added a comment -

          I've uploaded an updated branch. I tweaked a couple of minor things (I let the last thread spin down, which couldn't happen before), and re-ran my battery of tests to confirm the performance is still good. I've introduced a new long test and fixed the ClassCastException on the logger thread. Should be good to go.

          Jason Brown added a comment -

          Spoke with Benedict about his latest change (to spin down the last thread); we realized he had a bug, and he'll take another couple of days to fix it or find a better solution.

          Benedict added a comment -

          I've uploaded an update to the branch which should permit spinning down the last thread much more simply (and correctly) than the previous patch did. I've also retested it to confirm the performance characteristics remain intact.
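          For anyone following along, the subtlety is roughly the following (a minimal, hypothetical sketch of the race, not the actual SEPWorker logic): the last worker may only deschedule itself after re-checking for work, otherwise a task enqueued between its final poll and its retirement could be left with no thread to run it.

              import java.util.concurrent.ConcurrentLinkedQueue;
              import java.util.concurrent.atomic.AtomicInteger;

              // Illustrative sketch only (error handling and multi-worker support omitted) -
              // not the SEPWorker code. It shows why the last thread must re-check the queue
              // after retiring itself before it is allowed to exit.
              class LastThreadSpinDownSketch
              {
                  private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();
                  private final AtomicInteger activeWorkers = new AtomicInteger(0);

                  public void submit(Runnable task)
                  {
                      tasks.add(task);
                      if (activeWorkers.compareAndSet(0, 1))      // no worker running: start one
                          new Thread(this::workerLoop).start();
                  }

                  private void workerLoop()
                  {
                      while (true)
                      {
                          Runnable task = tasks.poll();
                          if (task != null) { task.run(); continue; }

                          activeWorkers.decrementAndGet();        // tentatively retire this worker
                          // re-check: a producer may have enqueued between the last poll and the decrement
                          if (tasks.peek() != null && activeWorkers.compareAndSet(0, 1))
                              continue;                           // resurrect and keep running
                          return;                                 // genuinely idle; submit() will start a new worker
                      }
                  }
              }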

          Jason Brown added a comment -

          nit: please clean up the documentation grammar a little bit above the SEPWorker.prevStopCheck declaration. Other than that, +1.

          Benedict added a comment -

          Committed with that doc reworded

          Benedict made changes -
          Status Patch Available [ 10002 ] Resolved [ 5 ]
          Fix Version/s 2.1 rc1 [ 12326658 ]
          Fix Version/s 2.1.0 [ 12324159 ]
          Resolution Fixed [ 1 ]
          Benedict made changes -
          Link This issue supercedes CASSANDRA-6995 [ CASSANDRA-6995 ]
          Jonathan Ellis made changes -
          Link This issue is duplicated by CASSANDRA-6995 [ CASSANDRA-6995 ]
          Benedict made changes -
          Link This issue supercedes CASSANDRA-7045 [ CASSANDRA-7045 ]
          Jonathan Ellis added a comment -

          To summarize for those browsing, the primary result here was the introduction of SharedExecutorPool: https://github.com/apache/cassandra/blob/cassandra-2.1.0/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
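          For readers who just want the gist without reading the class: the sketch below is a deliberately simplified, hypothetical illustration of the shared-pool idea (none of the names come from the actual code). Each stage keeps its own queue, while a common set of worker threads serves whichever stage currently has work, instead of every stage owning a fixed thread pool. The real implementation additionally handles per-stage concurrency limits and spinning workers up and down, which is where most of the complexity in this ticket went.

              import java.util.List;
              import java.util.concurrent.ConcurrentLinkedQueue;
              import java.util.concurrent.Semaphore;

              // Illustrative sketch only - not the real SharedExecutorPool/SEPExecutor code.
              // Workers are shared across stages; shutdown and worker spin-down are omitted.
              class SharedPoolSketch
              {
                  static class Stage
                  {
                      final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
                  }

                  private final List<Stage> stages;
                  private final Semaphore workAvailable = new Semaphore(0);

                  SharedPoolSketch(List<Stage> stages, int workerCount)
                  {
                      this.stages = stages;
                      for (int i = 0; i < workerCount; i++)
                          new Thread(this::workerLoop).start();
                  }

                  public void submit(Stage stage, Runnable task)
                  {
                      stage.queue.add(task);     // enqueue first, then signal
                      workAvailable.release();   // wake at most one shared worker
                  }

                  private void workerLoop()
                  {
                      while (true)
                      {
                          workAvailable.acquireUninterruptibly();  // one permit == one task queued somewhere
                          Runnable task = null;
                          while (task == null)                     // retry; the permit guarantees a task exists
                              for (Stage stage : stages)
                                  if ((task = stage.queue.poll()) != null)
                                      break;
                          task.run();
                      }
                  }
              }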

          Transition                      Time In Source Status   Execution Times   Last Executer   Last Execution Date
          Open -> Patch Available         565d 23h 56m            1                 Jason Brown     14/Apr/14 19:08
          Patch Available -> Resolved     45d 3h 21m              1                 Benedict        29/May/14 22:29

            People

            • Assignee: Benedict
            • Reporter: Jonathan Ellis
            • Reviewer: Jason Brown
            • Votes: 2
            • Watchers: 32
