Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Fix Version/s: 2.1 beta1
    • Component/s: Core
    • Labels:

      Description

      The existing partitioned counters remain a source of frustration for most users almost two years after being introduced. The remaining problems are inherent in the design, not something that can be fixed given enough time/eyeballs.

      Ideally a solution would give us

      • similar performance
    • fewer special cases in the code
      • potential for a retry mechanism

        Issue Links

          Activity

          Jonathan Ellis added a comment -

          One possibility:

          During writes, each increment is stored separately – that is, as a separate wide row cell / cql3 row. But during reads, each node sums everything locally and sends that.

          If we do that, merging counts becomes a local problem: you can say that each node merges locally once the counter is older than gc_grace_seconds (or maybe a separate configuration setting). This could be done during compaction.
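
          A minimal sketch of what the per-increment storage, read-time summing, and compaction-time collapse could look like (the names here are hypothetical, not Cassandra internals):

          import java.util.List;

          final class CounterReadPath
          {
              static final class IncrementCell
              {
                  final long writtenAtMillis;
                  final long delta;

                  IncrementCell(long writtenAtMillis, long delta)
                  {
                      this.writtenAtMillis = writtenAtMillis;
                      this.delta = delta;
                  }
              }

              // On read, each node sums its local increment cells and sends one number.
              static long localSum(List<IncrementCell> cells)
              {
                  long sum = 0;
                  for (IncrementCell c : cells)
                      sum += c.delta;
                  return sum;
              }

              // During compaction, cells older than the merge horizon (e.g.
              // gc_grace_seconds) can be collapsed locally into a single cell.
              static IncrementCell collapse(List<IncrementCell> old)
              {
                  long newest = 0;
                  for (IncrementCell c : old)
                      newest = Math.max(newest, c.writtenAtMillis);
                  return new IncrementCell(newest, localSum(old));
              }
          }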

          Jonathan Halliday added a comment -

          RFE: consider allowing configurable subtyping of counters to support similar behaviours. Existing counters implement SUM(...) but with relatively minor modifications to the merge step the same architecture could also support MIN(...) and MAX(...) semantics which currently require locking or other unpleasant solutions. AVG(...) is harder, needing both the sum and item count to be stored under the hood. Even more advanced is bitwise logical AND/OR/... on bitsets rather than numbers, such as would allow for hyperloglog approximate cardinality counting or other similar operations.
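
          For illustration, a merge step parameterized this way might look like the following sketch (hypothetical names; SUM is the existing behaviour, MIN/MAX the proposed additions):

          import java.util.function.LongBinaryOperator;

          enum CounterSemantics
          {
              SUM((a, b) -> a + b),
              MIN(Math::min),
              MAX(Math::max);

              private final LongBinaryOperator op;

              CounterSemantics(LongBinaryOperator op)
              {
                  this.op = op;
              }

              // Used wherever two sub-counts for the same counter are reconciled.
              long merge(long a, long b)
              {
                  return op.applyAsLong(a, b);
              }
          }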

          Srdjan Mitrovic added a comment - - edited

          Don't atomic batches have potential for a retry mechanism?
          Now that CASSANDRA-4285 has been resolved, can't you use atomic batches to get idempotent counters? If we put both the incr and the addition of a wide row cell in the same batch, we could later check whether the cell exists and avoid executing that batch again.

          I apologize if there is something fundamentally wrong in my understanding of how Cassandra batches work; in that case, please delete my message so that it doesn't derail the issue.

          Jonathan Ellis added a comment -

          Nope, counters can't be included in atomic batches because of exactly the problems we need to fix here. (Atomic batches rely on the constituent updates being idempotent.)

          Jonathan Ellis added a comment -

          AVG(...) is harder, needing both the sum and item count to be stored under the hood. Even more advanced is bitwise logical AND/OR/... on bitsets rather than numbers, such as would allow for hyperloglog approximate cardinality counting or other similar operations.

          Not sure we'd want to support avg (since it requires extra information to be stored, as you point out), but it would be easy to support some kind of reduce(Iterable<Column>) with any scheme that stores per-column updates.

          (Not sure we'd want to expose that directly to users though.)

          Srdjan Mitrovic added a comment - - edited

          Not sure we'd want to support avg (since it requires extra information to be stored, as you point out)

          If we record every incr operation we will have extra info (until compaction).

          I will propose a way to make idempotent counters work and support all these features:
          1. Create a CF with columns replayID, counterName, value, cnt and optional columns customField1, customField2, ...
          (Random partitioner on replayID, or if we want to be sure it is unique we can use CompositeType replayID:counterName.)
          2. Create a secondary index on counterName that we use to find sum(value) on each node separately, because the secondary index is distributed.
          3. On compaction we delete old replayIDs, find the total of value*cnt and sum(cnt), and store a new row (replayId, counterName, total, new cnt).

          We can use the increment operation with some count (this will affect avg). For example, incr(counters, myCounter, replayId, 3, 5) would increment the counter by 15, but it would be stored as value 3, cnt 5 so that it affects the average differently than incrementing by value 15, count 1.

          We can create custom fields for some reduce(Iterable<Column>) so that we can support min, max, AND/OR/XOR... For example, on compaction we would store the reduced max in that custom field.

          These counters with replayID could be used in atomic batches.

          It would be ideal if a secondary index could also store values of the columns so that we can read counters in one go on each node. There is another jira issue for this. After that issue is resolved, we could keep only the secondary index without the original CF; we would just pretend it exists.

          I guess that this approach could be achieved by clients if we had a pluggable compaction strategy, but it would still be much easier if secondary indexes could also store other column values, not only keys.
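
          A sketch of step 3's compaction-time roll-up under the scheme above (all names hypothetical); storing sum = value * cnt per record keeps both SUM and AVG exact across merges:

          import java.util.List;
          import java.util.UUID;

          final class CounterRollup
          {
              static final class Record
              {
                  final UUID replayId;
                  final long sum; // value * cnt at write time
                  final long cnt;

                  Record(UUID replayId, long sum, long cnt)
                  {
                      this.replayId = replayId;
                      this.sum = sum;
                      this.cnt = cnt;
                  }
              }

              // Collapse old records into one: sum and cnt both accumulate, so
              // SUM stays exact and AVG remains answerable as sum / cnt.
              static Record merge(List<Record> old)
              {
                  long sum = 0, cnt = 0;
                  for (Record r : old)
                  {
                      sum += r.sum;
                      cnt += r.cnt;
                  }
                  return new Record(UUID.randomUUID(), sum, cnt);
              }
          }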

          Jonathan Halliday added a comment -

          > Not sure we'd want to support avg (since it requires extra information to be stored, as you point out), but it would be easy to support some kind of reduce(Iterable<Column>) with any scheme that stores per-column updates.

          On reflection I don't think the extra data is really the problem. It's how to present that data at the client that's tricky. Supporting a configurable reducer for merging old+new local values and local+remote values is only one of the ways that 'counters' can be made more flexible. By also making the datatype of the column that reducer operates on a pluggable option you can achieve a variety of additional behaviours. The existing counters become essentially ReducibleThing<bigint,IntegerAdditionReducer> whilst HyperLogLog may use ReducibleThing<bitset,BitwiseLogicalANDReducer> and the most general case is ReducibleThing<blob,UserDefinedReducer> which would allow for custom 'merge / conflict resolution' policies. In this model the implementation of avg would be ReducibleThing<CompositeType(bigint:sum,bigint:count),TwoFieldIntegerAdditionReducer> which does the right thing on the server, but leaves the client having to manage the composite and calculate the average as sum/count rather than being handed the single value directly.
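
          A minimal sketch of that pluggable shape, under the assumption that names like ReducibleThing and Reducer are hypothetical, not Cassandra APIs:

          interface Reducer<T>
          {
              T reduce(T left, T right);
          }

          final class LongSumReducer implements Reducer<Long>
          {
              public Long reduce(Long left, Long right) { return left + right; }
          }

          final class ReducibleThing<T>
          {
              private T value;
              private final Reducer<T> reducer;

              ReducibleThing(T initial, Reducer<T> reducer)
              {
                  this.value = initial;
                  this.reducer = reducer;
              }

              // Merge an incoming (local or remote) value using the configured semantics;
              // the existing counters correspond to ReducibleThing<Long> with LongSumReducer.
              void merge(T incoming)
              {
                  value = reducer.reduce(value, incoming);
              }

              T value() { return value; }
          }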

          Jonathan Ellis added a comment -

          To go back to implementation...

          merging counts becomes a local problem: you can say that each node merges locally once the counter is older than gc_grace_seconds

          There is a coordination problem here; if one node merges away the values slightly before the others, it will give us counts that we can't reconcile at the coordinator. So we'd need to do it more like "after gc_grace_seconds, rounded up to the nearest hour" to prevent problems with clock drift.

          Additional note: until we perma-merge the old data after gcgs, we'll want to keep a "preliminary merge" (in a separate, hidden CF?) for read performance. This could be a (merged count, md5) tuple; coordinator could read-repair using the md5s.
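
          A rough sketch of such a merge horizon, as one assumption about how the rounding could work: flooring the cutoff to an hour boundary means clock drift only matters in a narrow window around each boundary, so replicas almost always agree on what to merge.

          import java.util.concurrent.TimeUnit;

          final class MergeHorizon
          {
              // Merge only increments written before this timestamp.
              static long mergeBefore(long nowMillis, long gcGraceSeconds)
              {
                  long hourMillis = TimeUnit.HOURS.toMillis(1);
                  long cutoff = nowMillis - TimeUnit.SECONDS.toMillis(gcGraceSeconds);
                  // Floor to the hour boundary: effectively "gc_grace_seconds,
                  // rounded up to the nearest hour" of waiting before a merge.
                  return (cutoff / hourMillis) * hourMillis;
              }
          }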

          Sylvain Lebresne added a comment -

          For the record, I'd like to "quickly" sum up the problems of counters 1.0, and more precisely what I think are problems inherent to the design, and what I believe might be fixable with some effort.

          The current counter implementation is based on the idea of internally keeping one separate sub-counter (or "shard") for each replica of the counter, and making sure that for each increment, one shard and only one is ever incremented. The latter is ensured by the special write path for counters, which:

          • picks a live replica and forwards it the increment
          • has that replica increment its own "shard" locally
          • then has the replica send the result of this local shard increment to the other replicas

          This mechanism has (at least) the following problems:

          1. counters cannot be retried safely on timeout.
          2. removing counters works only halfway. If you re-increment a deleted counter too soon, the result is somewhat random.

          Those problems are largely due to the general mechanism used, not to implementation details. That being said, on the retry problem, I'll note that while I don't think we can fix it in the current mechanism, tickets like CASSANDRA-3199 could mitigate it somewhat by making TimeoutException less likely.

          Other problems are more due to how the implementation works. More precisely, they are due to how a replica proceeds to increment its own shard. To do that, the implementation uses separate merging rules for "local" shards and "remote" ones. Namely, local shards are summed during merge (so the sub-count they contain is used as a delta), while for remote ones the "biggest" value is kept (where "biggest" means "the one with the biggest clock"). So for remote shards, conflicts are handled as "latest wins" as usual. The reason for that difference between local and remote shards is a performance one: when a replica needs to increment its shard, it needs to do that "atomically". So if local shards were handled like remote ones, then to increment the local shard we would need to 1) grab a lock, 2) read the current value, 3) increment it, 4) write it and then 5) release the lock. Instead, in the current implementation, the replica just inserts an increment to its own shard. And to find the total value of its local shard, it just reads: the increments get merged on reads. In practice, what we win is that we don't have to grab a lock.

          However, I believe that "implementation detail" is responsible for a fair amount of the pain counters cause. In particular it complicates the implementation substantially because:

          • a local shard on one replica is a remote shard on another replica. We handle this by transforming shards during deserialization, which is complex and fragile. It's also the source of CASSANDRA-4071 (and at least one contributor to CASSANDRA-4417).
          • we have to be extremely careful not to duplicate a local shard internally or we'll over-count. The storage engine having been initially designed with the idea that using the same column twice was harmless, this has led to a number of bugs.

          We could change that "implementation detail". Instead we could stop distinguishing the merge rules for local shards, and when a replica needs to increment its shard, it would read/increment/write while holding a lock to ensure atomicity. This would likely simplify the implementation and fix CASSANDRA-4071 and CASSANDRA-4417. Of course, this would still not fix the other top-level problems (not being able to replay, broken remove, ...).
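
          A schematic sketch (hypothetical types; the real clock rules are more involved) of the two merge rules described above:

          final class Shard
          {
              final boolean local;
              final long clock;
              final long count;

              Shard(boolean local, long clock, long count)
              {
                  this.local = local;
                  this.clock = clock;
                  this.count = count;
              }

              Shard merge(Shard other)
              {
                  // Local shards carry deltas: clocks and counts both accumulate.
                  if (local)
                      return new Shard(true, clock + other.clock, count + other.count);
                  // Remote shards carry totals: the one with the biggest clock wins.
                  return clock >= other.clock ? this : other;
              }
          }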

          Jonathan Halliday added a comment -

          To my mind it's the read rather than the lock that's the problem. The design pattern of 'blind write to log the event on increment, read and replay the series of events to calculate the current value on read' would seem more in keeping with the 'cassandra way'. Of course you need to periodically squash the increment log down to a single point-in-time value to stop it growing unbounded, and that step does need locking of some form, but that batch operation is less costly than a lock/read on each write. In keeping with my earlier point re: making the 'counters' mechanism as general/pluggable as possible, having a generic implementation of 'log a modification event stream, replay it on read' that could eventually be exposed as a primitive on which users could build their own functionality would be a big win. It may be a harder pattern to implement well, but it's ultimately a lot more powerful.

          Staņislavs Koikovs added a comment -

          I think you could simplify the counter implementation a little bit by creating 2 counters internally instead of 1 (an increment counter and a decrement counter). So with RF=3, each counter replica would keep track of 6 sub-counters (2 local and 4 remote). To get the counter value you sum all sub-counters, and for conflict resolution you take the sub-counter with the higher value. I hope this makes sense.

          Nicolas Favre-Felix added a comment -

          I would like to describe a design that was discussed at Acunu last year, aiming to resolve the problems pointed out by Sylvain as well as remove the read operation needed by replicate_on_write.

          Our solution added a unique identifier per counter update operation, used to identify duplicate commands and avoid overcounts on retry. The main problem in storing (UUID, delta) pairs per counter is the O(N) read complexity; this is how people implemented counters before 0.8 and it is a pretty inefficient way of counting things.

          Our idea was to merge those update pairs in the back-end, trying to always keep a small number of deltas instead of all of them. Merging those updates requires some level of synchronisation between the replicas, but that's not something that Cassandra is completely averse to, as active anti-entropy also requires all replicas to be available.
          This design considered using a tree per counter, with time-based buckets containing all increments to the counter for a given time period - say, 5 seconds by default. Once this time has passed, the bucket for the past 5 seconds is queued for synchronization amongst all replicas and eventually replaced with an equivalent bucket containing a single increment with a UUID built from all the updates that it replaces (using XOR would work). If the replicas disagree on what needs to be in the bucket, they send each other missed updates in the same way that data is exchanged during repair. If a node is down, we keep accumulating 5-second buckets that will need to be merged later.
          The 5-second buckets are eventually merged into a minute bucket, then an hour bucket, etc.

          As an added bonus, the reduce function can be set to MIN, MAX, SUM_SQ, etc. instead of just SUM.
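
          A sketch of the roll-up step described above, assuming the replaced UUIDs are combined with XOR so that replicas performing the same roll-up produce an identical replacement cell:

          import java.util.List;
          import java.util.UUID;

          final class BucketRollup
          {
              // XOR is commutative and associative, so the order in which the
              // bucket's updates are combined doesn't matter, and two replicas
              // holding the same set of updates derive the same replacement UUID.
              static UUID combine(List<UUID> ids)
              {
                  long msb = 0, lsb = 0;
                  for (UUID id : ids)
                  {
                      msb ^= id.getMostSignificantBits();
                      lsb ^= id.getLeastSignificantBits();
                  }
                  return new UUID(msb, lsb);
              }
          }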

          Here are the main drawbacks I see for this approach:

          • The implementation becomes a fair bit more complicated.
          • Counters take more space than they used to.
          • The replicas need to all be up for the "collapsing" operation to run. It might just be that counters start to get slower if some of your nodes are down for a long time. You can't merge updates with a replica down or you might lose increments.
          • We introduce an actual timestamp instead of the current binary blob.
          • The implementation is not compatible with the current one.
          • The performance characteristics of these counters are unknown.
          • No code exists.
          Jonathan Ellis added a comment -

          The implementation becomes a fair bit more complicated

          I'm not really sure about that. Both with this and the above design [1], we do have more complexity on read/reconcile, but we trade that for zero special cases at the storage engine, which is a huge win.

          [1] https://issues.apache.org/jira/browse/CASSANDRA-4775?focusedCommentId=13586715&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13586715

          Edward Capriolo added a comment -

          Random idea: why not have separate memtables and separate commit logs for counters? You cannot mix columns and counter columns anyway, and they work very differently, so lifting them completely out of the normal storage layer might help.

          Edward Capriolo added a comment -

          Crazy idea #2: Bring back the vector clocks!

          Jonathan Ellis added a comment -

          The reason we backed away from vector clocks is that they can tell you that you had a conflict, but not how to resolve it. E.g., applied to counters, if you had divergent clocks for counts of 99 and 101, the correct combined count could be anything from 101 to 200. You don't know without additional history that is basically equivalent to column-per-increment.

          Jonathan Ellis added a comment -

          We could change that "implementation detail". Instead we could stop distinguishing the merge rules for local shards, and when a replica needs to increment its shard, it would read/increment/write while holding a lock to ensure atomicity. This would likely simplify the implementation and fix CASSANDRA-4071 and CASSANDRA-4417. Of course, this would still not fix the other top-level problems (not being able to replay, broken remove, ...).

          I'm starting to think this is probably our lowest-hanging fruit here. I think we could get good performance too if we "cache" hot counters as AtomicLong objects.

          I note for the record that "retryable" is at the very bottom of my priorities here. Single-machine databases don't allow retry either if they lose the connection in the middle of UPDATE foo SET x=x+1 WHERE key = .... And everyone just lives with it.
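
          A minimal sketch of the hot-counter cache idea (the string key is a placeholder for whatever identifies a counter internally):

          import java.util.concurrent.ConcurrentHashMap;
          import java.util.concurrent.atomic.AtomicLong;

          final class HotCounterCache
          {
              private final ConcurrentHashMap<String, AtomicLong> hot = new ConcurrentHashMap<>();

              // computeIfAbsent is atomic, so concurrent increments to a cold
              // counter still end up sharing a single AtomicLong, and hot
              // counters skip the lock-and-read path entirely.
              long increment(String counterKey, long delta)
              {
                  return hot.computeIfAbsent(counterKey, k -> new AtomicLong())
                            .addAndGet(delta);
              }
          }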

          Aleksey Yeschenko added a comment -

          Was looking into possible variations of the original idea (the first comment in this issue) and how we could implement it to allow truly idempotent, client-retriable counter updates, and came up with this so far: https://gist.github.com/iamaleksey/fa36552409dc2aa70bee

          Cons:

          • has significant overhead per-cell - needs to store the timeuuid in the column name
          • has the obvious overhead of having multiple cells per single counter
          • requires either forcing QUORUM writes (so that merge could get away with QUORUM reads) or ALL reads for the merge. Now, this is probably true for any merge process for any variation of 'each update = new cell, with periodic merges'
          • requires special-casing on the read path
          • maintaining backward compatibility isn't going to be fun

          Pros:

          • updates are truly idempotent (within the configurable write window)
          • allows counters to coexist with regular columns in regular tables
          • allows including counters in (non-logged) batches
          • requires a lot less overall special-casing
          • easily supports min/max/avg/sum_sq functions

          Not sure if this is viable considering that '"retryable" is at the very bottom of my priorities here' but maybe there is something useful in there anyway.
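
          A sketch of the write-window check implied by "idempotent (within the configurable write window)": a retried update carries the same client-provided timeuuid, and updates older than the window are rejected rather than re-applied. The window constant is hypothetical; the timestamp extraction from a version-1 UUID is standard.

          import java.util.UUID;

          final class WriteWindow
          {
              static final long WINDOW_MILLIS = 60_000; // hypothetical default

              // UUID v1 timestamps count 100ns intervals since 1582-10-15.
              private static final long UUID_EPOCH_OFFSET_MILLIS = 12219292800000L;

              static boolean accepts(UUID updateId, long nowMillis)
              {
                  long updateMillis = updateId.timestamp() / 10_000 - UUID_EPOCH_OFFSET_MILLIS;
                  return updateMillis >= nowMillis - WINDOW_MILLIS;
              }
          }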

          Theodore Hong added a comment -

          Regarding retries, it seems like CASSANDRA-3199 would have at least partially addressed the retry problem - why was that closed?

          Timo Kinnunen added a comment -

          I read a paper called A comprehensive study of Convergent and Commutative Replicated Data Types, which specifies an eventually consistent increment/decrement counter, among other things. It's available here: http://hal.inria.fr/docs/00/55/55/88/PDF/techreport.pdf

          Briefly, the idea is pretty simple: each replica of the counter has an array of values, containing one value per replica, called a payload. Each value in the array is the sum of the increments that a single replica has performed. The value of the counter is the sum of the values in the array. The payloads are sent to other replicas and each replica merges an incoming payload by taking the maximum of each pair of values in the incoming and local arrays. This is idempotent so payloads can be resent as many times as needed.
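
          A compact sketch of the PN-Counter the paper specifies (which is also essentially what Staņislavs Koikovs suggests above, with separate increment and decrement sub-counters); replica ids index the arrays:

          final class PNCounter
          {
              private final int replicaId;
              private final long[] incs; // one entry per replica
              private final long[] decs;

              PNCounter(int replicaId, int replicas)
              {
                  this.replicaId = replicaId;
                  this.incs = new long[replicas];
                  this.decs = new long[replicas];
              }

              void increment(long by) { incs[replicaId] += by; }
              void decrement(long by) { decs[replicaId] += by; }

              long value()
              {
                  long v = 0;
                  for (int i = 0; i < incs.length; i++)
                      v += incs[i] - decs[i];
                  return v;
              }

              // Merging takes the pairwise max, which is idempotent: replaying
              // the same payload any number of times converges to the same state.
              void merge(PNCounter other)
              {
                  for (int i = 0; i < incs.length; i++)
                  {
                      incs[i] = Math.max(incs[i], other.incs[i]);
                      decs[i] = Math.max(decs[i], other.decs[i]);
                  }
              }
          }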

          Jonathan Ellis added a comment -

          That is an excellent description of our existing implementation.

          Colin B. added a comment -

          Would anyone be interested in a type of counter that is about 99% correct, but not exact?

          The Hyperloglog cardinality estimation algorithm would be fairly straightforward to implement inside Cassandra. It estimates the number of distinct elements in a set. One way to use it as a counter is to have two "sets" of timeuuids, A and R. Each time you want to increment the counter, add a timeuuid to the A set; each time you want to decrement, add a timeuuid to the R set. The count is the count in A minus the count in R. Re-adding the same item (timeuuid) to a "set" is idempotent. A read would need to access a constant amount of internal data, and the internal data is a good fit for Cassandra's method of merging distributed writes.

          A description of the Hyperloglog algorithm is available here:
          http://blog.aggregateknowledge.com/2012/10/25/sketch-of-the-day-hyperloglog-cornerstone-of-a-big-data-infrastructure/
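
          A sketch of such an approximate counter; the HyperLogLog class is assumed to come from the stream-lib library (an assumption for illustration; any HLL implementation would do):

          import java.util.UUID;
          import com.clearspring.analytics.stream.cardinality.HyperLogLog;

          final class ApproximateCounter
          {
              // ~1% relative standard deviation per set (assumed stream-lib API).
              private final HyperLogLog adds = new HyperLogLog(0.01);
              private final HyperLogLog removes = new HyperLogLog(0.01);

              // Re-offering the same id is idempotent, so retries cannot over-count.
              void increment(UUID opId) { adds.offer(opId); }
              void decrement(UUID opId) { removes.offer(opId); }

              long estimate()
              {
                  return adds.cardinality() - removes.cardinality();
              }
          }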

          Nicolas Favre-Felix added a comment -

          A few comments on the design posted above in a GitHub gist:

          • The "time" part of the client-provided TimeUUID is now compared to the server's timestamp in the test "if(time(update-timeuuid) < now() - counter_write_window)". This is not ideal in my opinion, but I guess Cassandra is now using "real" timestamps a lot more than it used to. In any case, an "old" delta could also fall behind a "merge" cell and be ignored on read.
          • Having "merge cells" means that we could support both TTLs and "put" operations on counters, as long as the semantics are well defined.
          • Could counter merges happen in the background at ALL since most reads will receive responses from all replicas anyway, or would we always require QUORUM writes? This could be too restrictive in multi-DC deployments where most people probably prefer to read and write at LOCAL_QUORUM.

          As described above, finding a "merge point" at which we could roll-up deltas involves either QUORUM reads + QUORUM writes or a read at ALL. This is necessary since we need a majority of replicas to persist the merge cell. We could consider this "set of deltas" that make up a counter to be merged at different levels, though. When this set is common to all replicas (as is proposed above), we can only merge in QUORUM reads if we can guarantee quorum writes or must merge in reads at ALL otherwise.

          If, instead, we shard this "set of deltas" among replicas with a distribution scheme resembling the existing implementation, each replica becomes the "coordinator" for a fraction of deltas and can (on its own) merge the increments for which it is responsible and issue "merge cells" to replace them. It becomes possible to read and write at LOCAL_QUORUM using this scheme. As any particular replica is the only source of truth for the subset of deltas that it was assigned, it does by definition read ALL of its deltas and can sum them up with no risk of inconsistency. When these cells are node-local with a single source of truth, they can be merged by their owner and a merge cell replicated easily.
          The main issue with this implementation is the choice of the coordinator node for an increment operation: if we assign a replica at random, retrying would lead to duplicates; if we assign a replica deterministically (based on the operation's UUID for example) we risk not being able to write to the counter if that particular replica goes down.

          I'd like to propose a solution that lies between merging counters across the whole cluster and merging counters in each individual replica:
          We can shard counters based on the datacenter, and roll up these UUIDs per DC. In that case, the scope of the set of replicas involved in merging deltas together is therefore limited to the replicas of the local DC, which (once again) can merge deltas by either getting involved at W.QUORUM+R.QUORUM or W.anything+R.ALL.
          A configuration flag per counter CF would configure whether we require W.QUORUM+R.QUORUM (default) or let clients write with any CL, with the downside that deltas can only be merged at CL.ALL.
          The same issue with retries applies here, albeit at a different level: a particular operation can only be retried safely if it is sent to the same datacenter, which seems reasonable.

          I believe that the space and time overheads are about the same as in Aleksey's design.

          Suggestions and ideas much welcome.

          Timo Kinnunen added a comment -

          I've elaborated on my understanding of the similarities and differences between Cassandra's counters and CRDT Positive-Negative Counters: what the design looks like in the problem areas in contrast to CRDT's design, what changes could be made to bring the design closer to how PN-Counters are designed to work, and finally how garbage-collecting shards, shard ownership changes, decommissioning nodes and retrying could be implemented under the changed design. It's all here: https://gist.github.com/Overruler/14c0f3810e870666a328

          To summarize, these 3 changes together should make the design more robust:

          1) The buffering and bulk processing of incoming requests isn't specific to counters, so the unprocessed increments don't need to be stored in the counter; instead they should be inserted into a work queue that's separate from the counter and belongs only to the replica. This way unprocessed increments won't get propagated to other replicas. When the replica needs to calculate the value of the counter, it processes the work queue and increments its shards as normal, with all the locking and writing.

          2) To propagate the incremented counter the replica can create a list of all (or some of) the shards that are replicating the counter and insert an exact duplicate shard for each of them to receive. The new shards are transmitted to other replicas like before.

          3) Resolving two shards that are owned by the same replica is changed to happen the same way on every replica. To ensure a shard is never decremented the node always keeps the shard with the highest absolute value and ignores timestamps. Last-Write-Wins must not be allowed to affect the convergence of the values in shards. I’m not sure how big of a threat this is in practice.
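
          Change 3's resolution rule as a tiny sketch: since the rule is deterministic and timestamp-free, every replica resolves a pair of shards identically.

          final class ShardReconciler
          {
              // Keep the shard value with the higher absolute value, ignoring
              // timestamps, so a shard can never move backwards during resolution.
              static long reconcile(long left, long right)
              {
                  return Math.abs(left) >= Math.abs(right) ? left : right;
              }
          }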

          With these changes, the values in the shards and the whole counter should keep converging. I'm probably using wrong terms for things somewhere, apologies. Please tell me if I'm missing something about Cassandra's workings or CRDTs.

          Jonathan Ellis added a comment -

          Going back to Sylvain's comments on the existing counters implementation:

          I believe that "implementation detail" is responsible for a fair amount of the pain counters cause... We could change that "implementation detail". Instead we could stop distinguishing the merge rules for local shards, and when a replica needs to increment its shard, it would read/increment/write while holding a lock to ensure atomicity.

          On the one hand, the idea of applying a band aid instead of a rewrite is very appealing from a risk management perspective, and the merge code has pained me ever since we added it. On the other hand, I have two problems with counters 1.0 that are not addressed by this:

          1. read-before-write is inherent in the 1.0 design [although for the record I think its original authors did not realize that], which means counters offer very different [worse] performance from "normal" Cassandra updates. We see this confuse people fairly regularly today, and we saw the same confusion with indexes before we fixed it there.
          2. I don't care a whole lot about replayability from a client perspective, but idempotence internally is very handy (CASSANDRA-5549).
          Sylvain Lebresne added a comment -

          read-before-write is inherent in the 1.0 design

          That's true, and that's annoying. That being said, I think the fact that today CL.ONE lies about doing a read-before-write is probably a good part of what ends up surprising people. Besides, I'll admit that I remain to be convinced about the performance of doing one column per write with on-read merging (as has been suggested for a replacement). It will clearly be faster on writes, but performance on read remains to be seen (I'm particularly afraid of "everything runs fine in testing, but I go to production, some counters get more load than expected, and now the read performance on those goes through the roof"). But yes, read-before-write is a PITA and somewhat "anti-Cassandra".

          but idempotence internally is very handy

          I couldn't agree more, and that's my biggest problem with the current implementation (and always has been, really). But just in case that wasn't clear, the whole goal of the "band aid" (as you call it) I'm mentioning would be to restore internal idempotence. So that problem is not inherent to counters 1.0.

          Jonathan Ellis added a comment - - edited

          that problem is not inherent to counters 1.0

          You'll have to explain that for me in small words, then – I don't see how moving to lock-and-read-before-write allows us to safely replay the same commitlog segment twice, for instance.

          Sylvain Lebresne added a comment -

          We currently have two types of shards in a counter context: local and remote. Remote ones are resolved using a "biggest clock wins" rule, so they are not a problem for internal idempotence. The problem is local shards: when a new increment comes in, the "leader" of that increment writes a new cell with that increment as a local shard, and those local shards get summed during merge. That's the problem, since duplicating such a shard implies a duplicate count.

          But those local shards are an implementation detail. The idea of doing a read-before-write implies not using them anymore. So on a new increment, the "leader" of the increment would read the current value of its own shard, compute the incremented value (and incremented clock), and write that result. The written cell is now the result of the increment (for its own shard; other nodes can safely increment their own shards concurrently), not an increment itself. So duplicating those cells doesn't matter at all. It does mean you must synchronize the read-before-write locally, of course, hence my mentioning of locking above.
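
          To make the two merge rules concrete, here is a minimal sketch; the Shard class and both merge functions are illustrative stand-ins for Cassandra's actual counter-context layout, with clock handling simplified:

          final class Shard
          {
             final long clock; // per-shard logical clock
             final long count; // accumulated value for this shard

             Shard(long clock, long count) { this.clock = clock; this.count = count; }

             // Remote shards: "biggest clock wins". Merging the same cell twice
             // yields the same result, so the rule is idempotent under replay.
             static Shard mergeRemote(Shard a, Shard b)
             {
                return a.clock >= b.clock ? a : b;
             }

             // Local shards (the problematic case): counts are summed, so merging
             // a duplicated cell double-counts the increment.
             static Shard mergeLocal(Shard a, Shard b)
             {
                return new Shard(a.clock + b.clock, a.count + b.count);
             }
          }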

          Make sense?

          Jonathan Ellis added a comment -

          So we'd lock even before the commitlog entry?

          Sylvain Lebresne added a comment -

          So we'd lock even before the commitlog entry?

          Yes. SP.applyCounterMutationOnLeader would become something like this (handling just one counter, to keep it simple):

          // CFStore stands in for ColumnFamilyStore; response-handler plumbing is omitted from this sketch
          public AbstractWriteResponseHandler applyCounterMutationOnLeader(CFStore cfs, ByteBuffer key, CounterUpdateColumn increment)
          {
             counterLock.lock(); // counterLock would be e.g. a ReentrantLock field
             try
             {
                // Read the current value of this node's own shard for the target cell
                CounterColumn current = read(cfs, key, increment.name());
                // Returns a new value where the current node's shard has an incremented value and clock
                CounterColumn newValue = current.add(increment);
                cfs.apply(newValue);
             }
             finally
             {
                counterLock.unlock();
             }
          }


          (Of course, in practice we would "shard" that counterLock, and there are probably fancier optimizations to make.)
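
          As an illustration of what "sharding" the lock might look like, here is a minimal striped-lock sketch; StripedCounterLocks is a hypothetical helper, not an actual Cassandra class. The lock is chosen by hashing the row key, so updates to different counters rarely contend, while updates to the same counter still serialize:

          import java.nio.ByteBuffer;
          import java.util.concurrent.locks.ReentrantLock;

          final class StripedCounterLocks
          {
             private final ReentrantLock[] stripes;

             StripedCounterLocks(int concurrency)
             {
                stripes = new ReentrantLock[concurrency];
                for (int i = 0; i < concurrency; i++)
                   stripes[i] = new ReentrantLock();
             }

             // Map a row key to one of the stripes; the mask keeps the hash non-negative.
             ReentrantLock lockFor(ByteBuffer key)
             {
                return stripes[(key.hashCode() & 0x7fffffff) % stripes.length];
             }
          }

          applyCounterMutationOnLeader above could then take locks.lockFor(key) instead of the single counterLock.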

          Jonathan Ellis added a comment -

          Thoughts on that, Nicolas Favre-Felix, Aleksey Yeschenko?
          Aleksey Yeschenko added a comment -

          I think it's worth implementing whether or not counters 2.0 ever materializes.

          Nicolas Favre-Felix added a comment -

          Jonathan Ellis sorry about the late answer.

          I am not convinced that locking prior to the commit log entry is a great idea.
          First, it does not solve the retry problem, even if it does mitigate it somewhat. It allows batches to be retried internally, but doesn't give any guarantee to the client in the case of a timeout before the batch is added to the batchlog.
          I implemented a read-modify-write (RMW) counter as a personal exercise last year and gave up on the idea because its performance was much lower than the current implementation's. Cassandra currently allows concurrent updates to the same counter, with two clients applying deltas +x and +y, resulting in two replication reads that might both read (+x+y). This is not possible with a locked RMW, and I remember observing many more timeouts on "hot" counters due to contention on this very coarse lock.
          My toy implementation did not even lock around the commit log entry, which would be even slower.

          It is true that the read in an RMW design is cheaper than the current read, which might touch several SSTables, but it's still very expensive, and I'm worried that internal retry safety wouldn't be enough to convince users that these slower counters are "better".

          What do you think?

          Aleksey Yeschenko added a comment -

          Nicolas Favre-Felix, regarding the comments on the gist:

          Having "merge cells" means that we could support both TTLs and "put" operations on counters, as long as the semantics are well defined.

          Can do "put" writes - true, this would become possible (and trivial - you just write a new merge cell). I don't see how this design would allow for TTLs in a sane way, though (or even insane ones).

          … or would we always require QUORUM writes? This could be too restrictive in multi-DC deployments where most people probably prefer to read and write at LOCAL_QUORUM. … When this set is common to all replicas (as is proposed above), we can only merge in QUORUM reads if we can guarantee QUORUM writes or must merge in reads at ALL otherwise.

          Yep, this is (too) restrictive, and it's a problem. Both options are too restrictive - we can't require QUORUM writes (esp. in multi-DC clusters), and requiring ALL for merges is also unreasonable, multi-DC or not. This alone makes the (initial) design in the gist unsuitable as-is, IMO.

          Now, regarding the first improvement suggestion (sharded replicas):

          if we assign a replica deterministically (based on the operation's UUID for example) we risk not being able to write to the counter if that particular replica goes down

          And this is unacceptable.

          if we assign a replica at random, retrying would lead to duplicates

          While not being a total deal-breaker, it still weakens the argument for the set-based design somewhat (enabling external idempotence/client retries is one of the justifications for the extra overhead).

          Also, while set collapse becomes a local problem now, what we can actually merge/collapse is just a subset of all updates, not all of them. The reads (and cleanup during compaction) logic (and the collapse logic itself) becomes more complicated now. Despite there being a logical 'owner', the increments (including the merge cells) still need to be replicated (for RF > 1). This means that each replica now has increments it 'owns' and increments it doesn't. So now you can't just read until the first merge cell you meet and stop there, because it's only responsible for a subset of all writes. You have to go on until you read the whole row or a merge cell for every shard there is, and accumulation itself becomes harder, because you have to track whether or not you've already read a shadowing merge cell for the subset a particular increment belongs to and either add it or ignore it.

          It becomes possible to read and write at LOCAL_QUORUM using this scheme. As any particular replica is the only source of truth for the subset of deltas that it was assigned, it does by definition read ALL of its deltas and can sum them up with no risk of inconsistency.

          You still need the write-path special-casing of the current counters - that is, you have to send the request to just one replica to determine the name of the resulting cell (uuid + owner), and it would replicate the cell to other replicas (this kills internal retry safety, btw). If you determine the owner replica first, generate the cell name, and write it at LOCAL_QUORUM, and the write to that owner replica actually fails, then you can't merge locally, not even the owner's subset; you'd need at least LOCAL_QUORUM reads.

          So you can't do client retries safely, can't do internal retries safely, have to keep the special write path, and have reads that are pretty much as complex as in the current implementation, with the added bonus of extra space and time overhead.

          Regarding DC-level-sharding:

          True, you can now do merges at LOCAL_ALL (which we don't have, but that's not an issue) or at LOCAL_QUORUM, if you mandate writes at LOCAL_QUORUM. (If the writes are at regular QUORUM or anything other than LOCAL_QUORUM/ALL, then you still need LOCAL_ALL for the merges, because QUORUM doesn't guarantee LOCAL_QUORUM).

          A configuration flag per counter CF would configure whether we require W.QUORUM+R.QUORUM (default) or let clients write with any CL with the downside that deltas can only be merged at CL.ALL.

          As I wrote above, unless all the writes are at ALL or LOCAL_QUORUM, you can't merge strictly DC-locally. So it's actually the choice between [W.LOCAL_QUORUM writes + R.LOCAL_QUORUM merges] and [W.WHATEVER writes + R.LOCAL_ALL merges]. And I'm not a fan of either, even though both are definitely better than requiring ALL.

          You also keep the complexity of the sharded-partitions design - you can only merge the subset of the updates that belongs to the DC, and thus have complicated read/compaction logic. Oh, and safe idempotent client retries would require drivers to special-case counter retries somehow, which means they'd have to look inside the query to determine that it's a counter update and not follow the configured retry strategy.

          I do suspect that both of these designs are also sensitive to topology changes. Oh, and with both you can't do 'read-repair-merge' at most read consistency levels, and when you can, these are now also complicated by multiple subsets per counter.

          I believe that the space and time overheads are about the same as in Aleksey's design.

          Kinda, you now have to encode the owning node (in design #2) or the owning DC (in design #3) along with the update id, or take away some bytes from it.

          Now, there is another issue with the design in the gist and the two suggested improvements. We only have two mechanisms to trigger the collapse/merge - doing it during reads (and only QUORUM reads for design #1, and at limited levels with #2 and #3) and building candidate lists during compaction. I'm concerned about hot counters that generate tons of updates but are rarely read, or are read while the updates are still within counter_write_window. The read latency becomes unpredictable with a set-based design. With a particularly hot counter it could even be possible to OOM during reads. Writes would be faster than with the current implementation, true, but the unpredictable read latency bothers me a lot.

          I could also write up why Jonathan's original suggestion wouldn't work either - "after gc_grace_seconds, rounded up to the nearest hour" does not magically remove the need to synchronize during the collapse, and implementing a "preliminary merge" CF is far from trivial.

          To summarize, I don't think that set-based designs are viable. They can be made either simple but with unreasonable limitations, or nearly as complex as the current implementation with limitations that are still too strict, plus unpredictable read latencies and storage overhead on top.

          We should look into potential improvements to the current implementation instead:

          1. See if we can simplify it by switching to RMW without killing the speed of current counters (maybe it could be done with some intelligent caching or another optimization not yet suggested);
          2. Maybe switch to having two increment-only counters (G-Counters) for each shard, one for increments and one for decrements, if that proves beneficial.

          (If we implement both 1) and 2), then we'd have exactly the PN-Counter implementation described in the CRDT whitepaper, btw.)
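
          For reference, here is a minimal PN-Counter sketch along the lines of the CRDT whitepaper; the class and its per-shard maps are illustrative, not a proposed Cassandra API. Each shard only ever grows its own entries, and merging takes the per-shard maximum, which is what makes replaying the same state idempotent:

          import java.util.HashMap;
          import java.util.Map;

          // Two G-Counters (grow-only maps of shard id -> count):
          // one for increments and one for decrements.
          final class PNCounter
          {
             private final Map<String, Long> incs = new HashMap<>();
             private final Map<String, Long> decs = new HashMap<>();

             void increment(String shard, long by) { incs.merge(shard, by, Long::sum); }
             void decrement(String shard, long by) { decs.merge(shard, by, Long::sum); }

             // Value is total increments minus total decrements.
             long value()
             {
                long total = 0;
                for (long v : incs.values()) total += v;
                for (long v : decs.values()) total -= v;
                return total;
             }

             // Per-shard max makes merge idempotent: applying the same
             // state twice is a no-op.
             void merge(PNCounter other)
             {
                other.incs.forEach((s, v) -> incs.merge(s, v, Math::max));
                other.decs.forEach((s, v) -> decs.merge(s, v, Math::max));
             }
          }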

          Craig Hansen added a comment -

          I hate to pollute such a scholarly thread with this comment, but I've been researching all of the potential issues with Cassandra counters for several days now, and I have to say I'm not too encouraged by everything I'm reading. That said, I love how they work in development - just brilliant. While there is a lot of information about potential problems, there seems to be very little consensus regarding potential solutions.

          In my case, I'm just trying to figure out if they are "good enough" for my use cases, and whether or not there is any way to configure a cassandra cluster specifically to mitigate some of the risks of using counters. I'd be willing to create a specialized cluster for counter column families if the risks could be mitigated through configuration, various write consistency levels, etc.

          So at this point we're looking at using redis sets or cassandra counters intra-day just for speed, and summarizing transactional data to cassandra integer columns periodically for durability and historical accuracy. Any links to resources for such solutions would be greatly appreciated. Also any practical information relating to just how fragile they have proven to be would be helpful.

          The main thing is strategic: It would really help if I could get a sense of whether resolving counter issues is on the roadmap, or if they will remain in the "OK use them, but it's gonna be dumb" category.

          Nicolas Favre-Felix added a comment -

          I realize that there hasn't been much progress on this ticket during the summer. Following Jonathan Ellis's call for contributors on cassandra-dev, we (Acunu) are willing to dedicate more time and resources to this particular ticket to see it implemented in the near future.

          Aleksey Yeschenko, thanks for your comments and critique; I would like to summarize the above and hopefully move this discussion towards a design that we can agree on.

          As you pointed out, the set-based approach has some significant drawbacks:

          • The design is complex and the distributed problem of merging deltas pushes some of this complexity to the client side.
          • Reads are always more expensive as all replicas need to return the set of deltas they know about.
          • The unpredictability in the distribution of read latencies is a serious issue.

          The set-based design also has two advantages, speed (skipping a random read) and idempotence; I'd like to address these two below.

          Although the read on the write path is an anti-pattern for Cassandra, it serves a useful purpose: it ensures that the data sent from the coordinator to the replicas is always already summed up, which means that reads stay predictably fast. The read during replicate_on_write is also more expensive than the one in an RMW design, since we have to look up all the counter shards in (potentially) many sstables, and we have to do this every single time we increment a counter. RMW counters in Riak and HBase only need to read the latest value for the counter, and can immediately write it back to an in-memory data structure. I would also like to point out that reads will only become cheaper in the future as solid-state drives become more commonplace, and not by a small amount. Designing a complex and unreliable solution to address a disappearing problem would be a mistake.

          Idempotence is also a very useful property, and the lack of safety is probably the first drawback that people mention when they describe Cassandra counters. Their reliability is questioned pretty often, and this criticism is not without merit. Sylvain Lebresne's suggestion of a retryable increment with a lock around the commit log entry is a great improvement over the current design, with the only limitation that the operation has to be commutative. I wrote above that I had my doubts about the throughput of counters under such a lock, but I also recognize that there could be more optimisations, as some have suggested.

          Internally, I would suggest the sharded counter design remain similar, with one shard per replica, for all commutative replicated data types; implementing counters as PN-Counters is also a great way to introduce more general data types, something I believe would be much welcomed by the Cassandra community.

          After this long discussion I believe the following design would be a viable alternative:

          • Remove the replicate_on_write option and switch to a locked/retryable RMW.
          • Provide a pluggable way to implement commutative types, with a PN-counter implementation.

          Existing deployments could also be migrated to this solution; the proposed set-based design would be too complex for that.

          We are ready to work on the implementation as well, in time for the 2.1 target release.

          Aleksey Yeschenko added a comment -

          Nicolas Favre-Felix Right, the set-based approach has its advantages, but they don't outweigh the disadvantages, unfortunately.

          Remove the replicate_on_write option and switch to a locked/retryable RMW.

          This (+some optimizations) is what we are going for in 2.1, and will be ready soon-ish (returning back to this after one issue).

          Provide a pluggable way to implement commutative types, with a PN-counter implementation.

          Assuming that you are talking about a counter implementation separate from the primary one for now, I don't have an issue with it, I think. But pluggable CRDTs are way out of scope for 2.1 (that's supposed to be a stabilizing/tech-debt release). Consider opening a separate 3.0 ticket for that, maybe?

          Jonathan Ellis added a comment -

          FWIW, if we have code ready to ship for the 2.1 beta, I'd be okay with including it even if it's not part of the release "theme."

          Ertio Lew added a comment -

          I also wish counters could be part of normal CFs, so that we didn't need a separate query to fetch the counter data.

          Alain RODRIGUEZ added a comment -

          Remove the replicate_on_write option and switch to a locked/retryable RMW.

          What will this imply for the end user? No more over-counts due to retries, no more read-on-write? Any disadvantages to switching to locked RMW?

          Counters are very useful, but also really hard to handle because of their specific characteristics. I hope that some of those will be removed here.

          This (+some optimizations) is what we are going for in 2.1, and will be ready soon-ish (returning back to this after one issue).

          How is this going?

          Aleksey Yeschenko added a comment -

          What will this imply for the end user? No more over-counts due to retries, no more read-on-write? Any disadvantages to switching to locked RMW?

          No more overcounts because of internal replays. Read-on-write becomes explicit and non-optional, with replicate_on_write gone.

          How is this going?

          In progress. The hard part is maintaining backward compatibility, and compatibility during a rolling upgrade. Working on this at the moment, and it's not trivial at all.

          Aleksey Yeschenko added a comment -

          So, this thread has become quite overloaded. I will summarize it shortly in this comment, and then move the actual work/discussion to CASSANDRA-6504.

          The initial idea for the new design (a new cell for each increment/decrement, then summing up on reads) and its variations didn't work out, for one reason or another. The largest problems are the required coordination for collapsing the increment history and difficulty in making it backward compatible with the current implementation.

          We decided to go for incremental improvements instead - namely, stop using 'local' shards altogether, and do explicit read-modify-write with just one shard type ('global') instead. See https://issues.apache.org/jira/browse/CASSANDRA-4775?focusedCommentId=13702042&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13702042 and the comments following it (plus https://issues.apache.org/jira/browse/CASSANDRA-4071?focusedCommentId=13483381&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13483381).

          This will fix, at minimum, the over-counting issue with commit log replay, CASSANDRA-4417, and CASSANDRA-4071, and, together with some related improvements, drastically simplify the counters code in general.


            People

            • Assignee: Aleksey Yeschenko
            • Reporter: Arya Goudarzi
            • Votes: 26
            • Watchers: 55
