CASSANDRA-1434

ColumnFamilyOutputFormat performs blocking writes for large batches

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 0.7 beta 2
    • Component/s: None
    • Labels: None

      Description

      By default, ColumnFamilyOutputFormat batches up to mapreduce.output.columnfamilyoutputformat.batch.threshold mutations (which defaults to Long.MAX_VALUE) and then performs a blocking write.
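
      For illustration, a minimal sketch of the write pattern being described. The ThriftClient interface and batchMutate call below are hypothetical stand-ins, not the real Thrift API or the actual ColumnFamilyRecordWriter code; the point is that mutations accumulate until the threshold is hit and are then flushed in one blocking call.

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; not the actual ColumnFamilyRecordWriter code.
interface ThriftClient { void batchMutate(List<Object> mutations) throws Exception; }

class BlockingBatchWriter {
    private final long batchThreshold;   // mapreduce.output.columnfamilyoutputformat.batch.threshold
    private final ThriftClient client;
    private final List<Object> buffer = new ArrayList<>();

    BlockingBatchWriter(ThriftClient client, long batchThreshold) {
        this.client = client;
        this.batchThreshold = batchThreshold;   // defaults to Long.MAX_VALUE in the writer
    }

    void write(Object mutation) throws Exception {
        buffer.add(mutation);
        if (buffer.size() >= batchThreshold)
            flush();   // blocking: the map/reduce task stalls here until the whole batch is acked
    }

    void flush() throws Exception {
        client.batchMutate(buffer);   // one large, blocking RPC
        buffer.clear();
    }
}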

      Attachments

      1. 1434-v7.txt
        25 kB
        Jonathan Ellis
      2. 1434-v6.txt
        25 kB
        Jonathan Ellis
      3. 1434-v5.txt
        24 kB
        Jonathan Ellis
      4. 1434-v4.txt
        23 kB
        Jonathan Ellis
      5. 1434-v3.txt
        23 kB
        Jonathan Ellis
      6. 0004-Replace-Executor-with-map-of-threads.patch
        13 kB
        Stu Hood
      7. 0003-Switch-RingCache-back-to-multimap.patch
        5 kB
        Stu Hood
      8. 0002-Improve-concurrency-and-add-basic-retries-by-attempt.patch
        23 kB
        Stu Hood
      9. 0001-Switch-away-from-Multimap-and-fix-regression-introdu.patch
        4 kB
        Stu Hood

          Activity

          mrflip Philip (flip) Kromer added a comment - edited

          The blocking behavior is causing 'broken pipe' errors (even with relatively small batch sizes) when Cassandra latency is high. (This is, afaict, not network latency but response latency due to a compaction or flush, etc.)

          It also makes the whole cluster resonate: one slow node blocks many writers, which then all unblock at the same time and write bursts large enough to trigger a compaction or GC simultaneously on every node. This means adding more writers doesn't work around the blocking write.

          stuhood Stu Hood added a comment -

          0001 and 0003 are minor fixes, but 0002:

          • Avoids blocking processing for writes (but only 2 * batchSize mutations may be in memory at a time, so we may still block)
          • Changes the default batchSize to 2^14
          • Rotates through possible endpoints for a range per flush, which should more evenly distribute client connections when there are small numbers of keys in play

          One issue we haven't tackled yet is how to handle failures: I've reopened CASSANDRA-1264 to handle that.

          stuhood Stu Hood added a comment -

          Doh... this applies atop CASSANDRA-1368.

          stuhood Stu Hood added a comment -

          0004 collects all replicas for each range in RingCache, which I broke in 1322 (previously, we were completely rebuilding the tokenmap using a replication strategy, which would have recreated the lost information).

          jbellis Jonathan Ellis added a comment -

          why the double-flushing in close()? can you add a comment for that?

          jbellis Jonathan Ellis added a comment -

          committed 01. declining to apply 03; in general refactoring out a method that is called in a single place obscures control flow rather than clarifies it. 04 looks ok but i'm not sure to what degree it depends on 02 (see above) so leaving alone for now.

          mrflip Philip (flip) Kromer added a comment -

          Right now the code does

          { buffer n mutations, holding each according to its endpoint. After n writes, check that all endpoint writes are finished, and dispatch to each endpoint its share of the n mutations }

          This is non-blocking at the socket level but ends up being blocking at the app level, and the wide variance in batch size has bad effects on GC at the Cassandra end.

          I think the ColumnFamilyRecordWriter would see a speedup & improved stability with

          { buffer mutations, holding each according to its endpoint. When an endpoint has seen n writes, check that any previous write has finished, and dispatch to this endpoint a full buffer of N mutations }.
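
          A rough sketch of the per-endpoint buffering proposed above. The types and the send() helper are hypothetical (this is not what was committed); each endpoint fills and flushes its own fixed-size buffer, so a slow node only delays its own writes rather than gating one big shared batch.

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch only: per-endpoint buffers with at most one flush in flight per endpoint.
class PerEndpointBuffering {
    private final int batchSize;
    private final ExecutorService flusher = Executors.newCachedThreadPool();
    private final Map<InetAddress, List<Object>> buffers = new HashMap<>();
    private final Map<InetAddress, Future<?>> inFlight = new HashMap<>();

    PerEndpointBuffering(int batchSize) { this.batchSize = batchSize; }

    void write(InetAddress endpoint, Object mutation) throws Exception {
        List<Object> buf = buffers.computeIfAbsent(endpoint, e -> new ArrayList<>());
        buf.add(mutation);
        if (buf.size() >= batchSize) {
            Future<?> previous = inFlight.get(endpoint);
            if (previous != null)
                previous.get();                       // wait only on this endpoint's previous flush
            buffers.put(endpoint, new ArrayList<>()); // keep accepting writes while the old buffer flushes
            inFlight.put(endpoint, flusher.submit(() -> send(endpoint, buf)));
        }
    }

    private void send(InetAddress endpoint, List<Object> batch) { /* blocking write to this endpoint */ }
}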

          stuhood Stu Hood added a comment -

          0001 is the changes to the RingCache that survived from v1: it fixes the bug in RingCache that was handled by the previous 0004, and removes the multimap.

          0002 is a completely revamped ColumnFamilyRecordWriter: nothing from the original patch survived.

          • Launches a client thread per unique range, which is responsible for communicating with the endpoint replicas for that range.
            • Each client thread receives mutations for its range from the parent thread on a bounded queue.
            • Client threads attempt to send a full batch of mutations to their replicas in order: this means that each batch gets up to RF retries before failing, but without any failures, connections will always be made to the first replica.
          • The parent thread loops trying to offer to the client threads' queues, and checks that they are still alive (and fails if they aren't).
          • For an N node cluster, up to (2 * N * batchSize) mutations will be in memory at once, so the default batchSize was lowered to 4096.

          Fairly well tested against a 12 node cluster: no obvious races or bottlenecks.
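
          A condensed sketch of the threading shape described above, with placeholder Range and Mutation types standing in for the real classes; this is illustrative, not the attached patch. One client thread per unique range drains a bounded queue, and the parent thread hands mutations over with a timed offer() so that a dead client fails the task instead of blocking it forever.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch only: placeholder types, simplified send(); the real class is ColumnFamilyRecordWriter.
class RecordWriterSketch {
    interface Range {}      // stand-in for Cassandra's token range
    interface Mutation {}   // stand-in for the Thrift Mutation

    private final int batchSize = 4096;
    private final Map<Range, RangeClient> clients = new HashMap<>();

    void write(Range range, Mutation mutation) throws IOException {
        RangeClient client = clients.computeIfAbsent(range, r -> {
            RangeClient c = new RangeClient(batchSize);
            c.start();
            return c;
        });
        try {
            // Loop on a timed offer so a dead client surfaces as a task failure.
            while (!client.queue.offer(mutation, 100, TimeUnit.MILLISECONDS))
                if (!client.isAlive())
                    throw new IOException("client thread for " + range + " died");
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    static class RangeClient extends Thread {
        final BlockingQueue<Mutation> queue;   // bounded; the real code also holds an in-flight
        volatile boolean run = true;           // batch, giving roughly 2 * batchSize per range

        RangeClient(int batchSize) { this.queue = new ArrayBlockingQueue<>(batchSize); }

        public void run() {
            while (run || !queue.isEmpty()) {  // drain remaining mutations after stopNicely()
                Mutation m;
                try { m = queue.poll(100, TimeUnit.MILLISECONDS); }
                catch (InterruptedException e) { continue; }
                if (m != null)
                    send(m);   // the real code batches these and retries across the range's replicas
            }
        }

        void stopNicely() { run = false; }
        void send(Mutation m) { /* blocking Thrift write to one of the range's replicas */ }
    }
}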

          jbellis Jonathan Ellis added a comment -

          why is switching from Multimap<Range, InetAddress> to Map<Range, List<InetAddress>> an improvement?

          stuhood Stu Hood added a comment -

          Sent via e-mail while I was on vacation:

          I wanted to dodge object creation, but I guess I assumed that Multimap created Set and Collection facades for every call. Also, there didn't appear to be a way to iterate over unique keys without a facade.

          jbellis Jonathan Ellis added a comment -

          Had a look at 02. Still don't understand your objection to using Multimap – use ListMultimap if you want to preserve ordering. It's noticeably cleaner than Map<X, List<Y>>, and the Guava guys are very careful about performance.

          Also, 02 kind of abuses an executor when a map of threads would be clearer as to what is going on, while not requiring much more code. "Send this message" is a good task to submit to an executor; "run an infinite loop pulling messages off a public queue" is not.
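
          For reference, a small example of the Guava idiom being suggested here, using String stand-ins for the real Range and InetAddress types: ListMultimap preserves per-key insertion order, and get()/keySet() return lightweight views rather than copies.

import java.util.List;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;

// Illustration only: string keys/values stand in for Range and InetAddress.
public class MultimapExample {
    public static void main(String[] args) {
        ListMultimap<String, String> replicas = ArrayListMultimap.create();
        replicas.put("(0, 100]", "10.0.0.1");
        replicas.put("(0, 100]", "10.0.0.2");

        List<String> forRange = replicas.get("(0, 100]");   // live List view, insertion order kept
        System.out.println(forRange);                        // [10.0.0.1, 10.0.0.2]

        for (String range : replicas.keySet())               // iterate unique keys, no copying
            System.out.println(range + " -> " + replicas.get(range));
    }
}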

          stuhood Stu Hood added a comment -

          Adding 0003 and 0004 with the requested changes: exception handling was also improved a bit.

          jbellis Jonathan Ellis added a comment -

          I squashed and added code to keep CFRW from slamming Cassandra with spikes of load: it keeps a pooled connection, and sends mutations one at a time over that. This is only a trivial amount of overhead compared to using a large batch, since we're not reconnecting for each message. (The main advantage of using a larger batch is that it gives you an idempotent group of work to replay if necessary, which doesn't matter here. Under the hood it takes the same code path.)

          Also attempted to distinguish between recoverable and non-recoverable errors in the exception handling.

          stuhood Stu Hood added a comment -
          • ArrayBlockingQueue.isEmpty will kill client threads if their queue is ever empty
          • Interrupt handling doesn't seem like a clearer solution for killing client threads: what happens when an interrupt is received during a mutation?
          • I don't like the idea of indefinite retries: pretending that the cluster is never unavailable sidesteps Hadoop's own retry system
          • As mentioned in IRC, batchSize == 1 does not seem like a good value to hardcode. Any amount of overhead becomes measurable when you are sending small enough values: mutations containing a single integer might increase in size X fold for instance
          jbellis Jonathan Ellis added a comment -

          > ArrayBlockingQueue.isEmpty will kill client threads if their queue is ever empty

          it's while (run || !queue.isEmpty()). am i missing something?

          > what happens when an interrupt is received during a mutation

          nothing. InterruptedException is only thrown at well-defined points (one of the few times checked exceptions have done me a favor), and a blocking socket send is not one of them. the JDK uses this pattern to shut down ThreadPoolExecutors.

          > I don't like the idea of indefinite retries

          the idea is it tries each endpoint, then throws if they all fail. (if !iter.hasNext() then throw)

          > batchSize == 1 does not seem like a good value to hardcode

          as described above, batching > 1 is a misfeature that has been demonstrated to cause badness in practice.
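
          A minimal sketch of the retry idiom described above ("tries each endpoint, then throws if they all fail"), with a hypothetical send() in place of the real Thrift write. It also notes why the shutdown flag is used instead of Thread.interrupt(): a plain blocking socket write is not interruptible.

import java.io.IOException;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.List;

// Illustrative only: each batch gets at most RF attempts, one per replica in order,
// and an exception only propagates once every replica has failed.
final class ReplicaRetry {
    static void sendWithRetry(List<InetAddress> replicas, List<Object> batch) throws IOException {
        if (replicas.isEmpty())
            throw new IOException("no replicas for range");
        Iterator<InetAddress> iter = replicas.iterator();
        while (true) {
            InetAddress replica = iter.next();
            try {
                send(replica, batch);   // blocking socket write: not a point where
                return;                 // InterruptedException can be thrown
            } catch (IOException e) {
                if (!iter.hasNext())
                    throw e;            // all replicas failed: surface the last error
            }
        }
    }

    private static void send(InetAddress replica, List<Object> batch) throws IOException {
        /* blocking Thrift write would go here */
    }
}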

          stuhood Stu Hood added a comment -

          > it's while (run || !queue.isEmpty()). am i missing something?
          Ah, sorry.

          > the idea is it tries each endpoint, then throws if they all fail. (if !iter.hasnext() then throw)
          Gotcha. I missed that part because there doesn't appear to be a way for the parent thread to figure out that a client died, so I assumed that the clients never died. Does it need an UncaughtExceptionHandler that alerts the parent thread? This was what was accomplished by using offer() rather than put() in the previous version.

          > as described above, batching > 1 is a misfeature that has been demonstrated to cause badness in practice.
          In Cassandra, or in general?

          jbellis Jonathan Ellis added a comment -

          > Does it need an UncaughtExceptionHandler that alerts the parent thread?

          Probably. What should the parent thread do?

          > In Cassandra, or in general?

          In Cassandra. Flip spent several days in the user IRC channel trying to deal with the load spikes.

          stuhood Stu Hood added a comment -

          > Probably. What should the parent thread do?
          Probably what the previous version did.

          > In Cassandra. Flip spent several days in the user IRC channel trying to deal with the load spikes.
          Does changing this patch solve his problem, or are we assuming that?

          jbellis Jonathan Ellis added a comment -

          v4 attached to throw IOException on put or stopNicely if the thread has errored out

          stuhood Stu Hood added a comment -
          • There is a race condition in put() between !run and the put itself
          • Exceptions thrown by child threads will be logged, but not reported to the Hadoop frontend, since they aren't what kill the parent thread

          I'm -0 on v3 and v4, but I'll add a 0005 to separate queue size from batch size, so that we can tune down the batch size for Flip.

          jbellis Jonathan Ellis added a comment -

          i'm -1 on batching at all.

          jbellis Jonathan Ellis added a comment -

          v5 uses a small batch size and eagerly sends out "incomplete" batches if the reducer falls behind
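
          A sketch of the eager-flush behaviour described above, assuming a poll-with-timeout drain loop and a placeholder send(); this is not the attached diff. A batch is sent either when it fills up or when the queue runs dry, so a slow reducer produces small, steady writes instead of letting mutations pile up waiting for a "complete" batch.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative drain loop only; batchSize and timeout values are arbitrary.
class EagerFlushLoop implements Runnable {
    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(64);
    private final int batchSize = 64;           // deliberately small
    volatile boolean run = true;

    boolean offer(Object mutation) throws InterruptedException {
        return queue.offer(mutation, 100, TimeUnit.MILLISECONDS);
    }

    public void run() {
        List<Object> batch = new ArrayList<>(batchSize);
        while (run || !queue.isEmpty()) {
            Object m;
            try { m = queue.poll(100, TimeUnit.MILLISECONDS); }
            catch (InterruptedException e) { continue; }
            if (m != null)
                batch.add(m);
            // Flush when full, or when the poll timed out and something is buffered.
            if (batch.size() >= batchSize || (m == null && !batch.isEmpty())) {
                send(batch);
                batch = new ArrayList<>(batchSize);
            }
        }
        if (!batch.isEmpty())
            send(batch);                          // final partial batch on shutdown
    }

    private void send(List<Object> batch) { /* blocking write over the pooled connection */ }
}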

          stuhood Stu Hood added a comment -
          • ColumnFamilyOutputFormat.createAuthenticatedClient calls socket.open, so the second open in RangeClient is getting TTransportException: Socket already connected
          • Logging an NPE for the first batch is pretty ugly
          • The default batchSize was increased back up to Long.MAX_VALUE: it should probably be significantly lower (32~128) for the reasons you've mentioned
          jbellis Jonathan Ellis added a comment -

          v6.

          > There is a race condition in put() between !run and the put itself

          not really. the check in put is just an attempt to abort earlier if possible.

          > Exceptions thrown by child threads will be logged, but not reported to the Hadoop frontend

          saved actual exceptions.

          > the second open in RangeClient is getting TTransportException: Socket already connected

          fixed

          > Logging an NPE for the first batch is pretty ugly

          nothing is logged. the alternatives strike me as uglier.

          > The default batchSize was increased back up to Long.MAX_VALUE

          fixed

          stuhood Stu Hood added a comment -

          This patch includes a change to CompactionManager.java.

          >> There is a race condition in put() between !run and the put itself
          > not really. the check in put is just an attempt to abort earlier if possible.
          put() is called from the parent thread: it isn't interrupted by the child thread, so it will block indefinitely if an exception occurs between the lastException != null check and the blocking put(). Unlikely, but...


          Other than those two nitpicks, +1: tested against a 12 node cluster and saw smooth network utilization.
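
          A sketch of the window being described, plus one way to close it by re-checking the failure flag on a timed offer(); this is illustrative only, not the v6/v7 code, and the method names are hypothetical.

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative only: if the child thread dies right after the lastException check,
// the parent's blocking put() can wait forever on a queue nobody is draining.
class PutRaceSketch {
    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(64);
    volatile IOException lastException;   // set by the child thread when it dies

    // Racy version: check the flag, then block unconditionally.
    void putRacy(Object mutation) throws IOException, InterruptedException {
        if (lastException != null)
            throw lastException;
        queue.put(mutation);   // if the child dies here, this blocks indefinitely
    }

    // Fixed version: keep re-checking the failure flag while waiting.
    void putChecked(Object mutation) throws IOException, InterruptedException {
        while (!queue.offer(mutation, 100, TimeUnit.MILLISECONDS))
            if (lastException != null)
                throw lastException;
    }
}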

          jbellis Jonathan Ellis added a comment -

          v7

          > This patch includes a change to CompactionManager.java

          fixed

          > it will block indefinitely if an exception occurs between the lastException != null check and the blocking put()

          you're right. fixed

          stuhood Stu Hood added a comment -

          +1

          jbellis Jonathan Ellis added a comment -

          committed


            People

            • Assignee: jbellis Jonathan Ellis
            • Reporter: stuhood Stu Hood
            • Reviewer: Stu Hood
