CASSANDRA-4718: More-efficient ExecutorService for improved throughput

    Details

    • Type: Improvement
    • Status: Patch Available
    • Priority: Minor
    • Resolution: Unresolved
    • Fix Version/s: 2.1.0
    • Component/s: None
    • Labels:

      Description

      Currently all our execution stages dequeue tasks one at a time. This can result in contention between producers and consumers (although we do our best to minimize this by using LinkedBlockingQueue).

      One approach to mitigating this would be to make consumer threads do more work in "bulk" instead of just one task per dequeue. (Producer threads tend to be single-task oriented by nature, so I don't see an equivalent opportunity there.)

      BlockingQueue has a drainTo(collection, int) method that would be perfect for this. However, no ExecutorService in the jdk supports using drainTo, nor could I google one.
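
      For illustration, here is a minimal sketch of the kind of batch-draining consumer loop such an executor might run; it is not the eventual patch, and the class and parameter names are invented for this example.

      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.BlockingQueue;

      final class DrainingWorker implements Runnable
      {
          private final BlockingQueue<Runnable> tasks;
          private final int maxBatch;

          DrainingWorker(BlockingQueue<Runnable> tasks, int maxBatch)
          {
              this.tasks = tasks;
              this.maxBatch = maxBatch;
          }

          public void run()
          {
              List<Runnable> batch = new ArrayList<Runnable>(maxBatch);
              try
              {
                  while (!Thread.currentThread().isInterrupted())
                  {
                      // block for the first task, then grab whatever else is ready in one call,
                      // so the queue is touched roughly once per batch instead of once per task
                      batch.add(tasks.take());
                      tasks.drainTo(batch, maxBatch - 1);
                      for (Runnable task : batch)
                          task.run();
                      batch.clear();
                  }
              }
              catch (InterruptedException e)
              {
                  Thread.currentThread().interrupt();
              }
          }
      }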

      What I would like to do here is create just such a beast and wire it into (at least) the write and read stages. (Other possible candidates for such an optimization, such as the CommitLog and OutboundTCPConnection, are not ExecutorService-based and will need to be one-offs.)

      AbstractExecutorService may be useful. The implementations of ICommitLogExecutorService may also be useful. (Despite the name these are not actual ExecutorServices, although they share the most important properties of one.)
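
      As a rough sketch of how AbstractExecutorService could be used for this, the skeleton below wires the hypothetical DrainingWorker above into an ExecutorService; the lifecycle methods are deliberately stubbed out, so this shows the shape, not a working pool.

      import java.util.List;
      import java.util.concurrent.AbstractExecutorService;
      import java.util.concurrent.BlockingQueue;
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.concurrent.TimeUnit;

      final class BatchDrainingExecutor extends AbstractExecutorService
      {
          private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();

          BatchDrainingExecutor(int threads, int maxBatch)
          {
              for (int i = 0; i < threads; i++)
                  new Thread(new DrainingWorker(tasks, maxBatch), "batch-drain-" + i).start();
          }

          // AbstractExecutorService builds submit()/invokeAll() on top of execute()
          public void execute(Runnable command)
          {
              tasks.add(command);
          }

          // lifecycle methods stubbed out; a real implementation must track and stop its workers
          public void shutdown() { throw new UnsupportedOperationException("sketch only"); }
          public List<Runnable> shutdownNow() { throw new UnsupportedOperationException("sketch only"); }
          public boolean isShutdown() { return false; }
          public boolean isTerminated() { return false; }
          public boolean awaitTermination(long timeout, TimeUnit unit) { return false; }
      }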

      Attachments

      1. v1-stress.out (11 kB) - Jason Brown
      2. stress op rate with various queues.ods (37 kB) - Benedict
      3. PerThreadQueue.java (3 kB) - Vijay
      4. op costs of various queues.ods (28 kB) - Benedict
      5. baq vs trunk.png (44 kB) - Ryan McGuire
      6. 4718-v1.patch (23 kB) - Jason Brown

        Issue Links

          Activity

          Jonathan Ellis added a comment -

          (See CASSANDRA-1632 for the genesis of this idea.)

          Michaël Figuière added a comment -

          I'd suggest making the maxElements to "drain" somewhat proportional to the size of the queue, perhaps using some thresholds. This would mostly mean taking only one element per thread from the queue when it's almost empty, to avoid serializing task processing, and draining several when it's fuller. The expected behavior would be to keep latency low at light/nominal load and to increase throughput at heavy load.
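
          Purely to illustrate the threshold idea (the divisor and cap below are made-up tuning values, not anything agreed on this ticket), the drain size could be derived from occupancy roughly like this:

          final class DrainSizing
          {
              // a near-empty queue behaves like one-at-a-time dequeue, so task processing
              // isn't serialized; a fuller queue drains progressively bigger batches
              static int drainSize(int queuedTasks, int maxBatch)
              {
                  if (queuedTasks <= 1)
                      return 1;
                  return Math.min(maxBatch, Math.max(1, queuedTasks / 4));
              }
          }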

          Jonathan Ellis added a comment -

          That's reasonable, but it's worth noting that Queue.size() is not always constant time (notably in JDK7 LinkedTransferQueue, which we will want to evaluate vs LBQ).

          Michaël Figuière added a comment -

          Right, an atomic counter would then be more appropriate for tracking the current number of tasks in the queue. It won't be updated atomically with the actual size of the queue, but that's no big deal for this purpose...
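
          As a sketch of that, an approximate counter can live next to a queue whose size() may be O(n) (e.g. LinkedTransferQueue); it only steers the drain size, so a slightly stale value is harmless. The class and method names here are invented for the example.

          import java.util.Collection;
          import java.util.concurrent.BlockingQueue;
          import java.util.concurrent.atomic.AtomicInteger;

          final class CountedTaskQueue<T>
          {
              private final BlockingQueue<T> delegate;
              private final AtomicInteger approxSize = new AtomicInteger();

              CountedTaskQueue(BlockingQueue<T> delegate)
              {
                  this.delegate = delegate;
              }

              boolean offer(T t)
              {
                  boolean added = delegate.offer(t);
                  if (added)
                      approxSize.incrementAndGet();
                  return added;
              }

              T take() throws InterruptedException
              {
                  T t = delegate.take();
                  approxSize.decrementAndGet();
                  return t;
              }

              int drainTo(Collection<? super T> sink, int maxElements)
              {
                  int drained = delegate.drainTo(sink, maxElements);
                  approxSize.addAndGet(-drained);
                  return drained;
              }

              // may briefly disagree with the real queue size, which is fine for sizing a drain
              int approximateSize()
              {
                  return approxSize.get();
              }
          }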

          Jonathan Ellis added a comment -

          I posted a quick translation of LBQ to LTQ at https://github.com/jbellis/cassandra/branches/ltq.

          Brandon Williams added a comment - - edited

          Performance tests conclude that LTQ is significantly (15%+) slower than LBQ for both reads and writes.

          Nate McCall added a comment -

          Sorta related: Guava's Futures class has some interesting grouping functionality: http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/Futures.html#allAsList(java.lang.Iterable)
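
          For reference, a small example of the kind of grouping that link describes, combining several ListenableFutures into one; the pool size and tasks here are arbitrary, and this is not code from the ticket.

          import com.google.common.util.concurrent.Futures;
          import com.google.common.util.concurrent.ListenableFuture;
          import com.google.common.util.concurrent.ListeningExecutorService;
          import com.google.common.util.concurrent.MoreExecutors;
          import java.util.List;
          import java.util.concurrent.Callable;
          import java.util.concurrent.Executors;

          public class GroupedFuturesExample
          {
              public static void main(String[] args) throws Exception
              {
                  ListeningExecutorService pool =
                      MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));

                  ListenableFuture<String> a = pool.submit(new Callable<String>()
                  {
                      public String call() { return "replica-1"; }
                  });
                  ListenableFuture<String> b = pool.submit(new Callable<String>()
                  {
                      public String call() { return "replica-2"; }
                  });

                  // one future that completes when both inputs complete (or fails if either fails)
                  ListenableFuture<List<String>> both = Futures.allAsList(a, b);
                  System.out.println(both.get());

                  pool.shutdown();
              }
          }
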
          Vijay added a comment - - edited

          How about a per-thread queue? I was curious about this ticket and ran a test.
          I should also note that the stages contend when we have a lot of CPUs (I was testing with 64-core machines).

          Vijay added a comment -

          Sorry, I take it back... it is not good at smaller numbers. I had a bug in my previous run.

          Making JIT happy
          TimeTaken: 230
          Real test starts!
          running per thread exec Test!
          TimeTaken: 165
          running Plain old exec Test!
          TimeTaken: 54
          Test with multiple of 10!
          TimeTaken: 1519
          running Plain old exec Test!
          TimeTaken: 690
          Test with multiple of 100!
          TimeTaken: 15357
          running Plain old exec Test!
          TimeTaken: 30131
          
          Jonathan Ellis added a comment - - edited

          Another quick translation, this time to Jetty's BlockingArrayQueue: https://github.com/jbellis/cassandra/tree/baq

          (Edit: fixed branch url)

          Ryan McGuire added a comment - - edited

          I ran a performance test comparing Jonathan's baq branch with trunk and saw a substantial increase in read performance. See http://goo.gl/ZeUS6 or the attached png.

          T Jake Luciani added a comment -

          Stu Hood mentions http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html
          which is only in Java 7, but that's required for 2.0, no?
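
          For what it's worth, ForkJoinPool already implements ExecutorService, so in principle a stage could be backed by one without changing submission code; the parallelism and asyncMode values below are illustrative only.

          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.ForkJoinPool;
          import java.util.concurrent.TimeUnit;

          public class FjpStageExample
          {
              public static void main(String[] args) throws InterruptedException
              {
                  // asyncMode = true gives FIFO scheduling of tasks that are never joined,
                  // which is closer to how a stage consumes submitted work
                  ExecutorService readStage = new ForkJoinPool(32,
                                                               ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                                                               null,
                                                               true);

                  readStage.execute(new Runnable()
                  {
                      public void run() { System.out.println("handling a read request"); }
                  });

                  readStage.shutdown();
                  readStage.awaitTermination(1, TimeUnit.SECONDS);
              }
          }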

          Nitsan Wakart added a comment -

          OK, off the bat, without knowing anything about anything (following on from a twitter chat with Jake Luciani ‏@tjake):
          I understand your process to be a pipeline of work performed in stages, such that tasks have to complete one stage to go on to the next. You need some fast queue mechanism to deliver tasks between stages with minimum contention. Two approaches come to mind:
          1. To build on existing frameworks, you will probably want to look at the Disruptor (https://github.com/LMAX-Exchange/disruptor) and thread affinity (https://github.com/peter-lawrey/Java-Thread-Affinity). This is going to work well for rigid processes with an abundance of CPUs to play with, but with planning it can also work well for more restrictive environments.
          2. Go wild and build something new! The typical work-pool model has all the workers contending for work, which creates contention over the head of the queue. We can try for a new algorithm that removes this contention, for example by having a work arbitration thread deliver work to the least busy worker, or some other crazy scheme. I plan to give it a go and see what happens (a bare-bones sketch of one such contention-free handoff appears below).
          Option 1 is sure to deliver improved performance over your JDK variety of collections. Thread affinity will help deliver more predictable results but will require some layout algorithm for the available resources (i.e. a different plan for a dual-socket 8-core machine than a 4-socket 4-core one). Affinity is likely to prove important in any case, but I am not aware of any public framework making use of it.
          An overlooked aspect of the Disruptor is that it is a queue and an object pool merged into one. To make the most of it you will need to have your pipelined events form some super-structure which cycles through the process and then gets reused.
          In any case, here's an initial brain dump; happy to get involved in the solution if this sounds interesting.
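
          Not the Disruptor API, and not a proposal for this ticket as-is: the class below is just a bare-bones single-producer/single-consumer illustration of the kind of contention-free handoff point 2 hints at (capacity must be a power of two; the name is invented).

          import java.util.concurrent.atomic.AtomicReferenceArray;

          final class SpscRing<T>
          {
              private final AtomicReferenceArray<T> buffer;
              private final int mask;
              private long head; // only touched by the single consumer thread
              private long tail; // only touched by the single producer thread

              SpscRing(int capacityPowerOfTwo)
              {
                  buffer = new AtomicReferenceArray<T>(capacityPowerOfTwo);
                  mask = capacityPowerOfTwo - 1;
              }

              // producer thread only
              boolean offer(T item)
              {
                  int index = (int) (tail & mask);
                  if (buffer.get(index) != null)
                      return false; // full: the consumer hasn't freed this slot yet
                  buffer.lazySet(index, item); // ordered store publishes the item to the consumer
                  tail++;
                  return true;
              }

              // consumer thread only
              T poll()
              {
                  int index = (int) (head & mask);
                  T item = buffer.get(index);
                  if (item == null)
                      return null; // empty
                  buffer.lazySet(index, null); // hand the slot back to the producer
                  head++;
                  return item;
              }
          }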

          Jonathan Ellis added a comment -

          Let me give a little more color as to what our existing stages are. Most of these are ThreadPoolExecutors connected by LinkedBlockingQueue.

          A client sends each request to a node in the cluster called the Coordinator. The coordinator stages are

          1. Request: either Thrift or Netty reads the request from the client
          2. StorageProxy: the coordinator validates the request and decides which replicas need to be contacted
          3. MessagingService (out): the coordinator sends the requests to the appropriate replicas
          4. MessagingService (in): the coordinator reads the reply
          5. Response: the coordinator processes callbacks for the reply
          6. StorageProxy: this thread will have been waiting on a Future or a Condition for the callbacks, and can now reply to the client

          When a replica receives a message, it also goes through a few stages:

          1. MessagingService (in): the replica reads the coordinator's request
          2. Read or Write: fetch or append the data specified by the request
          3. MessagingService (out): the replica sends the result to the coordinator

          So the obstacles I see to incorporating Disruptor are

          • MessagingService. This is an exception to the rule in that it is not actually a ThreadPoolExecutor; we have a custom thread pool per replica that does some gymnastics to keep its queue from growing indefinitely when a replica gets behind (CASSANDRA-3005). MS uses blocking sockets; long ago, we observed this to give better performance than NIO. I'd be willing to evaluate redoing this on e.g. Netty, but:
          • More generally, requests are not constant-size, which makes disruptor Entry re-use difficult
          • The read stage is basically forced to be a separate thread pool because of blocking i/o from disk
          • StorageProxy is not yet asynchronous

          Addressing the last of these is straightforward, but the others give me pause.

          What I'd like to do is pick part of the system and see if converting that to Disruptor gives a big enough win to be worth pursuing with a full-scale conversion, but given how Disruptor wants to manage everything I'm not sure how to do that either!

          Darach Ennis added a comment -

          +1 to Nitsan Wakart's observations w.r.t. the Disruptor and thread affinity, although YMMV on Mac OS X w.r.t. affinity.
          I would take Java-Thread-Affinity with a pinch of salt on OS X w.r.t. claims of anything equivalent to true affinity. The Disruptor sounds like a good candidate here, though, given Jonathan Ellis's added color.

          As an aside, LMAX have recently open-sourced a coalescing ring buffer that uses similar underlying techniques to the Disruptor. It's designed for event streams that tend to benefit from coalescing or combining in some way. Perhaps this may suit some use cases. If not, it may still serve as inspiration / color.

          https://github.com/LMAX-Exchange/LMAXCollections/tree/master/CoalescingRingBuffer/src/com/lmax/collections/coalescing/ring/buffer

          Jonathan Ellis added a comment -

          LMAX have recently open sourced a coalescing ring buffer that uses similar underlying techniques to the disruptor

          More for the curious: http://nickzeeb.wordpress.com/2013/03/07/the-coalescing-ring-buffer/

          "It is a component that we have written in Java to efficiently buffer messages between a producer and a consumer thread where only the latest value for a given topic is of interest. All other messages can be discarded immediately."

          Nitsan Wakart added a comment -

          I'd suggest this is a less mature offering than the Disruptor at this point. Still, worth a read.

          Piotr Kołaczkowski added a comment -

          Another thing to consider might be using a high-performance actor library, e.g. Akka.

          I did a quick microbenchmark to see what the latency is of just passing a single message through several stages, in 3 variants:

          1. Sync: one thread pool per stage, where some coordinator thread just moves the message from one ExecutorService to the next after each stage finishes processing
          2. Async: one thread pool per stage, where every stage directly and asynchronously pushes its result into the next stage
          3. Akka: one Akka actor per stage, where every stage directly and asynchronously pushes its result into the next stage

          The clear winner is Akka:

          2 stages: 
          Sync:     38717 ns
          Async:    36159 ns
          Akka:     12969 ns
          
          4 stages: 
          Sync:     65793 ns
          Async:    49964 ns
          Akka:     18516 ns
          
          8 stages: 
          Sync:    162256 ns
          Async:   100009 ns
          Akka:      9237 ns
          
          16 stages: 
          Sync:    296951 ns
          Async:   183588 ns
          Akka:     13574 ns
          
          32 stages: 
          Sync:    572605 ns
          Async:   361959 ns
          Akka:     23344 ns
          

          Code of the benchmark:

          package pl.pk.messaging
          
          import java.util.concurrent.{CountDownLatch, Executors}
          import akka.actor.{Props, ActorSystem, Actor, ActorRef}
          
          
          class Message {
            var counter = 0
            val latch = new CountDownLatch(1)
          }
          
          abstract class MultistageThreadPoolProcessor(stageCount: Int) {
          
            val stages =
              for (i <- 1 to stageCount) yield Executors.newCachedThreadPool()
          
            def shutdown() {
              stages.foreach(_.shutdown())
            }
          
          }
          
          /** Synchronously processes a message through the stages.
            * The message is passed stage-to-stage by the coordinator thread. */
          class SyncThreadPoolProcessor(stageCount: Int) extends MultistageThreadPoolProcessor(stageCount) {
          
            def process() {
          
              val message = new Message
          
              val task = new Runnable() {
                def run() { message.counter += 1 }
              }
          
              for (executor <- stages)
                executor.submit(task).get()
            }
          }
          
          /** Asynchronously processes a message through the stages.
            * Every stage after finishing its processing of the message
            * passes the message directly to the next stage, without bothering the coordinator thread. */
          class AsyncThreadPoolProcessor(stageCount: Int) extends MultistageThreadPoolProcessor(stageCount) {
          
            def process() {
          
              val message = new Message
          
              val task = new Runnable() {
                def run() {
                  message.counter += 1
                  if (message.counter >= stages.size)
                    message.latch.countDown()
                  else
                    stages(message.counter).submit(this)
                }
              }
          
              stages(0).submit(task)
              message.latch.await()
            }
          }
          
          /** Similar to AsyncThreadPoolProcessor but it uses Akka actor system instead of thread pools and queues.
            * Every stage after finishing its processing of the message
            * passes the message directly to the next stage, without bothering the coordinator thread. */
          class AkkaProcessor(stageCount: Int) {
          
            val system = ActorSystem()
          
            val stages: IndexedSeq[ActorRef] = {
              for (i <- 1 to stageCount) yield system.actorOf(Props(new Actor {
                def receive = {
                  case m: Message =>
                    m.counter += 1
                    if (m.counter >= stages.size)
                      m.latch.countDown()
                    else
                      stages(m.counter) ! m
                }
              }))
            }
          
            def process() {
              val message = new Message
              stages(0) ! message
              message.latch.await()
            }
          
            def shutdown() {
              system.shutdown()
            }
          
          }
          
          
          
          object MessagingBenchmark extends App {
          
            def measureLatency(count: Int, f: () => Any): Double = {
              val start = System.nanoTime()
              for (i <- 1 to count)
                f()
              val end = System.nanoTime()
              (end - start).toDouble / count
            }
          
            val messageCount = 200000
            for (stageCount <- List(2,4,8,16,32))
            {
              printf("\n%d stages: \n", stageCount)
              val syncProcessor = new SyncThreadPoolProcessor(stageCount)
              val asyncProcessor = new AsyncThreadPoolProcessor(stageCount)
              val akkaProcessor = new AkkaProcessor(stageCount)
          
              printf("Sync:  %8.0f ns\n", measureLatency(messageCount, syncProcessor.process))
              printf("Async: %8.0f ns\n", measureLatency(messageCount, asyncProcessor.process))
              printf("Akka:  %8.0f ns\n", measureLatency(messageCount, akkaProcessor.process))
          
              syncProcessor.shutdown()
              asyncProcessor.shutdown()
              akkaProcessor.shutdown()
            }
          }
          
          Piotr Kołaczkowski added a comment - - edited

          Interestingly, after boosting the number of threads invoking the process() method from 1 to 16, Akka gets slower, while the thread-pool-per-stage approach gets faster.

          16 user threads invoking process(), 4-core i7 with HT (8 virtual cores):

          Huh, Sergio Bossa found a bug in my multithreaded benchmark...

          Piotr Kołaczkowski added a comment - - edited

          I made another version of the benchmark, following Sergio's suggestions. Now it uses the following message processing graph:

                           
             /------ stage 0 processor 0  ----\         /----           ----\                  /---           ---\
             +------ stage 0 processor 1  ----+         +----           ----+                  +---           ---+
          >--+------ stage 0 processor 2  ----+---->----+----  STAGE 1  ----+------>- ... --->-+---  STAGE m  ---+----->
             +------ ...                  ----+         +----           ----+                  +---           ---+
             \------ stage 0 processor n  ----/         \----           ----/                  \---           ---/
          

          128 threads concurrently try to get messages through all the stages and measure average latency, including the time required for the message to enter stage 0.
          Thread-pool stages are built from fixed-size thread pools with n=8, because there are 8 cores.
          Actor-based stages are built from 128 actors each, with a RoundRobinRouter in front of every stage.

          Average latencies:

          3 stages: 
          Sync:    364687 ns
          Async:   210766 ns
          Akka:    201842 ns
          
          4 stages: 
          Sync:    492581 ns
          Async:   221118 ns
          Akka:    239407 ns
          
          5 stages: 
          Sync:    671733 ns
          Async:   245370 ns
          Akka:    283798 ns
          
          6 stages: 
          Sync:    781759 ns
          Async:   262742 ns
          Akka:    309384 ns
          

          So Akka comes out slightly slower than async thread pools.

          If someone wants to play with my code, here is the up-to-date version:

          import java.util.concurrent.{CountDownLatch, Executors}
          import akka.actor.{Props, ActorSystem, Actor, ActorRef}
          import akka.routing.{SmallestMailboxRouter, RoundRobinRouter}
          
          
          class Message {
            var counter = 0
            val latch = new CountDownLatch(1)
          }
          
          abstract class MultistageThreadPoolProcessor(stageCount: Int) {
          
            val stages =
              for (i <- 1 to stageCount) yield Executors.newFixedThreadPool(8)
          
            def shutdown() {
              stages.foreach(_.shutdown())
            }
          
          }
          
          /** Synchronously processes a message through the stages.
            * The message is passed stage-to-stage by the coordinator thread. */
          class SyncThreadPoolProcessor(stageCount: Int) extends MultistageThreadPoolProcessor(stageCount) {
          
            def process() {
          
              val message = new Message
          
              val task = new Runnable() {
                def run() { message.counter += 1 }
              }
          
              for (executor <- stages)
                executor.submit(task).get()
            }
          }
          
          /** Asynchronously processes a message through the stages.
            * Every stage after finishing its processing of the message
            * passes the message directly to the next stage, without bothering the coordinator thread. */
          class AsyncThreadPoolProcessor(stageCount: Int) extends MultistageThreadPoolProcessor(stageCount) {
          
            def process() {
          
              val message = new Message
          
              val task = new Runnable() {
                def run() {
                  message.counter += 1
                  if (message.counter >= stages.size)
                    message.latch.countDown()
                  else
                    stages(message.counter).submit(this)
                }
              }
          
              stages(0).submit(task)
              message.latch.await()
            }
          }
          
          /** Similar to AsyncThreadPoolProcessor but it uses Akka actor system instead of thread pools and queues.
            * Every stage after finishing its processing of the message
            * passes the message directly to the next stage, without bothering the coordinator thread. */
          class AkkaProcessor(stageCount: Int) {
          
            val system = ActorSystem()
          
            val stages: IndexedSeq[ActorRef] = {
              for (i <- 1 to stageCount) yield
                system.actorOf(Props(createActor()).withRouter(RoundRobinRouter(nrOfInstances = 128)))
            }
          
            def createActor(): Actor = {
              new Actor {
          
                def receive = {
                  case m: Message =>
                    m.counter += 1
                    if (m.counter >= stages.size)
                      m.latch.countDown()
                    else
                      stages(m.counter) ! m
                }
              }
            }
          
            def process() {
              val message = new Message
              stages(0) ! message
              message.latch.await()
            }
          
            def shutdown() {
              system.shutdown()
            }
          
          }
          
          
          
          object MessagingBenchmark extends App {
          
            def measureLatency(count: Int, f: () => Any): Double = {
              val start = System.nanoTime()
              for (i <- 1 to count)
                f()
              val end = System.nanoTime()
              (end - start).toDouble / count
            }
          
            def measureLatency(threadCount: Int, messageCount: Int, f: () => Any): Double = {
          
              class RequestThread extends Thread {
                var latency: Double = 0.0
                override def run() { latency = measureLatency(messageCount, f) }
              }
          
              val threads =
                for (i <- 1 to threadCount) yield new RequestThread()
          
              threads.foreach(_.start())
              threads.foreach(_.join())
          
              threads.map(_.latency).sum / threads.size
            }
          
          
            val messageCount = 50000
            for (stageCount <- List(3,3,4,5,6,7,8,16,32))
            {
              printf("\n%d stages: \n", stageCount)
              val syncProcessor = new SyncThreadPoolProcessor(stageCount)
              val asyncProcessor = new AsyncThreadPoolProcessor(stageCount)
              val akkaProcessor = new AkkaProcessor(stageCount)
          
              printf("Sync:  %8.0f ns\n", measureLatency(128, messageCount, syncProcessor.process))
              printf("Async: %8.0f ns\n", measureLatency(128, messageCount, asyncProcessor.process))
              printf("Akka:  %8.0f ns\n", measureLatency(128, messageCount, akkaProcessor.process))
          
              syncProcessor.shutdown()
              asyncProcessor.shutdown()
              akkaProcessor.shutdown()
            }
          }
          
          
          Marcus Eriksson added a comment -

          FTR, I'm very much -1 on using Scala in Cassandra (I don't know if you're even suggesting that).

          I know it is supposed to interface nicely with Java code, but it generally becomes a huge hairy part of the code base that no one wants to touch.

          Piotr Kołaczkowski added a comment - - edited

          I'm not suggesting using Scala in C* or anywhere else. It was just quicker for me to write a throw-away benchmark, and Akka was already bundled. I hope it is understandable even to those not knowing Scala. BTW, Akka has a native Java API, too. Although, looking at the numbers, I'm not necessarily convinced it would help for this ticket now.

          Marcus Eriksson added a comment -

          Unless the Java API has improved a lot in the last year or so, the code will be horrible.

          Jonathan Ellis added a comment -

          Pavel Yaskevich: thoughts on my comment above? https://issues.apache.org/jira/browse/CASSANDRA-4718?focusedCommentId=13629447&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13629447

          Pavel Yaskevich added a comment -

          I think that even though variable-size RMs keep us from the most cache-friendly behavior, we can still get better dispatch behavior with low-cost synchronization. We can't do anything about blocking I/O operations requiring a separate thread, but I think it's time to re-evaluate NIO async sockets vs. having a thread per in/out connection.

          Jonathan Ellis added a comment -

          Sounds like a reasonable place to start.

          darion yaphets added a comment -

          LMAX Disruptor's RingBuffer may be a good idea for a lock-free component.
          But you may need to set a bigger size for the ring buffer so the structures it holds aren't overwritten by new ones before they are consumed,
          which means using more memory...

          Benedict added a comment -

          Disruptors are very difficult to use as a drop in replacement for the executor service, so I tried to knock up some queues that could provide similar performance without ripping apart the whole application. The resulting queues I benchmarked under high load, in isolation, against LinkedBlockingQueue, BlockingArrayQueue and the Disruptor, and plotted the average op costs in the "op costs of various queues" attachment*. As can be seen, these queues and the Disruptor are substantially faster under high load than LinkedBlockingQueue, however it can also be seen that:

          • The average op cost for LinkedBlockingQueue is still very low, in fact only around 300ns at worst
          • BlockingArrayQueue is considerably worse than LinkedBlockingQueue under all conditions

          These suggest both that the overhead attributed to LinkedBlockingQueue for a 1Mop workload (as run above) should be at most a few seconds of the overall cost (probably much less); and that BlockingArrayQueue is unlikely to make any cost incurred by LinkedBlockingQueue substantially better. This made me suspect the previous result might be attributable to random variance, but to be sure I ran a number of ccm -stress tests with the different queues, and plotted the results in "stress op rate with various queues.ods", which show the following:

          1) No meaningful difference between BAQ, LBQ and SlowQueue (though the latter has a clear ~1% slow down)
          2) UltraSlow (~10x slow down, or 2000ns spinning each op) is approximately 5% slower
          3) The faster queue actually slows down the process, by about 9% - more than the queue supposedly much slower than it!

          Anyway, I've been concurrently looking at where I might be able to improve performance independent of this, and have found the following:

          A) Raw performance of local reads is ~6-7x faster than through Stress
          B) Raw performance of local reads run asynchronously is ~4x faster
          C) Raw performance of local reads run asynchronously using the fast queue is ~4.7x faster
          D) Performance of local reads from the Thrift server-side methods is ~3x faster
          E) Performance of remote (i.e. local non-optimised) reads is ~1.5x faster

          In particular (C) is interesting, as it demonstrates the queue really is faster in use, but I've yet to absolutely determine why that translates into an overall decline in throughput. It looks as though it's possible it causes greater congestion in LockSupport.unpark(), but this is a new piece of information, derived from YourKit. As these sorts of methods are difficult to meter accurately I don't necessarily trust it, and haven't had a chance to figure out what I can do with the information. If it is accurate, and I can figure out how to reduce the overhead, we might get a modest speed boost, which will accumulate as we find other places to improve.

          As to the overall problem of improving throughput, it seems to me that there are two big avenues to explore:

          1) the networking (software) overhead is large;
          2) possibly the cost of managing thread liveness (e.g. park/unpark/scheduler costs); though the evidence for this is as yet inconclusive... given the op rate and other evidence it doesn't seem to be synchronization overhead. I'm still trying to pin this down.

          Once the costs here are nailed down as tight as they can go, I'm pretty confident we can get some noticeable improvements to the actual work being done, but since that currently accounts for only a fraction of the time spent (probably less than 20%), I'd rather wait until it was a higher percentage so any improvement is multiplied.

          * These can be replicated by running org.apache.cassandra.concurrent.test.bench.Benchmark on any of the linked branches on github.

          https://github.com/belliottsmith/cassandra/tree/4718-lbq [using LinkedBlockingQueue]
          https://github.com/belliottsmith/cassandra/tree/4718-baq [using BlockingArrayQueue]
          https://github.com/belliottsmith/cassandra/tree/4718-lpbq [using a new high performance queue]
          https://github.com/belliottsmith/cassandra/tree/4718-slow [using a LinkedBlockingQueue with 200ns spinning each op]
          https://github.com/belliottsmith/cassandra/tree/4718-ultraslow [using a LinkedBlockingQueue with 2000ns spinning each op]

          Jonathan Ellis added a comment -

          The faster queue actually slows down the process, by about 9% - more than the queue supposedly much slower than it

          So this actually confirms Ryan's original measurement of C*/BAQ [slow queue] faster than C*/LBQ [fast queue]?

          Benedict added a comment -

          Not necessarily. I still think that was most likely variance:

          • I have BAQ at same speed as LBQ in application
          • a 2x slow down of LBQ -> 0.01x slow down of application
          • a 10x slow down of LBQ -> 0.05x slow down of application

          => the queue speed is currently only ~1% of application cost. It's possible the faster queue is causing greater contention at a sync point, but this wouldn't work in the opposite direction if the contention at the sync point is low. Either way, if this were true we'd see the artificially slow queues also improve stress performance.

          Ryan also ran some of my tests and found no difference. I wouldn't absolutely rule out the possibility his test was valid, though, as I did not swap out the queues in OutboundTcpConnection for these tests as, at the time, I was concerned about the calls to size() which are expensive for my test queues, and I wanted the queue swap to be on equal terms across the board. I realise now these are only called via JMX, so shouldn't stop me swapping them in.

          I've just tried a quick test of directly (in-process) stressing through the MessagingService and found no measurable difference from putting BAQ in the OutboundTcpConnection, though if I swap it out across the board it is about 25% slower, which is itself interesting as this is close to a full stress, minus thrift.

          Jonathan Ellis added a comment -

          What's the latest on this? I think Jason had some work with FJP he was testing out...

          Jason Brown added a comment -

          Ha, just this week I was untangling my branch for CASSANDRA-1632, which included the FJP work. Should be able to get to this one next week after more performance testing.

          Jason Brown added a comment -

          OK, looks like my initial stab at switching over to FJP netted about a 10-15% throughput increase, and mixed results on the latency scores (sometimes better, sometimes on par with trunk). I'm going to run some more perf tests this weekend, and will decide how to proceed early next week - but the initial results do look promising. I've only tested the thrift endpoints so far, but when I retest this weekend, I'll throw in the cql3/native protocol, as well.

          Here's my current working branch: https://github.com/jasobrown/cassandra/tree/4718_fjp . Note that it's very hacked up/WIP, as I wanted to confirm the performance benefits before making everything happy (read: metrics pools). Also, I modified Pavel Yaskevich's thrift-disruptor lib for this: https://github.com/jasobrown/disruptor_thrift_server/tree/4718_fjp.

          Jason Brown added a comment -

          After a several-month hiatus, digging into this again. This time around, though, I have real hardware to test on, and thus the results are more consistent across executions (no cloud provider intermittently throttling me).

          Testing the thrift interface (both sync and hsha), the short story is throughput is up ~20% vs. TPE, and the 95%/99%iles are down 40-60%. The 99.9%ile, however, is a bit trickier. In some of my tests it is down almost 80%, and sometimes it is up 40-50%. I need to dig in further to understand what is going on (not sure if it's because of a shared env, reading across NUMA cores, and so on). perf and likwid are my friends in this investigation.

          As to testing the native protocol interface, I’ve only tested writes (new 2.1 stress seems broken on reads) and I get double the throughput and 40-50% lower latencies across the board.

          My test cluster consists of three machines, 32 cores each, 2 sockets (2 numa cores), 132G memory, 2.6.39 kernel, plus a similar box that generates the load.

          A couple of notes about this patch:

          • RequestThreadPoolExecutor now decorates a FJP. Previously we had a TPE, which contains, of course, a (bounded) queue. The bounded queue helped with back pressure from incoming requests. By using a FJP, there is no queue to help with back pressure, as the FJP always enqueues a task (without blocking). Not sure if we still want/need that back pressure here (a rough sketch of one way to restore it follows this list).
          • As ForkJoinPool doesn’t expose much in terms of usage metrics (like total completed) compared to ThreadPoolExecutor, the ForkJoinPoolMetrics is similarly barren. Not sure if we want to capture this on our own in DFJP or something else.
          • I have made similar FJP changes to the disruptor-thrift library, and once this patch is committed, I’ll work with Pavel to make the changes over there and pull in the updated jar.
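
          The sketch referenced in the first note above: a minimal, hypothetical illustration (not part of this patch) of one way bounded-queue-style back pressure could be layered on top of a ForkJoinPool, which otherwise accepts every submission without blocking. The class and field names (BoundedFjpExecutor, maxPending) are made up for illustration.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Semaphore;

// Hypothetical sketch only (not from the patch): layering bounded-queue-style
// back pressure on top of a ForkJoinPool, which otherwise accepts every
// submission without blocking. Names here (BoundedFjpExecutor, maxPending)
// are illustrative.
public class BoundedFjpExecutor
{
    private final ForkJoinPool pool;
    private final Semaphore permits;

    public BoundedFjpExecutor(int parallelism, int maxPending)
    {
        this.pool = new ForkJoinPool(parallelism);
        this.permits = new Semaphore(maxPending);
    }

    public void execute(final Runnable task) throws InterruptedException
    {
        // Block the producer once maxPending tasks are in flight, mimicking
        // the back pressure a bounded TPE queue used to provide.
        permits.acquire();
        pool.execute(new Runnable()
        {
            public void run()
            {
                try
                {
                    task.run();
                }
                finally
                {
                    permits.release(); // free a slot once the task completes
                }
            }
        });
    }
}

          A producer calling execute() would then block once maxPending tasks are outstanding, much as it previously did when the TPE's bounded queue filled up.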

          As a side note, it looks like the quasar project (http://docs.paralleluniverse.co/quasar/) indicates the jsr166e jar has some optimizations (http://blog.paralleluniverse.co/2013/05/02/quasar-pulsar/) over the jdk7 implementation (that are included in jdk8). I pulled in those changes and stress tested, but didn’t see much of a difference for our use case. I can, however, pull them in again if anyone feels strongly.

          Benedict added a comment - edited

          Nice! I wonder if this has a much bigger impact on multi-CPU machines, as I did not see anything like this dramatic an improvement. But this is great. Do you have some stress dumps we can look at? Never mind, I see that you do

          new 2.1 stress seems broken on reads

          Shouldn't be - what problem are you seeing?

          Benedict added a comment -

          Just to check: you were tearing down and trashing the data directories between write runs? Because the last result is a bit weird: double the throughput, but it ran for 4 times as long, indicating there was a high variance in throughput (which usually means compaction / heavy flushing is taking effect) - but the same workload under thrift had no such spikes...

          Jason Brown added a comment -

          As to multi-CPU machines, I spent a lot of time thinking about the effects of NUMA systems on CAS operations/algs (esp. wrt FJP, obviously). As I mentioned, I'm using systems with two sockets (two NUMA cores). As you get more sockets (and thus more NUMA cores), a thread on one core will be reaching across to more cores to do work stealing, thus adding contention to that memory address. Imagine four threads on four sockets all contending for work on a fifth thread. The memory values for that portion of the queue for that fifth thread are now pulled into all four sockets, thus becoming more of a contention point, as well as impacting latency (due to the CAS operation). However, this could be (and hopefully is) less of a cost than bothering with queues, blocking, posix threads, OS interrupts, and everything else that makes standard thread pool executors work.

          Thinking even crazier about optimizing the FJP sharing across NUMA cores, this is where I start thinking about digging up the thread affinity work again and binding threads of similar types (probably by Stage) to sockets, not just to an individual CPU (I think that was my problem before). But then I wonder how much is to be gained on non-NUMA systems, or systems where you can't determine whether they have NUMA or not (hello, cloud!) - and at that point I'm happy to realize the gains we have and move forward.

          what problem are you seeing?

          Will ping you offline - too unexciting for this space

          Jason Brown added a comment -

          you were tearing down and trashing the data directories between write runs

          Yes, and was clearing the page cache as well.

          Jason Brown added a comment -

          but it ran for 4 times as long, indicating there was a high variance in throughput

          Huh, yeah, you are right, it did run longer. Admittedly my eyes have been ignoring that column (shame on me). Let me run the native protocol test again (and try to figure out the read situation, as well).

          Benedict added a comment -

          I think it is unlikely that the CAS has any meaningful impact. The QPI is very quick. I think this kind of speedup is more likely down to reduced signalling costs (unpark costs ~ 10micros, and work stealing means you have to go to the main queue far less frequently); possibly also the signalling of threads has been directly optimised in FJP. I knocked up a "low-switch" executor but found fairly little benefit on my box, as I can saturate the CPUs very easily (at which point the unpark cost is never incurred). On a many-CPU box, saturating all the cores is difficult, and so it is much more likely you'll be introducing bottlenecks on producers adding to the queue.

          Benedict added a comment -

          Note that given the way C* works, the work stealing crossing CPU boundaries is really not important - the data is completely randomly hodgepodgedly allocated across the CPUs. Nothing we can do in FJP can fix that

          Jason Brown added a comment -

          Yeah, I think the extra cost across the QPI bus and such is easily masked by any disk I/O the actual op may have to do.

          the data is completely randomly hodgepodgedly allocated across the CPUs

          Correct, given the current structure of the app. I can imagine something more CPU-cache friendly, but it's a huge change and I suspect that I/O still dwarfs those latency reductions. Nice hack day project ....

          Benedict added a comment -

          Maybe you're one of the few people I haven't told of my dream of internally hashing C* so that each sub-hash is run by its own core. Guaranteed NUMA behaviour, and we can stop performing any CASes and make all kinds of single threaded optimisations in various places (e.g. no need to do CoW updates to memtables, so massive garbage reduction).

          Bit of a pipedream for the moment though

          Benedict added a comment -

          Jason Brown: Could you upload the full stress outputs for these runs? And also try running a separate stress run with a fixed high threadcount and op count?

          In particular for CQL, the results in the file are a little bit weird. That said, given their consistency for thrift I don't doubt the result is meaningful, but it would be good to understand what we're incorporating a bit better before committing.

          Jason Brown added a comment -

          OK, will give it a shot today. Also, just noticed I did not tune native_transport_max_threads at all (so I have the default of 128). Might play with that a bit, as well.

          Pavel Yaskevich added a comment -

          I still want to review this, why are you re-assigning Benedict?

          Aleksey Yeschenko added a comment -

          We were just doing some house cleaning today on IRC

          [16:49:06] jbellis: belliottsmith: is 4718 really patch available?
          [16:49:14] jbellis: #4718
          [16:49:14] CassBotJr: https://issues.apache.org/jira/browse/CASSANDRA-4718 (Unresolved; 2.1): "More-efficient ExecutorService for improved throughput"
          [16:50:15] belliottsmith: so jasobrown says - he's marked pavel as reviewer so i've kept out of it beyond asking for a little more stress info
          [16:51:28] belliottsmith: honestly though, from last time i looked at it (a while back), it was a pretty simple change

          Jason Brown added a comment -

          Guys, guys, there's plenty of my patch to criticize - you'll each have your fun, I'm sure


            People

            • Assignee: Jason Brown
            • Reporter: Jonathan Ellis
            • Reviewer: Benedict
            • Votes: 2
            • Watchers: 26
