Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Fix Version/s: 0.7.1
    • Component/s: Core
    • Labels: None

      Description

      This could be described as a mix between CASSANDRA-1072 without clocks and CASSANDRA-1421.

      More details in the comment below.

      1. 0001-v2-Remove-IClock-from-internals.patch
        195 kB
        Sylvain Lebresne
      2. 0001-v4-Counters.patch
        140 kB
        Sylvain Lebresne
      3. 0002-v2-Counters.patch
        118 kB
        Sylvain Lebresne
      4. 0002-v4-thrift-changes.patch
        317 kB
        Sylvain Lebresne
      5. 0003-v2-Thrift-changes.patch
        300 kB
        Sylvain Lebresne
      6. ASF.LICENSE.NOT.GRANTED--0001-v3-Remove-IClock-from-internals.txt
        197 kB
        Jonathan Ellis
      7. ASF.LICENSE.NOT.GRANTED--0002-v3-Counters.txt
        124 kB
        Jonathan Ellis
      8. ASF.LICENSE.NOT.GRANTED--0003-v3-Thrift-changes.txt
        300 kB
        Jonathan Ellis
      9. marker_idea.txt
        4 kB
        Sylvain Lebresne

        Issue Links

          Activity

          Jonathan Ellis added a comment -

          We're going to try to add the good parts of this patchset to the CASSANDRA-1072 approach already committed.

          Kelvin Kakugawa added a comment -

          Yes, I have the same concerns about post-stream repair. In 0.6, it was elegant, because we easily hooked into the AES-repair SST created pre-stream.

          We can deal w/ growing counters by using strategies from vector clocks--e.g. collapse counters, if not updated after a given time period. The potential problem w/ modifying the node ids on every range change is that it pushes the modifications related to maintenance into the broader cluster. I'm a little wary of this, though, because we'll have to reason about how this impacts the whole system.

          Sylvain Lebresne added a comment -

          Thinking a bit more about this unbootstrap thing, I'm wondering if adding a fix post-streaming is the right solution. I fear it is too fragile. When a node receives ranges it was responsible for before, we would have to make sure it doesn't get the same info from two nodes, and that it doesn't still have the range locally (because repair wasn't run, for instance). Otherwise there is a risk of breaking the counter (counting the same value twice). Opinions?

          Another solution I can see, which I think wouldn't have this problem, would be to make nodes regenerate a new node id (easy if we use uuids) each time the ranges they are responsible for change (at least when they grow). The only problem I see with that is that it will make the counters grow (more columns for a given counter) over time, because we'll accumulate columns for node ids that are not used anymore. But I don't think it would be too hard to merge the columns for those forsaken node ids. Does that make sense?
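          For reference, a minimal sketch (with assumed, not actual, class names) of why keeping shards for forsaken node ids is harmless until they are merged away: a counter's visible value is simply the sum of its per-node-id shards, whether or not each id is still in use.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical illustration: a counter stored as one shard (column) per node id.
// The visible value is the sum of all shards; retired node ids keep contributing
// their final count until a merge collapses them.
final class CounterShards {
    static long totalValue(Map<UUID, Long> shardsByNodeId) {
        long total = 0;
        for (long shard : shardsByNodeId.values())
            total += shard;                 // retired node ids still count
        return total;
    }

    public static void main(String[] args) {
        Map<UUID, Long> shards = Map.of(
            UUID.randomUUID(), 10L,         // current node id
            UUID.randomUUID(), 3L);         // retired node id, kept until merged
        System.out.println(totalValue(shards)); // 13
    }
}
```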

          Sylvain Lebresne added a comment -

          You are totally right. This case is not handled correctly and we'll have to distinguish between streaming operations indeed.

          Kelvin Kakugawa added a comment -

          My assumption is that when you add a new node to the ring, the last replica in the replica set will eventually evict all of the keys that it's not responsible for. However, if that new node is subsequently removed, the last replica in the replica set will eventually have those keys it originally incremented streamed back to it.

          At this point, does your implementation modify the CC columns into LCC columns? If so, how does it distinguish this case?

          Sylvain Lebresne added a comment -

          I'm not sure I fully understand your scenario. But thinking about it more too, I retract the idea that we should force regenerating the node id during bootstrap.
          If you decommission a node and bootstrap it back, then as long as you haven't done one of 1) deleting its sstables but not its system tables or 2) deleting its system tables (at least some of them) but not its sstables, you are fine. You are fine because when bootstrapping the node back, it'll reuse its old node id saved in the system table.

          I'm not sure yet what the best way to prevent 1) or 2) is, but I'm pretty sure the answer doesn't lie in qualifying streaming operations. My opinion is that it's probably fine to ask people not to do 1) or 2), at least for starters. Maybe we could try to detect those cases at startup (even though I'm not sure how to detect 1), but that's a fairly weird situation anyway) and refuse to start. In any case, I'm hopeful we can improve that later.

          Now maybe I haven't understood your point or I'm missing something, but I don't see a problem with the scenario you've described above.

          Kelvin Kakugawa added a comment -

          I thought about it more, and I'm not sure that we can rely on generating new node ids, because it doesn't cover all cases.

          In particular, what if the ring topology changes? If we have a ring:
          A-B-C

          and we add D, between B and C:
          A-B-D-C

          then the replica set will be: A, B, D. However, if we remove D (for whatever reason), C will re-join the replica set. If this happened after an unbootstrap and major compaction on C, then it won't have its previous SSTs.

          Sylvain Lebresne added a comment -

          The limitation is during RESTORE_REPLICA_COUNT or BOOTSTRAP where a node that was previously in the replica set becomes part of the replica set, again. In this case, when CCs are streamed back to the, now re-added, host replica, it will discard those CCs that it should treat as LCCs.

          As you said, I think it is fine because the patches now use UUIDs as node ids. The idea is that you'll never add back a node with the same node id, ever. That is, if you bootstrap a node, it'll get a brand new node id, different from every previously assigned one (and maybe bootstrap should just start by removing the node id from the system table if it exists, to force this regeneration).
          If you add back a node previously removed, either the node still has its old sstables (and so, for its node id, it has the correct count), and thus you can add it back safely, or you just force regenerating a new node id.

          So I believe that it means that whenever you lose an sstable on a given node (for any reason), you'll regenerate the node id. Now of course we should be careful not to regenerate the node id too often, because it'll make the counter 'grow'. But I doubt it will be a problem.

          Another note on #1546, the digest creation for CC and LCC does not create the same hashes.

          True, that's the last bullet on my todo list (but my todo list may be lacking bullets). My plan is indeed to special-case digest creation.
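          A minimal sketch of the "generated once, then always reused" node id behaviour described above. The actual patch persists the id in a system table; the file used here is purely a stand-in for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Hypothetical stand-in for the node id kept in the system table: generate a
// UUID the first time, then always reuse the persisted value, so a node that
// still has its data keeps counting under the same id after a restart.
final class NodeIdStore {
    static UUID loadOrCreate(Path idFile) throws IOException {
        if (Files.exists(idFile))
            return UUID.fromString(Files.readString(idFile, StandardCharsets.UTF_8).trim());
        UUID fresh = UUID.randomUUID();
        Files.writeString(idFile, fresh.toString(), StandardCharsets.UTF_8);
        return fresh;
    }

    public static void main(String[] args) throws IOException {
        UUID id = loadOrCreate(Path.of("node_id.txt"));
        System.out.println("node id: " + id); // same value on every restart
    }
}
```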

          Kelvin Kakugawa added a comment -

          Another note on #1546, the digest creation for CC and LCC does not create the same hashes.

          During read repair and AES, the CC/LCC digest hashes should match. Otherwise, the LCC columns will always be repaired. If you special case digest creation, though, that would probably alleviate this issue.
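          A minimal sketch of what "special case digest creation" could mean here: hash only the parts of a counter column that both replicas agree on (name and total value) and leave out the local/remote distinction, so a CC and its LCC twin produce the same digest. The method names and the choice of MD5 are assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical digest that ignores whether a shard is stored as CC or LCC,
// so identical logical content does not trigger needless read repair.
final class CounterDigest {
    static byte[] digest(String name, long totalValue) throws NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("MD5");
        md.update(name.getBytes(StandardCharsets.UTF_8));
        md.update(ByteBuffer.allocate(Long.BYTES).putLong(totalValue).array());
        return md.digest();
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // same name and value hash identically whether stored as CC or LCC
        System.out.println(java.util.Arrays.equals(
            digest("hits", 42L), digest("hits", 42L))); // true
    }
}
```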

          Kelvin Kakugawa added a comment -

          I do like the less invasive approach to streaming that's implemented in #1546.

          However, it does have a limitation.

          The way ser/des is implemented, if you deserialize an LCC on a non-host replica, it'll become a CC, forever. It works well for AES, when the data file is deserialized post-stream.

          The limitation is during RESTORE_REPLICA_COUNT or BOOTSTRAP where a node that was previously in the replica set becomes part of the replica set, again. In this case, when CCs are streamed back to the, now re-added, host replica, it will discard those CCs that it should treat as LCCs.

          A rough solution would be to never re-add a previous node id / token to a ring, ever. However, I don't think this is feasible. A rough example:
          1) replica set: A,B,C
          2) new node: D (inserted between B and C)
          3) new replica set: A, B, D
          4) D, is then removed from the replica set (for whatever reason).
          5) when the replica set becomes A, B, and C, again, the above problem occurs.

          I'm not sure this can be accounted for w/o qualifying the types of streaming operations.
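          A hypothetical, much-simplified rendering of the ser/des rule described above: a shard is only "local" (LCC) while it sits on the node that wrote it; deserialized anywhere else, it becomes a plain remote counter column (CC) from then on, which is what makes re-adding an old replica problematic. Names and types are assumptions, not the patch's actual classes.

```java
// Hypothetical sketch of the LCC/CC classification on deserialization.
final class ShardClassifier {
    enum Kind { LOCAL, REMOTE }

    // nodeId may be an IP address or a UUID string, depending on the patch version
    static Kind onDeserialize(String shardNodeId, String thisNodeId) {
        return shardNodeId.equals(thisNodeId) ? Kind.LOCAL : Kind.REMOTE;
    }

    public static void main(String[] args) {
        System.out.println(onDeserialize("10.0.0.1", "10.0.0.1")); // LOCAL
        System.out.println(onDeserialize("10.0.0.1", "10.0.0.2")); // REMOTE
    }
}
```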

          Kelvin Kakugawa added a comment -

          And, yes, you're right that the core algorithm between #1072 and #1546 is the same. The differences revolve around implementation details.

          Kelvin Kakugawa added a comment -

          Using a more deterministic node id (UUID, instead of IP address) makes sense. It was mentioned earlier, in #580 or #1072, but not implemented.

          Forcing ser/des on SSTs streamed for AES sounds promising.

          Sylvain Lebresne added a comment -

          Attaching a version 4 set of patches. Apart from being rebased (against svn rev. 1023326), this mainly introduces two new things:

          1. It replaces the use of the node's IP address for the counter's parts with a node UUID (generated once and saved into the system tables). I think that relying on IP addresses is broken (and that goes for #1072 as well). Because, during column reconciliation, a node merges the values for its own ID but keeps the more recent value for other IDs, using IP addresses as IDs means that changing the IP address of a node could result in losing data. Using a node UUID solves this. It does have a small drawback: UUIDs are 16 bytes long where IP addresses are 4 bytes long. But I'll pay that every day compared to losing correctness. And actually, maybe we could use some custom unique ID instead of a UUID if we want to get back some of the space. For instance, I'm pretty sure that an ID generated by taking the IP address plus 4 bytes of the current time in seconds would be fine. Finally, note that this node ID is not gossiped to other nodes in this patch, because this patch only needs the information locally (this isn't true for #1072 for instance, because of the cleanContext logic).
          2. It fixes the streaming problem by forcing a deserialization-reserialization after streaming (for counter CFs solely). Because of the change above, I don't think there is a need to distinguish the reason for streaming. Moreover, because what this deserialization-reserialization does is just flip a bit, it is safe to do this in-place on the streamed data file while rebuilding the index (contrary to what #1072 does, where a new sstable is created in this process). Given this, the change is mostly localized to the SSTableWriter.Builder class (see the sketch after this comment).

          Now, because it isn't yet clear what we will decide for counters (this or something closer to #1072), allow me to sum up my stance on this. I'd like to distinguish two families of differences between this patch and 1072:

          1. There is the fact that this patch uses a super column for the partitions of a given counter, while 1072 puts those partitions in a context (a binary blob). Let me first stress that the main idea is the same in both cases; they are not completely different ideas, just two different implementations of the same idea (and not my idea btw), each having (I think) pros and cons. My opinion is that so far the approach of this ticket gives cleaner, simpler code that messes less with the rest of Cassandra's code base (and I'm not saying this is a particularly objective argument). I like that and I think it's something important. However, the drawback of this approach is that, since we use a super column for the counter, there is no native support for super columns of counters. And the context approach of 1072 doesn't have this drawback. Personally, I don't see that as a big drawback, because encoding super columns into standard columns is fairly simple. But that too is a matter of opinion, so I understand others may feel differently. At the end of the day, I simply hope we'll end up choosing what's best for cassandra, and that we'll make that choice soon.
          2. This patch introduces a bunch of things that are not in the currently attached patch of #1072 and that I believe are important: the marker strategy, support for all CLs, and decrement support. It also fixes a few (important) bugs: a race condition in counter reads and the fragility of using IP addresses. I think we should keep the discussion on those separate from the discussion on the choice between the counter-as-super-columns approach and the context-based approach, as they apply to both.
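          A minimal sketch of the in-place bit flip mentioned in point 2 of the first list above. The flag value and names are assumptions for illustration, not the patch's actual serialization format.

```java
// Hypothetical illustration of the post-streaming pass: while the index of a
// streamed counter sstable is rebuilt, each counter column is rewritten with
// its "local" bit cleared, so shards written by another node are never
// mistaken for locally-owned ones.
final class StreamedCounterRewriter {
    static final int LOCAL_FLAG = 0x01;     // assumed flag bit, for illustration

    // flags byte of one counter column, as read from the streamed data file
    static int clearLocalFlag(int flags) {
        return flags & ~LOCAL_FLAG;
    }

    public static void main(String[] args) {
        int streamedFlags = 0x01;                           // was local on the sender
        System.out.println(clearLocalFlag(streamedFlags));  // 0 -> remote here
    }
}
```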
          Jonathan Ellis added a comment -

          Looks like there is consensus that we don't need IClock for either 1546 or 1072, so rebased and committed 0001.

          Hudson added a comment -

          Integrated in Cassandra #563 (See https://hudson.apache.org/hudson/job/Cassandra/563/)

          Kelvin Kakugawa added a comment -

          zhu,

          (1), maintain a local cache, is the currently recommended deployment strategy.

          (2) would limit the scalability of #1072, because a cache miss would be severely penalized.

          (3) may be possible; however, it would require a lot of bookkeeping between the replicas. An interesting approach would be to require all updates on a given partition to have increasing timestamps. So, when an update is propagated to the other replicas via read repair, the timeframe of the MT / SST of the update would be included. This would allow partial timeframes (represented by the MT / SST) to be repaired. The real win would be that it would make a repair-on-write code path cheaper. However, it would make the read paths (read, read repair) more expensive.

          Jonathan Ellis added a comment -

          Looks like it will be 0.7.1 before we have something everyone is happy with. In the meantime, are we okay with committing the Remove-IClock-from-internals patch?

          zhu han added a comment - edited

          Kelvin,

          Thanks for the clarification. It is a good idea that the lead replica only aggregates the counter in the MT at CL.ONE. IIRC, what makes things more complicated is CL.QUORUM or CL.ALL. #1546 is intended to support these CL levels.

          CL.QUORUM and CL.ALL require the lead replicas to propagate the sum of the local columns to the other replicas, which needs iterating over all SSTs and the MT in the current implementation. As Sylvain pointed out:

          So, to answer the question about serializing the writes, there is no need (and
          I believe it's a good thing performance-wise). When a leader receives an
          update, it doesn't read-then-write. It writes-then-read. And as parts of the
          read, the newly inserted LocalCounterColumn will be 'merged' with the other,
          already present LocalCounterColumn and yield the actual value of the column,
          without the risk of loosing an increment.

          The read here is the summing up of the local columns on the write path, not the normal read path.

          There are several possible solutions:
          1) Maintain a cache of the sums of the local columns (see the sketch after this comment). We can reuse the row cache mechanism or build a dedicated cache for counters, as Sylvain said.

          2) Take the great idea from #1072 and do read-then-write: aggregate all local columns over the SSTables only when they are absent from the MT; otherwise, aggregate the local columns with the local column already present in the MT. That way, the write path hits the SSTables only when the local columns are absent from the MT, and the UUID marker is disabled.

          3) As in #1072, on the write path, only propagate the sums of the local columns in the MT to the remote replicas, and give a different column name to the remote replicas each time the local column is absent from the MT (the column name is recorded in the MT as well); otherwise, read the column name from the MT and give it to the remote replicas again. On the read path, iterate over all SSTables to get the latest value of the local column and the remote columns, and add them all. During compaction, we can compact the remote columns from the same replica into one column to limit the number of remote columns.

          Option 3) should be as cheap as #1072 and the normal cassandra write path. The UUID marker columns could be aggregated the same way as in 3). But it's not easy to implement and a bit ugly...
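          A minimal sketch of option 1) above: keep an in-memory cache of the sum of a counter's local shards so the write path does not re-read every memtable and sstable on each replicated update. Names and the seeding policy are assumptions, not part of any attached patch.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical cache of "sum of local columns" per counter key.
final class LocalShardSumCache {
    private final ConcurrentHashMap<String, Long> sums = new ConcurrentHashMap<>();

    // called on the leader's write path; returns the new local total to replicate
    long addLocal(String counterKey, long delta) {
        return sums.merge(counterKey, delta, Long::sum);
    }

    // called when the cache has no entry yet (e.g. after a restart): seed it from
    // a full read over memtable + sstables, done once per counter
    void seed(String counterKey, long sumFromStorage) {
        sums.putIfAbsent(counterKey, sumFromStorage);
    }

    public static void main(String[] args) {
        LocalShardSumCache cache = new LocalShardSumCache();
        cache.seed("page_views:home", 40L);
        System.out.println(cache.addLocal("page_views:home", 2L)); // 42
    }
}
```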

          Kelvin Kakugawa added a comment -

          zhu,

          np, let me clarify. The way both #1546 and #1072 work is that counter updates are only aggregated in the MT. You can imagine that each SST on-disk represents the aggregated updates for those keys when they were MTs (in the past)--snapshots in time, if you will. So, on a counter read, the counts for a given key are aggregated across all the relevant SSTs on disk. On a related note, during compaction, counts across the compacted SSTs are aggregated into the resultant SST. So, writes are cheap.

          You may want to reference my presentation on distributed counters:
          http://www.slideshare.net/kakugawa/distributed-counters-in-cassandra-cassandra-summit-2010
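          A minimal sketch of the read path described above: the memtable and each relevant sstable hold a partial count for the key, and a read simply sums the partials (compaction pre-sums them into the resulting sstable). The flat maps stand in for the real storage structures purely for illustration.

```java
import java.util.List;
import java.util.Map;

// Hypothetical illustration of aggregating a counter across memtable + sstables.
final class CounterRead {
    static long read(String key, Map<String, Long> memtable, List<Map<String, Long>> sstables) {
        long total = memtable.getOrDefault(key, 0L);
        for (Map<String, Long> sstable : sstables)
            total += sstable.getOrDefault(key, 0L);   // each sstable holds a partial count
        return total;
    }

    public static void main(String[] args) {
        long value = read("hits",
                          Map.of("hits", 5L),                                   // memtable
                          List.of(Map.of("hits", 100L), Map.of("hits", 7L)));   // sstables
        System.out.println(value); // 112
    }
}
```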

          zhu han added a comment -

          I wasn't clear in the last comment--the comments were UUID-specific.

          Kelvin, I did not realize that your comments were UUID-specific, so my question is generic. Whether or not UUIDs are used, IMHO there is a read on the write path: you have to get the latest value of the local columns (a read), then apply the increment to it and write the local column to the memtable. Sylvain gave a detailed explanation here.

          Although the read of the local column is not protected by a synchronized block, it may trigger a disk seek and degrade performance on the write path. Does #1072 have the same problem?

          Kelvin Kakugawa added a comment -

          zhu,

          I wasn't clear in the last comment--the comments were UUID-specific.

          Sylvain,

          wrt (1), I might have better phrased it as: writes are not as efficient/cheap as the default write path, because they require a read. And bloom filters aren't perfect. Granted, if you want UUIDs, then it's probably a reasonable price to pay.

          Sylvain Lebresne added a comment -

          I read through your note on the v3 marker strategy. It sounds reasonable and does address the concerns that I raised, earlier. I think it's worth it for you to highlight the potential drawbacks. The three that seem to stick out the most are:

          Sure, I've tried to highlight them along the way but it's good to sum them up. And I mostly agree with your list (that is, I agree with 2) and 3), not so much with 1), at least not with what the wording suggests). Allow me a few comments:

          1) it requires 1-2 reads in a synchronized code path, which doesn't gel w/ cassandra's write-optimized design,

          Yes, there is a read in a synchronized code path, but I strongly disagree with the last part, and overall I think this is by far the least important drawback of the list. This read in a synchronized code path (always 1, btw) happens only when we try to repair a replayed update. First and foremost, unless a write fails (timeout or disconnection from the coordinator) and you replay it, you will never exercise that code path. Again, unless you replay an update, there is virtually no cost at all (as far as latency is concerned, at least). If you replay an update, then yes, you may have to pay that cost. But when I think of cassandra's write-optimized design, I also include the fact that you won't lose updates no matter what. Given that the goal of the marker strategy is to ensure exactly that, I would say it actually fits cassandra's write-optimized design very well.

          Also, on the technical side, the synchronized block is not a global lock. The current implementation uses a small pool of locks to avoid too much contention while still avoiding allocating too many objects (see the sketch after this comment). I'm pretty sure contention won't be a problem as is, but if we observe contention, we can always slightly increase this lock pool until the contention disappears. The locks are needed for correctness, but I'm pretty sure they won't be costly.

          2) over-counts are repaired in an eventually consistent manner, and

          True, and that's a bit unfortunate. Sadly, I really don't see how we can avoid this. I have hopes, however, that we can make that eventuality as short as possible (and as illustrated by the system test in the patch, which never fails even in the absence of any delay between the replay and the read, in some situations it's not even eventual).

          3) UUIDs are only maintained for a configurable TTL.

          Yes. For now this TTL is gc_grace_seconds, but I think it should be a separate configurable time for more flexibility. And actually, you can keep the uuids forever if you want. But I admit, knowing the best TTL to choose could be a bit tricky, and here too I hope this can be optimized later.

          The above aren't showstoppers. However, anyone interested in using UUIDs to track updates should be aware of their limitations + trade-offs.

          Agreed, and I'll be happy to write that documentation in time. And that doc will probably start by explaining the drawbacks of not using uuids.
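          A minimal sketch of the kind of striped lock pool described above: replayed updates for the same counter key contend on the same lock, while unrelated keys almost always map to different locks. The pool size and names are assumptions, not the patch's actual values.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical "small pool of locks" used to serialize repair of replayed updates.
final class LockStripes {
    private final ReentrantLock[] locks;

    LockStripes(int size) {
        locks = new ReentrantLock[size];
        for (int i = 0; i < size; i++)
            locks[i] = new ReentrantLock();
    }

    ReentrantLock forKey(Object key) {
        int index = (key.hashCode() & Integer.MAX_VALUE) % locks.length;
        return locks[index];
    }

    public static void main(String[] args) {
        LockStripes stripes = new LockStripes(128);
        ReentrantLock lock = stripes.forKey("counter-row-42");
        lock.lock();
        try {
            // read-then-repair of a replayed update would happen here
        } finally {
            lock.unlock();
        }
    }
}
```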

          zhu han added a comment -

          I think it's worth it for you to highlight the potential drawbacks. The three that seem to stick out the most are:
          1) it requires 1-2 reads in a synchronized code path, which doesn't gel w/ cassandra's write-optimized design,

          Kelvin, does #1072 suffer from the same drawback? If it does not, can the same tactics be used by #1546?

          Kelvin Kakugawa added a comment -

          Yes, I agree that my arguments for comparative space efficiency are partly speculative. Eventually, I may directly manage the memory for those byte arrays. And, as you point out, there are parts of #1072 that still can be improved.

          Splitting out the partitioned counter into separate columns allows less data to be replicated. You're right, on write, it requires only transferring the leader's updated count. #1072 could be modified to do this, as well. As you point out, it's only the leader's updated count that matters, and a #1072 column will create the same resultant column on receipt of just the leader's updated count.

          cleanContext may benefit from another audit of the code and strategy. I agree w/ you that it's not a strategy in the code that I'm the most proud of.

          The AES changes do need to be more invasive than it would first appear. AES streaming directly streams the data file over; it does not deserialize or transform the data file. It only rebuilds the index and filter files--the data file is not touched. If you have an approach to refactor #1072's AES code path, I would be more than happy to adopt it, since the transformation that #1072 has to perform is abstractly equivalent to the transformation that #1546 needs.

          Sylvain Lebresne added a comment -

          The complexity of the context-based logic is to be space efficient. In practice, cassandra nodes are typically I/O bound, not CPU bound.

          True in the current patches. But provided you add back timestamps to contexts for decrement, and provided my math doesn't suck too much, the disk overhead for the logic of 1546 is 3 bytes * (#replicas - 1), because the overhead of a column is 1 byte for the 'flags' (deleted, expiring) and 2 bytes for each value to record its length. That's 3 bytes out of the 20 you have to record for each replica (name (the host ip) + value + timestamp). As it turns out, since for counter columns we know the size of the value, it's super easy to optimize out 2 of those 3 bytes (I'll be happy to add it). To be fair, there is the overhead of using super columns, but overall I'm not totally convinced by this argument.

          I'd like to add that the splitting of the context into multiple columns in 1546 offers an optimisation opportunity for the read after the write on the leader, when we read the value to replicate it to the other nodes: in 1546, we only read the value for the leader's part of the counter (since this is what has been updated). This will save I/O and network bandwidth. Not saying this is a crucial thing, just saying that it seems not so clear to me that context-based logic is intrinsically an I/O saver.

          So, I'm not confident in the statement that #1546 is clearly faster than #1072. As a rule of thumb, it's better to directly manage your memory usage, as opposed to relying on the runtime's GC.

          I was merely talking about the cleanContext() logic. But I'll admit, saying this part is faster in 1546 doesn't really matter much; that was a stupid argument. It remains that I don't like this cleanContext logic. I find it fragile (as in, hard to maintain) and not very clean, in that it relies on the fact that nodes have to clean up the columns before sending them over to other nodes. I wouldn't say that this cleanContext logic is a killer for the context-based approach, but I don't like it.

          As for the creation of objects, you may be right. But I'm not even sure. The byte array manipulations of 1072 do create a bunch of temporary byte arrays that have to be garbage collected. So like you, I'm not very confident in any statement about whether the context-based logic is faster or slower than the counter-as-supercolumns one of #1546.

          #1546 will need to special case AES-related streaming, as well

          That is true. Which reminds me of a question for you, Kelvin. The changes for AES repair in #1072 are fairly extensive and I kind of wonder why? I expect the change to fix streaming in #1546 to be a few lines: that is, when you rebuild the sstable after streaming, you deserialize and re-serialize the rows instead of just copying the bytes directly. I don't see a reason to differentiate the reason for streaming, for instance.

          Kelvin Kakugawa added a comment - edited

          I read through your note on the v3 marker strategy. It sounds reasonable and does address the concerns that I raised, earlier.

          I think it's worth it for you to highlight the potential drawbacks. The three that seem to stick out the most are:
          1) it requires 1-2 reads in a synchronized code path, which doesn't gel w/ cassandra's write-optimized design,
          2) over-counts are repaired in an eventually consistent manner, and
          3) UUIDs are only maintained for a configurable TTL.

          The above aren't showstoppers. However, anyone interested in using UUIDs to track updates should be aware of their limitations + trade-offs.

          Jonathan Ellis added a comment -

          Took the liberty of rebasing the v3 patches here.

          Kelvin Kakugawa added a comment -

          Just a quick clarification:

          • #1072 can natively support decrement (the abstract strategy is the same as #1546)
          • #1546 will need to special case AES-related streaming, as well

          However, #1072 does have a special case for read repair that #1546 does not need.

          Jonathan Ellis added a comment -

          While this is not untrue, the code you are mentioning is simpler than the complex counter manipulation and I think, has a strong cassandra flavor.

          I agree. I think we've entered the realm of subjectivity here, so as my personal opinion: 1072 feels like a separate database that happens to use parts of cassandra's internals. I feel comfortable committing to maintain 1546 in a way that I never did with 1072, in large part because there are so many fewer special cases that need to be handled (AES, streaming, decrement, ...).

          The other challenge is that the existing cassandra data model is co-opted, so the end user has to learn a modified data model that is counter-specific. I don't believe this is the right encapsulation.

          Again subjectively, I think leveraging the existing data model under the hood is a Good Thing. I believe this is directly responsible for the relative lack of need for special case handling with this approach.

          I don't understand the end-user/encapsulation argument. I may be missing something again, but either way we are talking about providing add() and get_counter() methods or their equivalent, no? The end user just deals with Counter structs and doesn't have to care about the implementation.

          Can 1546 meet Twitter's business needs? IMO that is the most important question.
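          A minimal sketch of the counter-specific API surface being discussed: the client only sees add() and get_counter() (or their equivalent); how shards, markers, and node ids are stored stays behind that interface. The names and signatures here are illustrative, not the actual thrift changes.

```java
// Hypothetical client-facing counter API; the in-memory map is a stand-in backend.
interface CounterClient {
    void add(String keyspace, String key, String counterName, long delta);
    long getCounter(String keyspace, String key, String counterName);
}

final class CounterClientExample {
    public static void main(String[] args) {
        CounterClient client = new CounterClient() {
            private final java.util.Map<String, Long> store = new java.util.HashMap<>();
            public void add(String ks, String key, String name, long delta) {
                store.merge(ks + "/" + key + "/" + name, delta, Long::sum);
            }
            public long getCounter(String ks, String key, String name) {
                return store.getOrDefault(ks + "/" + key + "/" + name, 0L);
            }
        };
        client.add("app", "page:home", "views", 1);
        System.out.println(client.getCounter("app", "page:home", "views")); // 1
    }
}
```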

          Kelvin Kakugawa added a comment -

          I have no issues w/ a counter-specific API.

          UUIDs are interesting enough that they're worth investigating.

          The complexity of the context-based logic is to be space efficient. In practice, cassandra nodes are typically I/O bound, not CPU bound. Beyond that, a reasonable amount of effort was invested to optimize the implementation--a modified quicksort and dynamic programming for set comparison. ntm, it leverages byte arrays, instead of creating more objects for GC. So, I'm not confident in the statement that #1546 is clearly faster than #1072. As a rule of thumb, it's better to directly manage your memory usage, as opposed to relying on the runtime's GC.

          Supporting decrements, natively, is not complex. The original implementation of #1072 against 0.6 re-used #580's vector clock tuples, so it could track the latest count (for a partition) by the timestamp of the tuple. The tuple format was later modified to remove the timestamp, because I wanted to cut down on the size of the partitioned counter. However, in retrospect, the timestamp is definitely useful for decrements. I wrote and deployed a modified version of #1072 that widens the tuple again with a logical clock (the number of operations computed) to support decrements.

          I'll look into refactoring the context-based logic for partitioned counters. I have no problems supporting your UUID implementation as an optional code path.

          Sylvain Lebresne added a comment -

          @Johan: the last patch set (v3) is rebased on trunk as of now (svn rev 1004704).

          Putting that on github would be a good idea; I'll try to put that in place at some point. But I'm a bit overloaded right now, so I don't know when.

          Sylvain Lebresne added a comment -

          Kelvin,

          Thanks for your comment.

          The counter storage strategy moves the complexity away from the column class
          and into the surrounding mutation and read logic. The problem this causes is
          that to understand a column-specific property--its partitioned value--we have
          to read code in mutation and read commands and put together the logic in
          conjunction w/ how it interacts w/ the cassandra data model at large.

          While this is not untrue, the code you are mentioning is simpler than the
          complex counter manipulation and, I think, has a strong cassandra flavor. I
          don't think that having the code slightly more split up is a big deal (but I
          could be wrong, as I'm probably not objective on this). Imho, as far as code
          complexity is concerned, I'm still on the side of the current patch (but that's
          clearly debatable).

          The other challenge is that the existing cassandra data model is co-opted, so
          the end user has to learn a modified data model that is counter-specific. I
          don't believe this is the right encapsulation.

          It's true. However, I would temper that with two points:

          1. The API for counters in the last patch is pretty close to the standard
            one. The main difference is that there is no super column support. The
            other difference is that there is an optional uuid on inserts.
            But I don't expect anyone familiar with the Cassandra data model to be
            thrown off balance by the counter API of this patch.
          2. I think that, in any case, it is a good idea to have a specific API for
            counters, at the very least for insertions. First, because we need it for
            uuids (and I do believe in the usefulness of uuids, see below). Second,
            because the guarantees on counter inserts are not the usual ones. Hence,
            not making that very clear through a specific client API call is imho a
            mistake. More anecdotally, if you make counters fit in the standard API,
            you force clients to pack/unpack the counter value, which you can avoid
            with a specific API.
            So, while I agree that 1546 forces a specific counter thrift API while a
            context-based approach would not necessarily, I ask the question: do we want a
            specific counter API or not? But maybe I'm the only one to think that a
            specific API could actually be a good idea.
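
          Purely to illustrate the pack/unpack point above, here is a made-up
          client-side sketch (these are illustrative names only, not the API actually
          proposed in the attached thrift patches): with a counter-specific call the
          client passes and receives plain longs, whereas squeezing counters into the
          standard API forces it to encode the delta into the column value bytes.

            import java.nio.ByteBuffer;

            // Made-up client interface, only to contrast the two styles discussed above.
            interface CounterClientSketch
            {
                // counter-specific API: deltas and counts are plain longs
                void add(String key, String counterName, long delta);
                long getCounter(String key, String counterName);

                // standard API: the client must pack the delta into the value bytes itself
                void insert(String key, String columnName, byte[] value, long timestamp);
            }

            final class DeltaPacking
            {
                // what every client would have to do if counters went through the standard API
                static byte[] pack(long delta)
                {
                    return ByteBuffer.allocate(8).putLong(delta).array();
                }
            }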

          A better encapsulation would be to re-use the context-based partition value
          from #1072 in a custom Column sub-class. All the logic has already been
          implemented; it just needs to be refactored out of the clock class. I
          understand that the context-based logic is relatively complex.

          Despite what's above, I'm not totally opposed to that. In the end, I just
          want what's best for Cassandra. And I agree that it wouldn't be too hard to do,
          and also that it's already tested, which is good.
          The complexity is still something I would be happy to avoid, even if well
          tested, as it's something we would have to maintain. Adding new tests is a
          one-shot effort; maintaining something complex is forever (for some definition of
          forever). That being said, the context manipulation code is not crazy
          complicated either.
          Still, there are a few things that come to mind if we try such a reuse of the
          context of #1072:

          1. it doesn't support decrements as is. Not only does the uuid strategy need it,
            but even setting that aside, I strongly feel it would be a mistake to defer
            the addition of decrements. It isn't hard to add to the context, but it is
            something to keep in mind nevertheless.
          2. it would be ugly to do what 1546 does to avoid the cleanContext() function
            of 1072 with contexts (not undoable, but ugly). Agreed, the AES repair needs
            fixing, but for normal writes/reads, the 1546 way is faster than the
            cleanContext option and, I feel, cleaner.

          All this being said, I am willing to give it a try. I can try to modify the
          (newly attached) patch for #1546 to use contexts (moved into a specific Column
          sub-class, with timestamps added to the context for decrements). We could then
          compare the two approaches. The only thing is, I don't know exactly when I'll
          have the time to do that (but I'll try asap).

          The uuid strategy is worth investigating. However, a limitation of its design
          is that uuids are not coordinated across replicas, i.e. you cannot re-try an
          update w/ a given uuid on multiple replicas. The leader for a given uuid must
          be coordinated, at the client or coordinator level, to always correspond to
          the same replica. e.g. if this is done at the coordinator level, on a failed
          write, the leader has to be maintained and the mutation has to go through
          hinted hand-off. A limitation of this is that if the chosen leader
          goes down permanently, there may be orphaned HH mutations. Still, it at
          least permits retries on the same replica and may be interesting for certain
          use cases.

          I agree, the uuid strategy as implemented in the current (v2) patch is broken.
          It does try to deal with the replay on another coordinator, but it doesn't work.
          I've changed it to something that I believe deals with this problem and is
          better overall anyway. The code is attached as v3, but this comment is already
          long enough as is, so I'm attaching a file (marker_idea.txt) to jira that tries
          to explain the basic idea.

          Johan Oskarsson added a comment -

          The last set of patches doesn't apply cleanly for me, can you upload an update? Or if it's easier, point me to a git repo with the patch applied. Thanks.

          Kelvin Kakugawa added a comment -

          Sylvain,

          I read through your patches. I'd like to discuss two aspects of #1546:
          1) the counter storage strategy, and
          2) the effectiveness of uuids.

          The counter storage strategy moves the complexity away from the column class and into the surrounding mutation and read logic. The problem this causes is that to understand a column-specific property--its partitioned value--we have to read code in mutation and read commands and put together the logic in conjunction w/ how it interacts w/ the cassandra data model at large. The other challenge is that the existing cassandra data model is co-opted, so the end user has to learn a modified data model that is counter-specific. I don't believe this is the right encapsulation.

          A better encapsulation would be to re-use the context-based partition value from #1072 in a custom Column sub-class. All the logic has already been implemented; it just needs to be refactored out of the clock class. I understand that the context-based logic is relatively complex. However, it has far better unit test coverage than the rest of the cassandra codebase and, more importantly, has been deployed successfully in production at scale. Not to mention, the existing cassandra data model would not be modified. I feel like this would be a better encapsulation that is easier to reason about.

          The uuid strategy is worth investigating. However, a limitation of its design is that uuids are not coordinated across replicas, i.e. you cannot re-try an update w/ a given uuid on multiple replicas. The leader for a given uuid must be coordinated, at the client or coordinator level, to always correspond to the same replica. e.g. if this is done at the coordinator level, on a failed write, the leader has to be maintained and the mutation has to go through hinted hand-off. A limitation of this is that if the chosen leader goes down permanently, there may be orphaned HH mutations. Still, it at least permits retries on the same replica and may be interesting for certain use cases.

          Sylvain Lebresne added a comment -

          I'm a little wary of having "bad" state on disk, because it introduces additional variables that have to be reasoned about.
          As of 2 months ago, AES doesn't perform a read-only major compaction like it did in 0.6. It streams across the relevant subsections of each SST on disk to the target node, which is concatenated into a new SST. So, if:
          1) A streams an AES repair SST to B,
          2) that streamed SST is never transformed (e.g. via compaction), then
          3) B streams an AES repair SST back to A.

          It will send the untransformed columns from A, back to A. When I have time, I'll re-read the compaction code to make sure this hasn't changed.

          I'll admit that if AES doesn't perform any deserialization, I have missed that. I'll re-read the AES code too.
          If so, then yes, it's broken. And in any case, I don't disagree with you that it may well not be a good idea to
          rely on such a variable. I'm not against forcing deserialization after streaming to rebuild the sstable, at least
          for counter CFs. Anyway, the AES code should also be updated, since right now the building of the merkle tree is
          broken (the hash of counter supercolumns will be different on every node right now).

          Kelvin Kakugawa added a comment -

          I'm a little wary of having "bad" state on disk, because it introduces additional variables that have to be reasoned about.

          As of 2 months ago, AES doesn't perform a read-only major compaction like it did in 0.6. It streams across the relevant subsections of each SST on disk to the target node, which is concatenated into a new SST. So, if:
          1) A streams an AES repair SST to B,
          2) that streamed SST is never transformed (e.g. via compaction), then
          3) B streams an AES repair SST back to A.

          It will send the untransformed columns from A, back to A. When I have time, I'll re-read the compaction code to make sure this hasn't changed.

          I believe you're right that #1072 also suffers from this potential race condition during reads, but I'd have to review that code path.

          I'm definitely in favor of moving the metadata into a different CF. Of interest, since all the columns have the same TTL, a modified compaction strategy could be implemented that completely deletes the SSTs for this CF w/o inspection based on when the SST was written.
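
          To make the shortcut above concrete, here is a minimal sketch of the check it
          implies (names and units are assumptions, not an actual compaction strategy):
          if every column in the CF carries the same TTL, then everything in an sstable
          has expired once the sstable's creation time plus that TTL is in the past, so
          the whole file can be dropped without reading it.

            // Hypothetical check for dropping a whole sstable without inspection.
            final class ExpiredSSTableCheck
            {
                static boolean canDropWithoutInspection(long sstableCreatedAtMillis,
                                                        long ttlMillis,
                                                        long nowMillis)
                {
                    // every column was written before the sstable was created, so
                    // creation time + TTL is an upper bound on its expiration time
                    return sstableCreatedAtMillis + ttlMillis <= nowMillis;
                }
            }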

          I'll read through your v2 patch. Thanks for rebasing.

          Sylvain Lebresne added a comment -

          I forgot to attach said patch earlier, but that's fine because I've rebased
          the thing. So I'm attaching a v2 set of patches that is rebased to current trunk and
          includes the updates from my previous comment.

          Sylvain Lebresne added a comment -

          You're right that there is a problem w/ AE repair. Last time I read the 0.7 code, AE repair works like so:

          Actually, I do think that AE repair is fine in the current patch. The idea is the
          following: for a counter, each host has its specific LocalCounterColumn named
          after the node ip. The node should be the only one to insert such columns
          (because those columns merge their values, so only one should be created for
          each new counter increment). The patch achieves this during (column)
          deserialization. Each node that deserializes a LocalCounterColumn will
          deserialize it as a CounterColumn (which acts as a normal column except that it
          has no effect on the node this column originates from) unless it is its own
          LocalCounterColumn. So no host can send us back one of our LocalCounterColumns,
          since it will never see it as a LocalCounterColumn. It is true that, because of
          streaming, a node can have a LocalCounterColumn for another host in one of its
          sstables. But as soon as it deserializes it, it will become a (non dangerous)
          CounterColumn. So, as long as we don't stream back the exact same sstable that
          was streamed in (which we never do to my knowledge), nothing's broken. Feel
          free to correct me of course.

          Apart from this, I'm attaching an updated version of the patch (which I'm
          calling v2 to keep some history). It includes two important updates:

          • It fixes (I hope) a fairly nasty race condition (which I think #1072 also
            suffers from, btw). The idea is that since we merge the values of
            LocalCounterColumns, we should never, during a read, merge the exact same
            column twice (the risk being to return a wrong result). Even though this
            usually doesn't happen, it could happen in the following situations:
            1. When we switch memtables
            2. When a memtable that has been fully flushed becomes an active sstable
            3. At the end of a compaction, when the newly created sstable becomes
              active
            4. Any other place I'm forgetting

          This could happen because those operations aren't atomic (for reads), and
          thus there is a small window during which we could read twice from the same
          memtable/sstable. The patch introduces a new readWriteLock that should
          solve this. The changes are entirely in Table.java and CFStore.java for those
          interested (and I'll be happy to get some feedback, because I'm not sure
          how to test this); a rough sketch of the locking idea follows this list.

          • It moves the handling of the 'marker columns' to a different (and optional)
            CF. Having done this, I've removed the counter-as-row option, so the patch
            only has counters-as-superColumns, which in turn simplifies the thrift API,
            making the api for counters more similar to the rest of the API.
            I want to mention that the marker idea is still a work in progress and I'm
            trying some changes. So feel free to ignore it on a first read (it's fully
            optional anyway). But the overall goal I'm pursuing with this is to be
            able to replay an update when we don't know if it made it in. This is
            clearly hard if you want to be partition tolerant and not incur too much
            overhead on normal (non-failing) operations. So I'm leaning towards a
            system where a replayed update could mean temporary inconsistency, but will
            ensure that, eventually, the count will be right. But more on this later.
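
          A rough sketch of the locking idea from the first bullet above (hypothetical
          names; this is not the code in Table.java / CFStore.java): readers capture the
          set of memtables/sstables to query under the read lock, while the switch
          points (memtable swap, flush completion, compaction completion) hold the
          write lock, so a read can never see the same data through both the old and
          the new view.

            import java.util.concurrent.locks.ReentrantReadWriteLock;

            // V stands for whatever the read path needs to snapshot (memtable + sstables).
            final class ViewGuard<V>
            {
                private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
                private V view;

                ViewGuard(V initial) { this.view = initial; }

                // readers take the read lock just long enough to capture a consistent view
                V snapshotForRead()
                {
                    lock.readLock().lock();
                    try { return view; }
                    finally { lock.readLock().unlock(); }
                }

                // memtable switch / flush / compaction completion swap the view exclusively
                void switchTo(V newView)
                {
                    lock.writeLock().lock();
                    try { this.view = newView; }
                    finally { lock.writeLock().unlock(); }
                }
            }
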
          Kelvin Kakugawa added a comment -

          Sylvain,

          You're right that there is a problem w/ AE repair. Last time I read the 0.7 code, AE repair works like so:
          1) stream (selected subsections) of the SST's data file to the target replica,
          2) rebuild the index and filter files on the target.

          However, in #1072 we account for this by rebuilding the whole SST from the streamed SST data file, in order to go through the counter-specific deserialization code path.

          As for deletes, #1546 follows the same abstract model as #1072, so you're right that the situation is the same.

          I'm about to take a deep dive through your patch. Thank you for thinking about partitioned counters.

          Sylvain Lebresne added a comment -

          Actually, there are a bunch of changes I want to make to the marker idea to
          improve it. One of which being that I'm leaning towards using a separate
          (optional) CF for those markers (not unlike the way it is done for secondary
          indexes). Anyway, if that is done, it's probably worth just keeping
          counter-as-supercolumns, which would make things slightly cleaner.

          Sylvain Lebresne added a comment - - edited

          Attaching a new version of the patch that includes the support for counters as super columns.
          I want to keep the counter-as-row too, because I believe in the marker idea a lot, and that won't
          fly for counters as super columns until we stop deserializing super columns entirely.

          The details of the thrift API are a proposal; they can be discussed. In particular, for
          the batch_add call it would have made more sense for it to take a map of
          CounterPath -> CounterUpdate. However, that doesn't work in python (because
          CounterPath is not hashable and thus the map can't be constructed). So it takes
          something similar to batch_mutate. However, if you use batch_add to insert non-nested
          counters (the patch's terminology for counter-as-row, as opposed to
          counter-as-super-columns, which are called nested counters), then the first key of the
          map can be anything (which is fine but weird).

          Also, the get_counter_slice and multiget_counter_slice calls can't be used for non-nested
          counters. It won't be hard to add an equivalent of get_range_slices for counters, but
          that will have to wait.

          Lastly, the implementation of batch_add sucks right now as it creates a new mutation
          for each update. Again, not hard to fix, just need some time for that.

          (on an unrelated note, please tell me if you prefer me to attach new versions when I update
          the patch instead of replacing them)

          Sylvain Lebresne added a comment -

          Clearly, the read will involve IO unless we have a cache.
          But as said in a previous comment, there is a cache already, the row cache.
          It will do the job just fine.

          Now, maybe it could make sense to have a specific counter cache, because
          it could be more space efficient, for instance. And I'm not even saying it would
          be hard to add; it probably wouldn't. But right now, I think that it is a premature
          optimization, and I much prefer to leave the decision to implement one and, if so, the
          actual implementation, to a future ticket.

          zhu han added a comment -

          So, to answer the question about serializing the writes, there is no need (and
          I believe it's a good thing performance-wise). When a leader receives an
          update, it doesn't read-then-write. It writes-then-reads.

          I agree. "Write-then-read" fits better with current code base than "read-then-write".
          But CounterMutation#readLocalCounterColumn() needs to read all present LocalCounterColumns, merge
          them together and get the latest value of LocalCounterColumn. This may hurt the performance because
          it may trigger disk IO for each counter update.

          Can we add a "CounterCache" option to Counter column family? CounterMutation#applyUpdateLocally() applies
          the delta value to cached counter atomatically if the cache is hit. If it's missed, it just gives up.
          CounterMutation#readLocalCounterColumn() fetch the counter value from the cache if it's hit, otherwise,
          goes to the current slow path implementation, and update the cache with the value returned by the slow path.

          But special care should be taken, as the execution of the slow path and the insertion into the cache are not atomic. Fortunately,
          we can use the timestamp of the cached LocalCounterColumn to detect the race and discard the stale value silently.

          We can implement it as a simple N-way set-associative counter column cache, backed by a fixed-size plain array. Of course,
          N should be configurable. That way the memory footprint of the cache is very dense and the performance overhead is very light.
          If the collision rate of the Java hashCode() implementation is not good, crc32 or another fast hash can be used to distribute
          the LocalCounterColumns across the cache, based on their row key. (A rough sketch of such a cache follows below.)

          I think workloads that maintain frequently updated counters can benefit a lot from this simple cache.
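
          A very rough sketch of the N-way cache suggested above (illustrative only;
          the eviction policy and the timestamp-based race detection mentioned above
          are left out, and the names are made up):

            import java.util.Arrays;
            import java.util.zip.CRC32;

            // Fixed-size, N-way set-associative cache of local counter values, keyed by row key.
            final class CounterCacheSketch
            {
                private final int ways;        // associativity (the configurable N)
                private final int sets;
                private final byte[][] keys;   // row key per slot, or null if the slot is empty
                private final long[] values;   // cached local count per slot

                CounterCacheSketch(int sets, int ways)
                {
                    this.sets = sets;
                    this.ways = ways;
                    this.keys = new byte[sets * ways][];
                    this.values = new long[sets * ways];
                }

                // crc32 of the row key picks the set, as suggested above
                private int setIndex(byte[] rowKey)
                {
                    CRC32 crc = new CRC32();
                    crc.update(rowKey);
                    return (int) (crc.getValue() % sets);
                }

                // returns null on a miss
                Long get(byte[] rowKey)
                {
                    int base = setIndex(rowKey) * ways;
                    for (int i = 0; i < ways; i++)
                        if (keys[base + i] != null && Arrays.equals(keys[base + i], rowKey))
                            return values[base + i];
                    return null;
                }

                // fills a free (or matching) way of the set, or simply gives up
                void put(byte[] rowKey, long count)
                {
                    int base = setIndex(rowKey) * ways;
                    for (int i = 0; i < ways; i++)
                    {
                        if (keys[base + i] == null || Arrays.equals(keys[base + i], rowKey))
                        {
                            keys[base + i] = rowKey;
                            values[base + i] = count;
                            return;
                        }
                    }
                }
            }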

          Sylvain Lebresne added a comment -

          (I updated the patch because I've found a way to simplify a bit of the code:
          I've removed the special deserialization function in ColumnSerializer, for those
          who had already read the code.)

          Allow me to explain a little further how this works before answering the
          preceding questions (sorry if that's a tad long).

          Let's consider a counter c whose replicas are nodes A, B and C. Let's say that
          we have updated the counter 3 times, with values 1, 2 and 3 respectively, and
          with nodes A, B and C as the respective 'update leaders'.
          The row for c (I am not considering marker columns here) will be the following
          on node A:

            c : {
              <A ip address> : 1, (LocalCounterColumn)
              <B ip address> : 2, (CounterColumn)
              <C ip address> : 3, (CounterColumn)
            }
          

          and on node B, the row for c will be:

            c : {
              <A ip address> : 1, (CounterColumn)
              <B ip address> : 2, (LocalCounterColumn)
              <C ip address> : 3, (CounterColumn)
            }
          

          In parentheses are the actual classes implementing the columns. Note that on each
          node, the column named with its own id is special. The difference is that when a
          LocalCounterColumn c1 conflicts with another LocalCounterColumn c2, we
          resolve this by returning a new LocalCounterColumn c3 whose value is
          c1.value() + c2.value() (and whose timestamp is the max of c1's and c2's timestamps).
          CounterColumns, in contrast, have the exact same resolution as standard columns
          (that is, if two CounterColumns conflict, the result is the one with the higher
          timestamp).
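
          To make these two resolution rules concrete, here is a minimal sketch (the
          class name, the fields and the isLocal flag are assumptions for illustration;
          they do not mirror the patch's actual column classes):

            // One entry of a counter row: the partial count held for one replica.
            final class CounterShard
            {
                final String replicaId;  // e.g. the leader node's ip address
                final long value;        // partial count attributed to that replica
                final long timestamp;
                final boolean isLocal;   // true only on the node whose id == replicaId

                CounterShard(String replicaId, long value, long timestamp, boolean isLocal)
                {
                    this.replicaId = replicaId;
                    this.value = value;
                    this.timestamp = timestamp;
                    this.isLocal = isLocal;
                }

                // Two local shards merge by summing their values (max timestamp);
                // any other conflict resolves like a standard column: highest timestamp wins.
                static CounterShard reconcile(CounterShard c1, CounterShard c2)
                {
                    if (c1.isLocal && c2.isLocal)
                        return new CounterShard(c1.replicaId, c1.value + c2.value,
                                                Math.max(c1.timestamp, c2.timestamp), true);
                    return c1.timestamp >= c2.timestamp ? c1 : c2;
                }
            }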

          So, to answer the question about serializing the writes: there is no need (and
          I believe that's a good thing performance-wise). When a leader receives an
          update, it doesn't read-then-write, it writes-then-reads. And as part of the
          read, the newly inserted LocalCounterColumn will be 'merged' with the other,
          already present LocalCounterColumns and yield the actual value of the column,
          without the risk of losing an increment.
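
          A sketch of that leader-side write-then-read flow, reusing the hypothetical
          CounterShard type from the sketch above (the storage hooks are left abstract
          and all names are made up; this is not the patch's CounterMutation code):

            // Leader-side handling of one counter update.
            abstract class CounterLeaderSketch
            {
                abstract void applyLocally(String key, CounterShard shard);
                abstract CounterShard readMergedLocalShard(String key, String replicaId);
                abstract void replicateToPeers(String key, CounterShard shard);

                long add(String key, long delta, String myId, long timestamp)
                {
                    // write first: insert the delta as a fresh local shard, no prior read needed
                    applyLocally(key, new CounterShard(myId, delta, timestamp, true));
                    // then read: merging all local shards (CounterShard.reconcile) yields the
                    // total this node has led, including the increment just written
                    CounterShard merged = readMergedLocalShard(key, myId);
                    // replicate the merged value; other replicas store it as a plain,
                    // timestamp-resolved shard
                    replicateToPeers(key, new CounterShard(myId, merged.value, merged.timestamp, false));
                    return merged.value;
                }
            }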

          But now we see that the data is not exactly mirrored across the nodes. In
          particular, there is one thing that we must absolutely avoid: we should never
          have a repair operation (read repair or AE repair) that inserts on node A a
          LocalCounterColumn whose name is <A ip address> (otherwise, this would get
          added to the actual value and screw up the total counter value). Another way
          to say this is that the value of the column <A ip address> is always equal to
          the sum of all the updates A has led, and we are sure of that. So we need
          not repair the value of this column on node A and we must never do it.
          Moreover, when A sends its value parts to B, it sends a LocalCounterColumn,
          but when received by B (or any other host for that matter), it should become a
          CounterColumn.

          The implementation enforces this in the ColumnSerializer, during
          deserialization. When a node deserializes a (serialized) LocalCounterColumn, it
          will always deserialize it as a CounterColumn unless it is its own
          LocalCounterColumn. So when A sends its LocalCounterColumn to B (for a read
          repair, say), B will deserialize it as a CounterColumn. If B now sends this
          back to A, A will receive a CounterColumn for its local counter column and
          will discard it. So, because we ensure that a host different from A will
          never 'see' a LocalCounterColumn whose name is <A ip address> (though it will
          see such a CounterColumn), we know that we will never wrongfully repair the
          local counter of A.
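
          The deserialization rule just described, as a small sketch (again reusing the
          hypothetical CounterShard type; this is not the patch's ColumnSerializer code):

            final class CounterDeserializationSketch
            {
                // A shard serialized as "local" stays local only on the node whose id matches;
                // on every other node it is downgraded to a plain, timestamp-resolved shard.
                static CounterShard deserialize(String replicaId, long value, long timestamp,
                                                boolean serializedAsLocal, String thisNodeId)
                {
                    boolean stillLocal = serializedAsLocal && replicaId.equals(thisNodeId);
                    return new CounterShard(replicaId, value, timestamp, stillLocal);
                }
            }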

          During AE repair, because we use streaming, we could end up with an sstable on
          B having a LocalCounterColumn named <A ip address>. However, as soon as this
          column is deserialized, it is deserialized as a CounterColumn. So here again,
          we will not wrongfully repair A.
          Unless... we stream back the exact same sstable to A. But I think this can
          never happen (could anybody more familiar with AE repair and streaming confirm?).

          zhu han added a comment -

          One more question on the local counter column.

          Are all local column update operations serialized, i.e. atomic and isolated?

          For example, say there are two CounterMutations, both +1. When these two requests reach the leader replica, the current value of the local column is 2. If the two operations are not serialized, both of them can read the same current value 2, apply the "+1" to it, and get 3 as the final result. One increment request is just lost silently. Does the current patch guard against this?

          zhu han added a comment -

          Streaming/repair will only be broken if at a particular instant in time, 2 replicas are not supposed to be holding the same data. Based on a cursory glance, all replicas should be holding the same data, so I think streaming should be happy, but please confirm.

          The current implementation can bring the counter rows on different replicas to eventual consistency, just like normal rows. Streaming/repair should not be a problem, per my understanding.

          Jonathan Ellis added a comment -

          That makes sense, thanks for the clarification.

          Sylvain Lebresne added a comment -

          Say at the beginning A has for K the value 4 (the counter row will have only one column, its 'local counter
          column', that is, a column whose name is A's id).
          We add B. The sstable containing K with value 4 is streamed to B. For B, the column streamed from
          A is not its 'local counter column' (it is A's), so it treats it as a normal column.

          During the bootstrap, A receives an update to K as leader, say we add 2. A will insert 2 as
          a LocalCounterColumn (terminology from the patch). Then A will read its 'local counter column'.
          This read will merge the old value 4 with the newly inserted update 2, and thus the result of
          the read is a column with value 6. Then A will replicate the update to B by sending B the
          value 6 (more precisely, a column whose name is A's id, whose value is 6 and whose timestamp is the timestamp
          of the new update). B receives that and treats it as a standard column (it will overwrite the streamed
          column with the value 4 because the timestamp is greater).

          Jonathan Ellis added a comment -

          Say you have a single node A and RF=1. You add another node B, which becomes responsible for counter row K.

          While B is bootstrapping we continue to update K (on the only valid leader, A), so when bootstrap finishes, the value B has is obsolete. How do we propagate the updates that happened during bootstrap from A to B?

          Sylvain Lebresne added a comment -

          I'm not sure what you are referring to by 'the switching of master nodes'. Each update has a 'leader' that
          takes responsibility for the update, but for the counter row itself there is no distinguished node.
          For bootstrapping nodes, as long as they aren't chosen as leader (and they won't be until they are
          bootstrapped), a counter row is really a run-of-the-mill row. So I don't see a particular problem here
          (unless I haven't understood what you're referring to).

          Jonathan Ellis added a comment -

          What worries me isn't the streaming per se so much as the switching of "master" nodes when a new node assumes responsibility for a counter row. For normal data, we add the new node into the CL requirements and everything Just Works. Here, it seems like we need an extra step post-bootstrap to make sure the new "master" is current on operations that happened during streaming. (1072 has the same problem.)

          Stu Hood added a comment -

          > It could be that streaming is broken with this
          Streaming/repair will only be broken if at a particular instant in time, 2 replicas are not supposed to be holding the same data. Based on a cursory glance, all replicas should be holding the same data, so I think streaming should be happy, but please confirm.

          zhu han added a comment -

          Thank you for your detailed response.

          > During a write, after having applied the increment locally, there is a read of one column (the one corresponding to the local count).
          > It is this value that is sent for replication (so it integrates the freshly written update). This read is a normal read, so it hits as
          > many sstables as need be, if that's what you mean.

          Yep. What I mean is that we don't need to read multiple sstables, only the freshest one, to get the latest value of the single column. If the counter column is updated frequently, it should reside in the memtable, so we can read it from the memtable directly, without even touching the on-disk sstables. That is, we would not need any disk IO for the counter incr/decr mutation, just like a normal column mutation.

          This would keep counter updates faster than reads, which is a key competitive advantage of Cassandra.

          > Ok, the problem is the following: suppose you issue one increment (+1), then you remove the counter, then you increment again (+1).
          > Say the leader replica is always the same one, but it receives the two increments first. It will 'merge' those two increments, and
          > we'll end up with one column whose count is 2 and whose timestamp is the one of the last increment. Then it receives the delete.
          > But as far as it is concerned, this delete is obsolete and will be discarded. Even if we were somehow able to detect that the delete
          > should have deleted something, how could we know which parts of the now merged count should be kept?

          I see. What I said does not work here.

          What makes things more complicated is that commands from two clients do not have any total order at all. For example, with two clients, one issuing an increment and the other a deletion, both at time t1, whether the effect of the increment survives after these two commands execute is not deterministic.

          So I agree with you: this issue should not be a blocker for this feature. Cassandra cannot provide atomic incr/decr or delete, no matter how hard we try, as long as the CAP theorem holds. I even think we should not try to solve this tricky problem; let's expose this constraint to the client application directly.

          Sylvain Lebresne added a comment -

          Realized while answering your comment that I had forgotten something, so I updated the patch.

          > Does the lead replica have to iterate all SSTables and get the latest value of the counter before applying the decr/incr mutation? If so, the read path can be a performance bottleneck. But we can leverage some tricks: only the counter columns in the latest SSTable are valid, and the ones in older SSTables can be ignored safely.
          >
          > So a frequently updated counter column can reside in the memtable, and the local read-modify-write operation only brings a negligible performance loss. The counter update path is almost as fast as the normal column update path.

          During a write, after having applied the increment locally, there is a read of one column (the one corresponding to the local count).
          It is this value that is sent for replication (so it integrates the freshly written update). This read is a normal read, so it hits as
          many sstables as need be, if that's what you mean. But only one column is read.
          One way to make this read fast is to use the row cache on the counter CF. It is true, however, that because of the marker columns the
          row may become fairly large for high-volume counters (even though the row is never read entirely). You can play with the ttl of
          the marker columns to keep that manageable (the ttl on the markers can be pretty small, on the order of a minute or so). As said,
          you can also skip the marker columns if you're ready to accept the potential drawbacks, in which case the counter row will be really
          small and a very good candidate for the row cache. I don't know if that is what you were proposing?
          Lastly, note that at CL.ONE and without marker columns, the counter update path will be as fast as for a normal column, at least as far
          as clients are concerned, because on the leader replica we write first, then read and replicate.

          > I have no idea about the details of the removal-before-incr/decr problem. But a quick solution could be to let the deletion operation snapshot the current value of the counter column and write it into another column, and then let the read path merge these columns, including the different counter columns and the deletion snapshot column.

          Ok, the problem is the following: suppose you issue one increment (+1), then you remove the counter, then you increment again (+1).
          Say the leader replica is always the same one, but it receives the two increments first. It will 'merge' those two increments, and
          we'll end up with one column whose count is 2 and whose timestamp is the one of the last increment. Then it receives the delete.
          But as far as it is concerned, this delete is obsolete and will be discarded. Even if we were somehow able to detect that the delete
          should have deleted something, how could we know which parts of the now merged count should be kept?

          So basically, remove works if you don't reuse the counter afterwards, or only after a sufficient time has elapsed. Otherwise, it may
          work or it may not.
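
          To make the reorder concrete, here is a tiny self-contained sketch (plain Java, hypothetical names and values, not code from the patch) of a plain last-timestamp-wins reconcile discarding the delete described above:

          // Hypothetical walk-through of the reorder problem: +1 at t1, delete at t2, +1 at t3,
          // but the leader sees the two increments before the delete.
          class DeleteReorderSketch {
              record Cell(long count, long timestamp, boolean tombstone) {}

              // Standard reconcile: the cell with the newest timestamp wins.
              static Cell reconcile(Cell a, Cell b) {
                  return a.timestamp() >= b.timestamp() ? a : b;
              }

              public static void main(String[] args) {
                  Cell afterIncrements = new Cell(2, 3, false);     // +1@t1 and +1@t3 already merged into count=2, ts=t3
                  Cell delete = new Cell(0, 2, true);               // the delete carries timestamp t2

                  Cell result = reconcile(afterIncrements, delete); // the delete "loses": its timestamp is older
                  System.out.println(result);                       // Cell[count=2, timestamp=3, tombstone=false]
              }
          }

          Had the operations been applied in issue order, only the last +1 would survive (count 1); with the reorder, the merged increments carry the newest timestamp and the delete is silently dropped.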

          Even though this is really unfortunate, I don't see it as a blocker, since people can always reset the counter by reading the value v
          of the counter and then inserting -v. And I'm sure we can come up with something smarter later.

          zhu han added a comment -

          That's soooooo cool. Thank you.

          Does the lead replica have to iterate all SSTables and get the latest value of the counter before applying the decr/incr mutation? If so, the read path can be a performance bottleneck. But we can leverage some tricks: only the counter columns in the latest SSTable are valid, and the ones in older SSTables can be ignored safely.

          So a frequently updated counter column can reside in the memtable, and the local read-modify-write operation only brings a negligible performance loss. The counter update path is almost as fast as the normal column update path.

          I have no idea about the details of the removal-before-incr/decr problem. But a quick solution could be to let the deletion operation snapshot the current value of the counter column and write it into another column, and then let the read path merge these columns, including the different counter columns and the deletion snapshot column.

          The deletion snapshot columns are reconciled by timestamp as usual, so if the counter is deleted multiple times, only the latest snapshot is kept and subtracted from the merged counter value in the read path.
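
          For illustration only, a minimal sketch (plain Java, hypothetical names, not code from any patch) of what this proposal would mean on the read path; as noted elsewhere in this thread, it still does not cover a delete arriving after later increments have already been merged:

          import java.util.Collection;
          import java.util.List;

          // Hypothetical sketch of the "deletion snapshot column" proposal: the delete writes a
          // snapshot of the counter value at deletion time, and the read path subtracts it.
          class DeletionSnapshotSketch {
              record CountColumn(String replicaId, long count) {}
              record DeletionSnapshot(long snapshotValue, long timestamp) {}

              // Keep only the latest snapshot (reconciled by timestamp), then subtract it from the merged count.
              static long read(Collection<CountColumn> countColumns, Collection<DeletionSnapshot> snapshots) {
                  long merged = countColumns.stream().mapToLong(CountColumn::count).sum();
                  long latestSnapshot = snapshots.stream()
                          .max((a, b) -> Long.compare(a.timestamp(), b.timestamp()))
                          .map(DeletionSnapshot::snapshotValue)
                          .orElse(0L);
                  return merged - latestSnapshot;
              }

              public static void main(String[] args) {
                  // counter stood at 5 when deleted, then was incremented by 2 afterwards -> reads as 2
                  System.out.println(read(
                          List.of(new CountColumn("A", 4), new CountColumn("B", 3)),
                          List.of(new DeletionSnapshot(5, 100))));
              }
          }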

          Sylvain Lebresne added a comment -

          At its core, this patch uses the same main idea as CASSANDRA-1072, but
          instead of using a Clock, it splits the subparts of the CASSANDRA-1072
          context into individual columns. This has, at the very least, two
          advantages over the clock structure:

          1. it is much simpler in terms of code, as it reuses a lot of what already exists. In
            particular, it avoids the fairly complex byte array manipulation of
            CASSANDRA-1072.
          2. each part of the counter (that is, each pair (host id, count)) has a
            timestamp. This means we get full counters (increment and decrement) for
            free (that alone is worth it if you ask me); see the sketch after this list.
            More anecdotally, this allows for a different approach than the one of
            CASSANDRA-1072 for the 'cleanContext' logic, which I believe is faster and
            cleaner (and hopefully right). This is more of an implementation detail
            however, so we can discuss it if someone finds a problem.
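
          To make the data model concrete, here is a minimal self-contained sketch (plain Java, hypothetical names, not code from the patch) of a counter modelled as one count column per replica, with the client-visible value being the sum of the per-replica counts:

          import java.util.HashMap;
          import java.util.Map;

          // Hypothetical model of one counter row: one "count column" per replica id.
          class CounterSketch {
              // What each replica-owned column carries: that replica's total count plus a timestamp.
              record CountColumn(long count, long timestamp) {}

              // replica id (e.g. a node uuid rendered as a String) -> its count column
              final Map<String, CountColumn> columns = new HashMap<>();

              // Reconciling two versions of the *same* replica's column: the newest timestamp wins,
              // because each column already carries that replica's full count.
              static CountColumn reconcile(CountColumn a, CountColumn b) {
                  return a.timestamp() >= b.timestamp() ? a : b;
              }

              // Merge a column received from (or written by) a replica into the row.
              void apply(String replicaId, CountColumn incoming) {
                  columns.merge(replicaId, incoming, CounterSketch::reconcile);
              }

              // The client-visible value is simply the sum of the per-replica counts.
              long value() {
                  return columns.values().stream().mapToLong(CountColumn::count).sum();
              }

              public static void main(String[] args) {
                  CounterSketch counter = new CounterSketch();
                  counter.apply("A", new CountColumn(6, 2));   // replica A's count, written at t2
                  counter.apply("B", new CountColumn(3, 1));   // replica B's count, written at t1
                  counter.apply("A", new CountColumn(7, 3));   // a newer column from A replaces the old one
                  System.out.println(counter.value());         // 10
              }
          }

          A column replicated from another node reconciles by timestamp because it already carries that node's full count; only the leader's own column is ever built up from deltas.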

          The patch also includes some modifications of the patch I had once attached to
          CASSANDRA-580 and CASSANDRA-1397. It adds a new verb handler: when submitting a
          counter update, the coordinator first forwards the update to an actual replica of the
          counter. This replica applies the update and then replicates it. So
          this patch does support the usual consistency levels (for reads and writes).
          This involves a local read on the first replica (called the leader in the
          patch), but at least the patch ensures that at CL.ONE this read happens
          after the write is acked to the client.
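
          A rough sketch of that leader-side flow (plain Java, all names hypothetical; the patch actually stores the delta and merges it during the subsequent read, whereas this sketch merges eagerly for brevity):

          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;
          import java.util.UUID;

          // Hypothetical sketch of the leader-side counter write path described above.
          class LeaderWritePathSketch {
              record CountColumn(long count, long timestamp) {}

              interface Replica {
                  // Replicas store this as a regular timestamped column named after the leader's id.
                  void applyReplicatedColumn(String counterKey, String leaderId, CountColumn column);
              }

              final String leaderId = UUID.randomUUID().toString();
              final Map<String, CountColumn> localColumns = new HashMap<>(); // this leader's count column per counter key

              // CL.ONE flavour: the client is acked right after the local apply; the one-column
              // read and the replication to the other replicas happen afterwards.
              void incrementAtClOne(String counterKey, long delta, long timestamp,
                                    List<Replica> otherReplicas, Runnable ackClient) {
                  // 1. apply the increment locally, merging it into the leader's own count column
                  CountColumn old = localColumns.get(counterKey);
                  long merged = (old == null ? 0 : old.count()) + delta;
                  localColumns.put(counterKey, new CountColumn(merged, timestamp));

                  // 2. at CL.ONE the client can be acked here, before the read and the replication
                  ackClient.run();

                  // 3. read back the single local count column (a normal, one-column read)
                  CountColumn local = localColumns.get(counterKey);

                  // 4. replicate the leader's *total* count (not the delta) to the other replicas
                  for (Replica replica : otherReplicas) {
                      replica.applyReplicatedColumn(counterKey, leaderId, local);
                  }
              }
          }

          At stricter consistency levels the ack would presumably be deferred until enough replicas have acknowledged the replicated column.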

          In this patch, a counter is thus a (small) bunch of columns (one per replica),
          and hence a counter is internally a row. It could have been a super column,
          but it's not, for reasons that follow. But let me add that when (in 0.8 maybe?)
          we have super columns that we don't need to deserialize fully, it will make a
          lot of sense to change counters to be super columns instead (and it won't
          be hard).

          CASSANDRA-1072 has a fairly important problem: it is a bit too
          unreliable. If you do a write and for some reason (TimeoutException or loss of
          connection to the coordinator) it doesn't succeed, you have NO WAY to know
          whether the increment has been counted or not. Nothing really new, standard writes
          have the same problem, but with standard writes you just replay the update.
          With CASSANDRA-1072 you can't, because that could mean incrementing the value
          twice ... or not. Given that TimeoutExceptions really do happen, this is far
          from perfect, even though I perfectly understand that some can live with it.

          In this patch, I propose an improvement that, while maybe not perfect, improves
          this situation a lot. The idea is to tag each counter update with a unique id
          (typically a uuid, even though the implementation doesn't assume it). If an
          update times out, you simply replay it with the same uuid and the system will
          make sure it doesn't get applied twice. To do that, when a new update for a
          counter is applied, a marker column with the provided uuid is inserted in
          the same row (remember, counters are rows). But before even applying the update,
          we try to read the marker. If we find one, we just skip the update; otherwise
          we apply it (and insert the marker). The details are more complicated, in
          particular because we don't know if the same 'leader' will be chosen for the
          replay of the update, but I'll let interested people look at the implementation
          for details. Those marker columns are expiring columns, so they stay only for some
          time (gcGrace in the patch, but that can change). You choose your TTL here,
          which defines the window of time during which you can replay a failed update
          (btw, CASSANDRA-1537 will make a lot of sense for those marker columns).
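
          A minimal sketch of the marker idea (plain Java, hypothetical names, happy path only; in particular it ignores the case where the replay lands on a different leader):

          import java.util.Map;
          import java.util.concurrent.ConcurrentHashMap;

          // Hypothetical sketch: make counter updates replayable by tagging each one with a client-supplied uuid.
          class MarkerSketch {
              // marker "columns" for one counter row: update uuid -> expiry time (they are expiring columns)
              private final Map<String, Long> markers = new ConcurrentHashMap<>();
              private long count = 0;

              // marker ttl, i.e. the window during which a failed update may be replayed
              private final long markerTtlMillis;

              MarkerSketch(long markerTtlMillis) {
                  this.markerTtlMillis = markerTtlMillis;
              }

              // Returns true if the delta was applied, false if this uuid was already counted.
              synchronized boolean applyOnce(String updateUuid, long delta) {
                  long now = System.currentTimeMillis();
                  markers.values().removeIf(expiry -> expiry < now);        // markers expire after the ttl
                  if (updateUuid != null) {                                  // markers are optional
                      if (markers.containsKey(updateUuid)) {
                          return false;                                      // replay of an already-counted update
                      }
                      markers.put(updateUuid, now + markerTtlMillis);        // remember the uuid for the ttl window
                  }
                  count += delta;                                            // apply the increment
                  return true;
              }

              synchronized long value() {
                  return count;
              }
          }

          A client that got a TimeoutException simply calls applyOnce again with the same uuid; within the TTL window the second call is a no-op, and passing a null uuid opts out of the mechanism entirely.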

          Note that those markers are only used on writes. The read path only reads the
          columns that hold the counts, so the read performance is as good as it gets.

          Let's sum up: this means 2 reads on the write path, one for the marker and one
          before replication. BUT:

          • the marker is totally optional. If you don't provide one, you will not be
            able to replay a failed update, but if you can live with that, you can.
          • all those columns (for a given counter) being in the same row, you can
            assign a fair amount of row cache and make those reads fairly painless.
            For the actual users of CASSANDRA-1072, it means that if you don't use uuids
            and you write at CL.ONE, you should get virtually the same write performance
            with this patch.

          All this means that all operations on counters go through new thrift API calls.

          The patch is split into 3 parts. The first one removes the clock structure (it is
          CASSANDRA-1502 in fact). The second part is the actual patch. The third is the
          thrift-generated file changes (I attach it for convenience's sake only, nothing
          worth reading there).

          Now the less good news. This patch is really just out of the oven (written
          over the course of the week-end). It is clearly under-tested. Actually, the
          extent of my testing is the system tests included in the patch. That is: no tests
          on multiple hosts and no stress tests yet. I will work on those, but I felt
          the patch was in good enough shape that it was worth sharing and it was a
          good time to get feedback.

          The patch also has a few known problems:

          1. Even though I include a system test for it, the remove is broken. In some
            cases, where an update issued after a delete gets in before the delete, the
            delete can be wrongly discarded. For what it's worth, I'm pretty sure
            CASSANDRA-1072 also suffers from the exact same problem. I have to think
            more about that, but any ideas are welcome.
          2. There are no hints for CounterMutation (cf. the patch). This makes the
            marker idea more fragile than it should be. Those should be added at some
            point. It's also worth noting that the marker idea is not bulletproof and
            probably never will be, but it's my strong belief that it's much better
            than nothing.
          3. It could be that streaming is broken with this (for the same reason that
            CASSANDRA-1072 has code for this). It could be that it's not. I'm not
            hugely familiar with this part of the code; I have to check and fix it if
            need be.
          4. Because it's not the coordinator but the first replica (leader) that does
            the replication, we may have to send UnavailableException back to the
            coordinator (and then back to the client). This is almost done, but not
            completely.

          I will try to address all of the above asap, but I will have very little
          time to give to this during this week, so any help (with tests and reviews in
          particular) will be appreciated. All opinions, remarks, and criticisms are also
          highly appreciated.


            People

            • Assignee: Unassigned
            • Reporter: Sylvain Lebresne
            • Votes: 0
            • Watchers: 7
