Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 0.8 beta 1
    • Component/s: Core
    • Labels: None

      Description

      Break the increment counters out of CASSANDRA-580. Classes are shared between the two features, but without the plain version vector code the changeset becomes smaller and more manageable.

      1. Partitionedcountersdesigndoc.pdf
        170 kB
        Kelvin Kakugawa
      2. increment_test.py
        14 kB
        Kelvin Kakugawa
      3. CASSANDRA-1072.pre-value_refactor.patch
        253 kB
        Kelvin Kakugawa
      4. CASSANDRA-1072.121710.2.patch
        254 kB
        Kelvin Kakugawa

        Issue Links

          Activity

          Martin Hill added a comment -

          Kelvin, thanks for working on this.

          Would it be possible to allow adding/subtracting a specific amount rather than limiting it to single incr/decr operations? (Incr/decr could be achieved by specifying 1 or -1 respectively)

          Kelvin Kakugawa added a comment - edited

          All of the counters being added are delta-based. So, yes, what you're looking for is supported.

          Johan Oskarsson added a comment -

          This is a work-in-progress patch; it is not complete yet, but comments are welcome. Most of the code is by Kelvin from CASSANDRA-580, with some adjustments by me and Adam.

          There are a few todos left, such as cleaning up the AES code and further testing, before finalizing the patch.

          Johan Oskarsson added a comment -

          This patch implements an increment-only counter with support for both standard and super columns. Most of the code is by Kelvin Kakugawa from CASSANDRA-580, with cleanup and bug fixes by Johan Oskarsson, Adam Samet and Sylvain Lebresne. The patch also paves the way for other clock types that use contexts in a similar way, for example CASSANDRA-1132.

          For an example of how to use it, see this thrift system test: test_incr_standard_insert

          Changes include:

          • Adding a context byte array to thrift and avro clock structs, required for reconciliation of different versions
          • Adding an IncrementCounterClock that, together with IncrementCounterReconciler, is responsible for the reconciliation of increment columns.
          • Adding an IContext interface and IncrementCounterContext implementation for dealing with the context byte arrays.
          • Updating AES code to account for context-based clock types
          • StorageProxy: modification of the write path for counter clock types (enforce CL.ONE to the primary replica)
          • Exposing some methods needed in the column/container interfaces that exist in both standard and super column types
          • Various helper methods in FBUtilities for working with byte arrays
          • System tests for incr counters (insert, remove, batch operations)
          • Unit tests for new classes and methods mentioned above
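
          A rough sketch of the delta encoding implied by this design (my own illustration, not code from the patch; see "value as long in bytes" in the design outline below): the client-supplied delta travels as the column value encoded as an 8-byte long, and the write path turns it into a (node id, count) entry in the clock's context. The class and method names below are hypothetical.

          // Hypothetical sketch: encoding/decoding an increment delta as the 8-byte
          // column value that the counter write path interprets ("value as long in bytes").
          // Counters in this patch are increment-only, so deltas are positive longs.
          import java.nio.ByteBuffer;

          public final class DeltaBytes
          {
              private DeltaBytes() {}

              // delta -> big-endian 8-byte value suitable for a counter column write
              public static byte[] encode(long delta)
              {
                  return ByteBuffer.allocate(8).putLong(delta).array();
              }

              // value bytes -> delta, as a replica would read it before folding it into its context
              public static long decode(byte[] value)
              {
                  return ByteBuffer.wrap(value).getLong();
              }

              public static void main(String[] args)
              {
                  byte[] plusFive = encode(5L);
                  byte[] plusTwo = encode(2L);
                  System.out.println(decode(plusFive) + decode(plusTwo)); // prints 7
              }
          }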
          Johan Oskarsson added a comment -

          Updated to apply cleanly to trunk.

          Kelvin Kakugawa added a comment -
          context-based clocks
          
          interface extensions to cassandra.thrift:
            replace timestamp w/ Clock()
              Clock:
                optional long timestamp
                optional byte[] context
          
          data structure code changes:
            db.ColumnFamilyType + db.ClockType
              enums
              db.ColumnFamilyType:
                Super / Standard
              db.ClockType:
                Timestamp / IncrementCounter
          
              applied to all IColumnContainer sub-classes (CF / SC)
                checked to determine switches in code
          
            db.context package
              IContext:
                context creation + manipulation
              AbstractReconciler
                context-based clock reconciliation
          
            IncrementCounterContext
              context structure (current):
                {timestamp of last update + [(node id, count), ...]}
          
              compare():
                timestamp-based compare (of last update) -- highest
          
              diff():
                tuple-based comparison
                  greater than:
                    has at least every node and each count is larger (than comparison context)
          
            db.IClock
              concrete *Clock representations
              encapsulates db.context.IContext functionality
              current sub-classes:
                TimestampClock
                IncrementCounterClock
              where the ClockType knows which contextManager (db.context.IContext) to use
          
            db.IColumn
              timestamp replaced w/ IClock
              markedForDeleteAt replaced w/ IClock
          
          algorithm code changes:
          1) on insert
            a) thrift.CassandraServer : doInsert(...)
              thrift.ThriftValidation : validateClock(Clock)
                takes a thrift Clock and creates the appropriate IClock impl
          
            b) service.StorageProxy : mutateBlocking(...)
              db.RowMutation : updateClocks()
                iterates through all CFs w/in RM
                  for any context-based CF type
                    creates appropriate context structure
                      i) counter
                        looks at value being inserted, then creates appropriate context
                          e.g. {timestamp + [(replica node id, value as long in bytes)]}
          
            c) local / remote insert
              db.Table : apply()
                CF.addColumn()
                  inserts into CSLM (ConcurrentSkipListMap) of columns_
                  if null returned,
                    then success and exit
          
                  else:
                    save delta (the associated count for the XClock being inserted)
                    pull old Column
                    use Reconciler to collapse saved delta Column w/ old Column counter clocks:
                      e.g. for incremental counters
                        i) aggregate this replica's counts
                        ii) take max of every other replica's counts
          
          2) read
            CL.ONE read:
              just pull from the first replica that answers
          
            read repair (used by QUORUM and, in the background, ONE):
              check step:
                read from each replica
                blockFor QUORUM # of replicas
                  where one replica is randomly chosen to be non-digest
                check results in service.ReadResponseResolver : resolve()
                  calculate digest for non-digest CF against all digests received
                  if they don't match:
                    then kick off repair step
          
              repair step:
                read non-digest from every replica
                blockFor QUORUM # of replicas
                fix results in service.RRR : resolve() + two other methods
                  i) assemble all versions of the CF from replicas received
                  ii) create a "resolved" CF via CF.resolve()
                    CF.resolve(other CF)
                      CF.addAll(other CF)
                        calls CF.addColumn() for each IColumn in the other CF
                  iii) for each version received, create a repair version to be sent to that replica
                    repairCF = reconciledCF.diff(versionCF)
                    if null,
                      skip
                    call: repairCF.cleanNodeCounts(replica to repair)
                      wipes out all the counts for the given replica in every *CounterClock in the CF 
                    otherwise, send RM w/ repairCF under read-repair verb 
          
          3) compaction
            uses same CF.addColumn() code path to aggregate Columns across SSTs
              nothing special
          
          4) AES
            uses a modified compaction iterator
              service.AntiEntropyService : doAESCompaction()
                that applies the same code path from read-repair:
                  XCounterClock : cleanNodeCounts(InetAddress replica)
          
                so, that the IClock contexts being created to repair the remote replicas
                  do not send over the counts for that given replica
          
          Kelvin Kakugawa added a comment -

          merge from trunk

          conflicts:
          service.AntiEntropyService

          note:
          TimestampClock uses the same path as trunk
          IncrementCounterClock uses the path in trunk-1072 (i.e. prone to streaming problems)

          Johan Oskarsson added a comment -

          Update to make sure mutate and blocking mutate in StorageProxy both have support for counters.

          Sylvain Lebresne added a comment -

          A bunch of remarks/comments/questions on the patch.

          • I think the replication logic should be fixed before the patch can get in.
            On that note, for consistency >= ONE, wouldn't it work to pick a replica, send
            it the update, wait for the ack and then send it to all other nodes
            (blocking for whatever number of nodes we need to achieve consistency)?
            Please tell me if I'm completely off here.
          • I think that if a client inserts a negative value for increment counters, bad
            things will happen. The code should check for this (most probably in Thrift
            validation). It should also check that the value is a long.
          • IncrementCounterReconciler.updateDeleteTimestamp() sets the delete
            timestamp of the live column to the max of the timestamps of the live and
            deleted columns. Shouldn't it set the max of the 'delete' timestamps?
          • I'm not convinced by the thrift API for creating increment counters. Calling
            Clock() doesn't at all make it explicit that we want an increment counter. More
            generally, will we ever want to expose the binary context to the client?
          • There is a bunch of code duplication in AESCompactionIterator, even though
            it is just an AntiCompactionIterator that cleans contexts. Would it be better
            to merge those two by adding a flag saying whether the context must be cleaned?
            Even if we don't, the switch in doAESCompaction could be removed.
          • In ReadResponseResolver.resolveSuperset(), the cf.cloneMe() is changed to
            cf.cloneMeShallow(). Out of curiosity, is there a reason for it other than
            efficiency, and are we sure it is safe to do?
          • What about increment counters when we have
            https://issues.apache.org/jira/browse/CASSANDRA-1210 ? I don't know what was
            planned for the latter, but if it requires few changes with respect to
            the increment-only counters (wouldn't a map of id -> (count, version) instead
            of a map of id -> counts suffice?), maybe we should go with #1210 right away?

          Other more minor comments:

          • In IncrementCounterReconciler.reconcile(), I think it would be clearer to
            replace
            if (clock.size() == DBConstants.intSize_ + IncrementCounterContext.HEADER_LENGTH)
            with
            if (clock.context.length == IncrementCounterContext.HEADER_LENGTH)
            since it's weird to compare the serialized size here.
            Moreover, if we check upfront that clients cannot put bad values in counter
            updates, the checks that follow are unnecessary.
          • In IncrementCountContext.merge(), when computing the sum for a given id, the
            code checks whether the id is in contextsMap and, if not, does a get and a put.
            It would be a bit more efficient to start by doing a put of (id, count) and,
            since put returns the old value, correct with a second put if needed (a sketch
            of this appears after this comment).
          • Column.comparePriority() is now dead code.
          • IncrementCounterContext.diff() doesn't respect the code style for a few
            braces.
          • The constructor with no argument of TimestampClock should be removed.
          • Last, and probably least, but I'll still mention it: the code uses the
            following pattern in multiple places:
            someloop {
                if (somecondition) { doSomething; continue; }
                doSomethingElse;
            }

            I'm not fond of it, as I find it more difficult to follow what gets executed
            than with a good old-fashioned 'else'. But maybe that's me.
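
          A minimal sketch of the put-returns-old-value suggestion for merge() (hypothetical helper names, not the patch's code):

          // Sketch of the suggested refactor: one put(), then use its return value (the
          // previous mapping, or null) to correct the sum with a second put only when the
          // id was already present.
          import java.util.Map;

          public class MergeSketch
          {
              public static void addCount(Map<String, Long> contextsMap, String id, long count)
              {
                  Long previous = contextsMap.put(id, count);
                  if (previous != null)
                      contextsMap.put(id, previous + count); // id already present: correct to the sum
              }
          }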

          Kelvin Kakugawa added a comment -

          Handling CL > ONE:
          My thought would be to send a write to multiple nodes (but still for one node id), and have a node that's not responsible for that node id still aggregate the counts it has for the given node id, i.e. it would still help nodes "catch up" to that node id's total count. The big caveat is that the initial write path would need to be special-cased.

          thrift API:
          Yes, you're right. And right now the binary context is primarily useful for debugging. The thrift interface isn't very explicit. It's necessary for vector clocks (580), but not very useful for 1072 / 1210.

          RRR.resolveSuperset():
          As you suspected, it's not an efficiency-motivated modification. If cloneMe() is used, there's the potential to aggregate a given node id's counts an extra time (from that initial cloneMe() call).

          1210 inclusion:
          It's a relatively distinct extension, so we figured that we could wait for 1072 to go through, first.

          coding style--avoiding else:
          Yeah, it's personal preference. I hate indents, so I consciously avoid else.

          Jonathan Ellis added a comment -

          I'm going to need help understanding why special-casing some write paths isn't a sign we're Doing It Wrong.

          Sylvain Lebresne added a comment -

          Handling CL > ONE:
          My thoughts would be to send a write to multiple nodes (but, still, for one node id). And, have a node (that's not responsible for that node id) still aggregate the counts it has for the given node id. i.e. it would still help nodes "catch up" to that node id's total count. The big caveat would be that the initial write path would need to be special-cased.

          I realize now that it can be a bit tricky. I agree that we would kind of want
          nodes to always aggregate the counts they have for all ids (or did I
          misunderstand what you're suggesting?), as otherwise you'll fairly rarely get
          the most up-to-date version of a counter without CL.ALL (or am I missing
          something?). As for 'the initial write path would need to be special-cased',
          couldn't that be dealt with using the context? From what I understand, what we
          want is to differentiate between an initial update coming from the client (which
          can be added to our previously known count) and an update that comes from conflict
          resolution (read repairs and such, where we want to keep the more up-to-date
          value). Maybe a 'freshness' flag could be added to the context header, that
          would initially be true and be switched to false by any resolution. Is that
          completely stupid?

          In any case, if we send the write to multiple nodes, shouldn't we ensure that
          the node responsible for the write did get it? It seems to me the algorithm relies
          on the fact that each node has the right count for its own id.

          Not sure I see very clearly yet in all that (but I'm willing to see the light).

          RRR.resolveSuperset():
          As you suspected, it's not an efficiency-motivated modification. If cloneMe() is used, there's the potential to aggregate a given node id's counts an extra time (from that initial cloneMe() call).

          Very true.

          Kelvin Kakugawa added a comment -

          I like the line of thinking that you have: using the context to differentiate writes and repairs. Let me investigate this approach and work out the code.

          Yes, you're right about the primary node needing to receive the write. Maybe hinted handoff? Although I'll investigate a solution that doesn't rely on HH.

          Sylvain Lebresne added a comment -

          Not sure we can 'trust' HH, since we can't be sure that hints will be
          delivered correctly (plus a node could go unhinted yet still not get
          the write if we don't wait for it and are unlucky).
          But it's probably not too hard to change the writeResponseHandler
          so that it at least waits for a specific node. And if we choose a live node
          and the 'closest' one (either local or using the snitch), it's probably
          fine.

          Other than that, I must say I find it a bit sad that to be sure you
          get the most up-to-date version of a count you have to use CL.ALL.
          But the truth is I don't see a way around it.

          Johan Oskarsson added a comment -

          This updated patch addresses most concerns in the review, with the exception of the replication logic that Kelvin is working on.

          • Check for invalid/negative input. Includes unit test.
          • Pick the correct timestamp in updateDeleteTimestamp, added unit test
          • Collapsed duplicate AES code into one
          • Clarified IncrementCounterReconciler.reconcile() code
          • IncrementCountContext.merge() refactor as suggested
          • Removed dead code in Column
          • IncrementCounterContext.diff() reformat
          • Removed TimestampClock constructor
          Ben Standefer added a comment -

          What is the probable release that would include this? 0.7 in 6-8 weeks-ish?

          Johan Oskarsson added a comment -

          Updated patch; the following changes and some minor fixes are in it. Most code is by Kelvin, with some testing and minor changes by me.

          • Don't guess clock type from thrift input, read from DatabaseDescriptor.
          • Context is removed from public api since it's not needed yet.
          • Removed cleanup logic and replaced with a flag in context as suggested by Sylvain.
          Jonathan Ellis added a comment -

          As commented above, I need help understanding why this is necessary:

          + /**
          + * perform secondary writes, if necessary
          + *
          + * use case: support distributed counter writes for CL > ONE
          + * rationale:
          + * distributed counters need to be first written to one replica
          + * to be correctly accounted for
          + * then, for CL > ONE, writes can be sent to other replicas
          + * to help them catch up to the total count of the first replica
          + */

          Is that what you mean by "the replication logic that Kelvin is working on?"

          Kelvin Kakugawa added a comment -

          The secondary write path is necessary because we need to make sure that the total writes to the primary (target) replica are correct. Take a scenario where we have 3 replicas w/ associated counts:
          A: [(A, 10)]
          B: [(A, 3)]
          C: [(A, 5)]

          i.e. node A accepted some combination of increments until its count reached 10. node B was repaired (via read repair or AES) from A's counter value when it was at 3. node C was repaired when A's count was at 5.

          So, if we use the unmodified write path and attempt this write:
          [(A, 1)]

          i.e. commit a write to node A w/ a delta of 1.

          Let's imagine a scenario where the write to node A fails, but the writes to B and C succeed:
          A: [(A, 10)]
          B: [(A, 4)]
          C: [(A, 6)]

          When the counter eventually repairs itself, that write will be considered lost, since the primary replica did not receive it and the other replicas were behind. So the reason we need to ensure the primary write succeeds first is to guarantee that the write has been accounted for in the system. The secondary writes are just optimistic writes to help the other replicas play catch-up.
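
          To make the lost-write argument concrete, here is a toy calculation (mine, not code from the patch) using the max-per-node repair rule on Kelvin's numbers: once the +1 only survives on B and C as (A, 4) and (A, 6), repair against A's (A, 10) discards it.

          // Toy illustration of the scenario above (not project code): counts for node id "A"
          // are repaired by taking the max, so a delta that never reached node A itself is
          // eventually overwritten and lost.
          public class LostWriteExample
          {
              public static void main(String[] args)
              {
                  long onA = 10; // primary replica missed the +1 delta
                  long onB = 4;  // 3 (from an earlier repair of A's count) + 1
                  long onC = 6;  // 5 (from an earlier repair of A's count) + 1

                  // read repair / AES reconciles node A's entry by taking the max
                  long repaired = Math.max(onA, Math.max(onB, onC));
                  System.out.println(repaired); // 10 -- the +1 is gone, even though B and C saw it
              }
          }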

          Jonathan Ellis added a comment -

          Sounds like the problem is that you are trying to be too clever by making increment
          too low-level. It's fine to have increment be a special op at the thrift level, but
          internally we need to deal with values and clocks; otherwise we lose the ability to
          detect conflicts, as in your example. If A should send out (11: {A: 11}) – that is,
          (value, {clock}) – then B and C can tell that their clocks are ancestors of A's w/
          no conflicts.

          This does mean that we need to keep conflict history for up to GCGraceSeconds (good discussion at http://pl.atyp.us/wordpress/?p=2601) but that is much more acceptable than compromising our fully-distributed design. I also suspect it would be much less code, as well as more generalized.

          Sylvain Lebresne added a comment -

          I may be wrong here, but I think that it is a bit harder in
          Cassandra. At least, here's a scenario I'm unsure how to deal with unless you
          distinguish a primary replica for each write (as done in Kelvin's algorithm).

          Take a cluster with RF=3. Let's call A, B and C the 3 nodes hosting some
          counter c. Now say you do writes with CL.QUORUM (or less) and the writes are
          initially sent to some other node D that dispatches them to A, B and C.
          Consider the two following scenarios:

          S1: you do 2 simple increments (+1) and everything works fine.
          A, B and C receive those and each has 2 columns whose values are 1 and whose
          clocks are ... (that's the point: not sure what those clocks would have to be to
          make it work).

          S2: you do 3 simple increments (+1). For some reason you are (really) unlucky
          and on each write one of the nodes (each time a different one) doesn't get it.
          So you also end up with each node (A, B and C) having 2 columns whose values
          are 1 and whose clocks are ... (same as in S1). But note that it's CL.QUORUM:
          we have acknowledged the writes and should return 3 on a CL.QUORUM or CL.ALL
          read.

          I don't see any reasonable values for the clocks such that upon a read (even at
          CL.ALL) both those scenarios return the right value.

          Sorry if I'm missing something simple, but I'd be interested in any idea/solution.

          Kelvin Kakugawa added a comment -

          Attached a design document for the incremental counters.

          We would like to bring more people into the discussion of implementation details, so a design doc that presents the underlying logic in a more composed format was put together. It's a little rough, in that the prose isn't fully fleshed out, but the major ideas are all present.

          We also decided to remove the secondary write path and write-flag logic; we discovered an edge case that makes it unfeasible. So the 0.7 AES implementation needs to be modified accordingly as well.

          Mike Peters added a comment -

          Kelvin, thank you for putting the document together. Much appreciated.

          The one thing that is a big concern to us is the limitation of reads requiring CL.ALL. With a large number of nodes, it sounds like this will render reads unsuitable for any real-time user-facing queries. Is there a way around this? Didn't the folks at Digg implement version clocks with client-side resolution, not requiring CL.ALL for reads?

          Server-side resolution is definitely the way to go. Just wondering if we can have the AES/resolution take place "offline", not requiring CL.ALL for reads.

          Ryan King added a comment -

          Mike-

          You might have a misunderstanding: CL.ALL means all replicas, not all nodes. So a "large number of nodes" doesn't matter, only the replication factor does.

          Mike Peters added a comment -

          Ah, you're totally right, Ryan! My bad. Just being anxious about this feature.

          Johan Oskarsson added a comment -

          Updates since last version of the patch.

          • Removed secondary write path and write flag due to concerns voiced and issues discovered.
          • Due to the above changes we have restored the clean operation for AES. The core of this from the 0.6 patch has been tried and tested in production.
          • Had to modify parts of the new streaming code to pass on the type of streaming operation performed in order to incorporate the clean operation.
          • Minor code cleanup, license headers, javadoc adjustments etc.

          For further information about this patch see the design doc attached to this jira.
          All unit and system tests pass for this patch, including custom tests on a 10-node cluster.
          Work done by the usual suspects seen above.

          Johan Oskarsson added a comment -

          An issue was found in the AES code; will reroll the patch and upload again.

          Jonathan Ellis added a comment -

          I looked at the design document, but I didn't see the motivation for special-case code for counters rather than dealing with this via the more general version vector code from CASSANDRA-580. Did I miss that?

          Requiring CL.ALL on reads means it's basically unusable across multiple DCs, which suggests to me that it's not really much of an improvement over using something like Cages w/in a single DC.

          Kelvin Kakugawa added a comment -

          The caveat wrt version vectors is that we need to do a read before a write.

          Let me investigate Cages. I believe you mean this project:
          http://code.google.com/p/cages/

          Kelvin Kakugawa added a comment -

          I took a look at Cages. It's interesting as a platform for transactions on top of Cassandra.

          Unfortunately, that's not amenable to high-performance counts. ZK clusters have only one leader--i.e. only one node can be written to at any given time. It's not feasible to:
          1) acquire a ZK lock
          2) perform a quorum read
          3) perform a quorum write, then
          4) release the ZK lock

          for every increment. I've heard of the aftermath of such an approach (ZK locks to coordinate high performance counts on top of Cassandra) and it was not pretty. Even this blog post:
          http://ria101.wordpress.com/2010/05/12/locking-and-transactions-over-cassandra-using-cages/

          mentions that to scale the locks, you'll have to start sharding the ZK clusters. I can see this service being useful for more complicated operations on top of Cassandra, but not for high-performance counters.

          The distributed counter code allows increments to be written straight into a Cassandra cluster (no reads or locks are required). Each replica aggregates all of the deltas that were originally written to it, and idempotently repairs the increments written to the other replicas (to eventually learn the total count for a given key).

          Granted, a CL.ONE read may be inconsistent to a certain degree. However, eventually, each replica will learn all the increments written to all replicas. If you need to know the exact count for a given key, then you will have to pay for a CL.ALL read. However, this is true for any use case where writes aren't CL.QUORUM.

          Please review Helland's Building on Quicksand paper (sections 5 & 6) for a more academic understanding of the approach implemented by #1072.

          Johan Oskarsson added a comment -

          An updated version of the patch.

          • merged with the latest streaming changes in trunk
          • contains the fix to the AES code and a bit of refactoring.

          We have once again created a new issue with a chunk of 1072 in it for simplicity, making it easier to review and commit.
          The new AES code can be found in CASSANDRA-1375; it is, however, also in this patch until 1375 has been committed, so this issue can be reviewed on its own.

          Jonathan Ellis added a comment -

          True, you do have to read-before-write for version vectors, but that's a pretty low price compared to

          • losing availability if a single node is down
          • a large amount of special-case code for one new op type

          I'd very much like to explore the version vector approach in 580 more before deciding that we need this extra complexity to achieve reasonable performance, because I don't think it will be necessary.

          Sylvain Lebresne added a comment -

          Having given it some thought, I see a few important difficulties/problems (in the
          context of Cassandra) with a version vector (or vector clock, even though I
          believe it is ill-suited for counters) approach:

          • As said, it involves a read-before-write. But I think it's worse than that,
            in that it involves an atomic 'read-then-write' (but maybe you already included
            that in your comment, in which case I disagree on the 'pretty low price').
            I, at least, don't see how, without atomicity between the read and the
            write, you can ensure that you don't fall into one of:
            1. ending with two different values having the exact same vector
            2. missing some increments
          • http://pl.atyp.us/wordpress/?p=2601 explains clearly why in such an approach
            you'll have to keep some history to be able to resolve later conflicts. But
            it only shows the beginning of the problems. Take, as in this post, the
            example of 3 nodes (and RF=3). Because stuff can get lost, it could very
            well be that on node 1 you'll end up receiving, say, both:
            • the value 3 with a version clock [0, 1, 2]
            • the value 3 with a version clock [0, 2, 1]
            and nothing before that (because node 1 was dead during those first updates).
            Then you're kinda screwed. You could get back on your feet if you have the
            history of the updates of nodes 2 and 3, but that means that not only do you
            have to keep a history of updates for some time, you also have to send this
            history between nodes. Pretty sure that'll get messy.

          If I'm wrong here, please feel free to correct me, I'd love to be wrong.

          Now, about the current idea implemented: I want to believe that we could lift
          (what I believe to be) the main limitation, namely that we have to
          read at CL.ALL, by using some kind of repair-on-write (btw, the idea is not
          mine, but Kelvin's).
          That is, a write would be:

          1. send the write to one chosen replica.
          2. this replica writes the increment locally.
          3. then it reads locally (which ensures that it has the current correct
             count for the part of the counter it is responsible for).
          4. it sends what it has read to the other replicas and waits for a number of
             acks that depends on the CL.

          By choosing how many replicas we wait for in step 4 before answering the whole
          write query, we'll ensure the usual consistency level guarantees (that is,
          write then read at QUORUM ensures consistency).
          Sure, a write will require a local read (but only one, btw), but in that context
          I do believe it's a low price to pay (plus CL.ONE doesn't have to wait for it to
          succeed, if a high-write-throughput counter is what you really need).

          It is true that this will require a special verb handler for the operation.
          But there is nothing incredibly new in the operations it will perform, so I
          believe it could be written without much new specific code.

          It is also true that point 1 makes the approach slightly more fragile than
          usual writes, as the chosen replica may time out on us even though other nodes
          wouldn't have, but after all it only makes it as fragile as our reads (since
          we only ever ask a single chosen node for the actual value).

          Sorry for the long post, especially if that turns out to be stupid.

          Hide
          Jeff Darcy added a comment -

          Sylvain, what does "wait for a number of acks" in your/Kelvin's step 4 mean? What happens if one or more replicas are on the other side of a partition? What values are returned while the chosen replica is waiting for acks? It's all very well to make a "good faith" attempt to reduce the window of inconsistency by sending updates to other replicas early, but waiting indefinitely seems like trying to enforce strong consistency and if the wait terminates then we have to handle the repair after the partition is resolved anyway. This may be the same objection as Jonathan made at 8pm on August 10, although it covers more cases than just multi-DC because partitions can and do occur within DCs as well. Reading the current version vector as part of a write doesn't seem like enough of a problem to justify complex workarounds, since it's probably in memory anyway (especially for a hot counter).

          Hide
          Sylvain Lebresne added a comment -

          You left out an important part of the sentence. That important part is 'that depends on CL'. Of course network partitions can occur. Actually, what I'm proposing is all about handling network partitions better. It aims to keep the exact same behavior and consistency level guarantees that Cassandra already ensures. The 'number of acks' will be equal to what you, the client issuing the write, asked for when specifying the consistency level.

          Moreover, I never suggested waiting indefinitely. Again, if that wasn't clear, this is really just what Cassandra already does, that is, send the update to all replicas and wait for only as many responses as the client has asked for. Sorry if that wasn't clear.

          As for the last sentence of your comment, I'm not sure I understand it. But I certainly don't mean to propose a complex workaround, quite the contrary; right now, I think this is neither really complex nor a workaround, as it addresses real problems.

          But don't get me wrong, I appreciate your questions/concerns as there could very well be something wrong in my reasoning.

          Hide
          Jeff Darcy added a comment -

          If I'm reading this correctly, then the prior approach would have required reading at CL.ALL to get a consistent result and this reduces it to CL.QUORUM instead. At least, that's the only way I can reconcile "lift
          some of (what I believe to be) the main limitation" from August 10 with both "handling better network partitions" and "exact same behavior and consistency" today. Is that correct? If it is, let's look at the simultaneous-write case some more. Let's say two nodes get updates at the exact same instant, both at CL.QUORUM. A reader at CL.QUORUM will therefore see at least one value reflecting each of those updates. If that reader does see inconsistent values, how will it reconcile them? In particular, if it sees A=10/B=11/C=12 from three replicas, how will it distinguish between the following?

          (a) Initial value was 10, B got a +1 which hasn't propagated, C got a +2 which hasn't propagated -> reconciliation should yield 13.
          (b) Initial value was 10, B got a +1 which propagated to C but not A, C got a +1 which hasn't propagated at all -> reconciliation should yield 12.

          I'm not trying to poke holes here; I'm just curious because I don't see clearly how the scheme you laid out would handle this case.

          Hide
          Kelvin Kakugawa added a comment -

          Hi Jeff,

          Helland's Building on Quicksand paper discusses the abstract strategy behind our distributed counters. It would be useful to review it:
          http://blogs.msdn.com/b/pathelland/archive/2008/12/12/building-on-quicksand-paper-for-cidr-conference-on-innovative-database-research.aspx

          The relevant sections are 5 and 6.

          Briefly, Helland postulates that 3 useful properties of distributed commutative operations are:
          1) commutative operation,
          2) partitioned work, and
          3) idempotent repair.

          I also go through #1072 in detail, here:
          http://www.slideshare.net/kakugawa/distributed-counters-in-cassandra-cassandra-summit-2010

          The important detail that we're missing from your example is the way the work was partitioned. A given counter's value is actually constructed from the incremental updates received by each replica. Let me clarify w/ an example.

          Let's assume that all replicas start w/ this distributed counter vector:
          [(A, 4), (B, 3), (C, 3)] = 10

          Example (a):
          initial value:
          [(A, 4), (B, 3), (C, 3)] = 10
          node B (+1):
          [(A, 4), (B, 4), (C, 3)] = 11
          node C (+2):
          [(A, 4), (B, 3), (C, 5)] = 12

          On reconciliation (between any node), we take the highest count for a remote replica. So, if node A received a repair from both nodes B and C, the resulting vector would be:
          [(A, 4), (B, 4), (C, 5)] = 13

          Example (b):
          initial value:
          [(A, 4), (B, 3), (C, 3)] = 10
          node B (+1):
          [(A, 4), (B, 4), (C, 3)] = 11
          node C (after repair from B):
          [(A, 4), (B, 4), (C, 3)] = 11
          node C (+1):
          [(A, 4), (B, 4), (C, 4)] = 12

          Now, let's take the situation where node A receives a repair from C (and, optionally, B), the resulting vector would be:
          [(A, 4), (B, 4), (C, 4)] = 12

          I did gloss over some details in the above. If you would like me to clarify, I would be happy to do so.
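
          To make the reconciliation rule above concrete, here is a minimal sketch (not the patch's actual classes) of a partitioned counter: each replica only ever adds deltas to its own slot, repair keeps the highest count seen per replica, and the visible value is the sum of the slots.

              import java.util.HashMap;
              import java.util.Map;

              public class PartitionedCounterSketch
              {
                  // replica id -> count contributed by that replica
                  private final Map<String, Long> shards = new HashMap<>();

                  // A replica applies a delta only to its own slot.
                  void applyLocalDelta(String localReplicaId, long delta)
                  {
                      shards.merge(localReplicaId, delta, Long::sum);
                  }

                  // Idempotent repair: keep the highest count seen per replica; never sum repaired slots.
                  void repairWith(Map<String, Long> remoteShards)
                  {
                      for (Map.Entry<String, Long> e : remoteShards.entrySet())
                          shards.merge(e.getKey(), e.getValue(), Math::max);
                  }

                  long total()
                  {
                      return shards.values().stream().mapToLong(Long::longValue).sum();
                  }

                  public static void main(String[] args)
                  {
                      // Example (a): node A starts at [(A,4),(B,3),(C,3)] = 10 and then
                      // receives repairs from B ([(B,4)] = 11) and C ([(C,5)] = 12).
                      PartitionedCounterSketch a = new PartitionedCounterSketch();
                      a.repairWith(Map.of("A", 4L, "B", 3L, "C", 3L));
                      a.repairWith(Map.of("A", 4L, "B", 4L, "C", 3L));
                      a.repairWith(Map.of("A", 4L, "B", 3L, "C", 5L));
                      System.out.println(a.total()); // 13, as in example (a)
                  }
              }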

          Hide
          Jeff Darcy added a comment -

          Thanks for the explanation, Kelvin. I see now where the requirement to read at CL.ALL comes from, and how the proposed scheme avoids it. Too bad decrements had to be abandoned in the process.

          Show
          Jeff Darcy added a comment - Thanks for the explanation, Kelvin. I see now where the requirement to read at CL.ALL comes from, and how the proposed scheme avoids it. Too bad decrements had to be abandoned in the process.
          Hide
          Kelvin Kakugawa added a comment -

          minor bug fix: ensure vector tuples are deterministically ordered

          Hide
          Jonathan Ellis added a comment -

          -2 patch seems all kinds of messed up, unless it really ballooned by 6x and includes patches to bin/ among lots of other stuff now.

          I still don't understand the objection to 580 – it requires read before write, but 1072 still does read before write to find the previous clock. Am I missing something?

          Hide
          Kelvin Kakugawa added a comment -

          Yes, let me rebase and create the patch properly.

          The problem w/ #580 is that vector clocks only provide a partial ordering of the logical updates. However, for distributed commutative operations, we need to: 1) partition the updates and, 2) be able to idempotently repair updates between partitions, as well. Bear in mind that we have 2 proposed implementations of #580: 1) node ids are chosen by the replica that updates the value, and 2) node ids are chosen by the client connection that updates the value.

          Let's take an example using #580 (replica-based node ids):
          The current value + vector clock is:
          10 + [(A, 5), (B, 3), (C, 4)]

          on all the replicas (A, B, and C). Now, let's say we have 2 clients, X and Y, that are trying to update the value.
          X reads from replica A, then increments the value by 2 and writes it to A (the clock is pre-constructed on the coordinator):
          12 + [(A, 6), (B, 3), (C, 4)]

          Y also reads from replica A (before X's update), then increments the value by 3 and writes it to A (the clock is pre-constructed on the coordinator):
          13 + [(A, 6), (B, 3), (C, 4)]

          Now, replica A sees both variants. However, it doesn't have enough information to properly reconcile both columns to arrive at 15.

          Let's take an example using #580 (connection-based node ids):
          The current value + vector clock is:
          10 + [(X, 5), (Y, 3)]

          on all the replicas (A, B, and C). X and Y were previous client connections that updated this value. Now, let's say we have a client Z that tries to update this value twice.
          Z reads from replica A, then increments by 2 and writes it to B (clock is pre-constructed on coordinator):
          12 + [(X, 5), (Y, 3), (Z, 1)]

          Next, Z reads from replica C, then increments by 3 and writes it to B (clock is pre-constructed on coordinator):
          13 + [(X, 5), (Y, 3), (Z, 1)]

          As in the previous example, replica B does not have enough information to properly reconcile both columns.

          Further note, in both examples above, a CL.QUORUM read, then CL.QUORUM write would not fix the reconciliation problem. #580 does not facilitate the partitioning of updates, so the reconciler does not know which portion of the value is "new" when it sees two updates w/ the same version vector. If updates aren't partitioned, then a distributed lock system, like Cages, would be needed to enforce exact ordering throughout the distributed system.
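
          As a small illustration of the gap just described (assumed, simplified types, not #580's code): with identical version vectors neither update dominates, and without the common base (10) or the deltas (+2, +3) no deterministic reconciler can recover the correct total of 15.

              import java.util.Map;

              public class VectorClockGapSketch
              {
                  record Versioned(long value, Map<String, Integer> vector) {}

                  // The vectors are equal, so all a reconciler can do is pick one value
                  // (here, the larger); the lost increment is unrecoverable.
                  static Versioned reconcile(Versioned a, Versioned b)
                  {
                      return a.value() >= b.value() ? a : b;
                  }

                  public static void main(String[] args)
                  {
                      Map<String, Integer> clock = Map.of("A", 6, "B", 3, "C", 4);
                      Versioned x = new Versioned(12, clock); // client X: 10 + 2
                      Versioned y = new Versioned(13, clock); // client Y: 10 + 3
                      System.out.println(reconcile(x, y).value()); // 13 -- one increment lost, not 15
                  }
              }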

          If you are doing a set union, like in the Dynamo shopping cart example, then a partial ordering is sufficient and similar items in the shopping cart can be collapsed.

          Hide
          Jonathan Ellis added a comment -

          Right, I understand the reconciliation problem. I read Jeff Darcy's blog too. I was concerned that I was missing something about read-before-write that would apply to 580 but not 1072.

          Hide
          Jonathan Ellis added a comment - - edited

          I actually think the main problem with the 1072 approach is on the write side, not the read. Writes are fragile. Here is what I mean by that:

          Because 1072 "shards" the increments across multiple machines, it can tolerate temporary failures. This is good. But because the shards are no longer replicas in the normal Cassandra sense – each is responsible for a different set of increments, rather than all maintaining the same data – there is no way to create a write CL greater than ONE, and thus, no defense against permanent failures of single machines. That is, if a single machine dies at the right time, you will lose data, and unlike normal Cassandra you can't prevent that by requesting that writes not be acked until a higher CL is achieved. (You can try to band-aid the problem with Sylvain's repair-on-write, but that only reduces the failure window, it does not eliminate it.)

          A related source of fragility is that operations here are not idempotent from the client's perspective. This is why repair-on-write can't be used to simulate higher CLs – if a client issues an increment and it comes back TimedOut (or a socket exception to the coordinator), it has no safe way to retry: if the operation went on to succeed later, but the client responded to the TimedOut by issuing another increment, we have added new data erroneously; but if we do not re-issue the operation, and the original operation failed permanently, we have lost data. Thus, even temporary node failures can cause data corruption. (I say corruption, because I mean to distinguish it from the sort of inconsistency that RR and AES can repair. Once introduced into the system, this corruption is not distinguishable from real increments and thus un-repairable.)
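
          A tiny, self-contained illustration of the retry hazard described above (a simulation, not a real client API): the increment lands on the server but the ack is lost, and the client's naive retry double-counts.

              public class RetryHazardSketch
              {
                  static long serverCount = 0;

                  // The server applies the delta, but the ack never reaches the client.
                  static void incrementButLoseAck(long delta)
                  {
                      serverCount += delta;
                      throw new RuntimeException("simulated TimedOutException");
                  }

                  public static void main(String[] args)
                  {
                      try
                      {
                          incrementButLoseAck(1);
                      }
                      catch (RuntimeException timeout)
                      {
                          // The client cannot tell whether the increment landed. Retrying
                          // double-counts here; not retrying would lose the increment in the
                          // case where the first attempt really was dropped. An idempotent
                          // write (a plain column overwrite) would be safe to retry either way.
                          serverCount += 1; // naive retry
                      }
                      System.out.println(serverCount); // 2, although the client only asked for +1
                  }
              }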

          Hide
          Johan Oskarsson added a comment -

          New cut of the patch Kelvin uploaded earlier.

          Hide
          Stu Hood added a comment -

          I don't see a clear solution to prevent the read-before-write step inherent in #580, but if read-before-write / CAS loop is a common enough step (for this issue and 1311), perhaps pushing the CAS loop to the server would be useful. I'll comment on 1311 with some thoughts.


          I think I agree with jbellis on this one: implementing write scaling (striping?) in a way that isn't generic breaks EC, so I'd like to see a solution there.

          > Because 1072 "shards" the increments across multiple machines
          A generic way to improve write throughput would be to implement striping. The outcome would essentially be duplicated rings overlaid on one another, so that multiple replica sets "own" the same tokens. For example, with N = 3, and a "striping factor" of 2, a single range would have 2 stripes of 3 nodes each, where each stripe holds disjoint data, and must be queried at read time. Writes go to a random stripe, so W=1 would block for 1 node in a stripe, and W=ALL would block for all 3 nodes in a stripe. Reads would need to touch both stripes, so a R=1 read would need to block for 2 nodes (one from each stripe).
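
          Back-of-envelope arithmetic for the blocking counts implied by the striping idea above ("stripes" is hypothetical here, not an existing Cassandra concept): a write blocks for W nodes in a single stripe, while a read must block for R nodes in every stripe.

              public class StripingBlockingSketch
              {
                  static int nodesBlockedForWrite(int w)             { return w; }           // W nodes in one stripe
                  static int nodesBlockedForRead(int r, int stripes) { return r * stripes; } // R nodes per stripe

                  public static void main(String[] args)
                  {
                      int stripes = 2, replicasPerStripe = 3; // N = 3, striping factor = 2
                      System.out.println(nodesBlockedForWrite(1));                 // W=1   -> 1 node
                      System.out.println(nodesBlockedForWrite(replicasPerStripe)); // W=ALL -> 3 nodes (one full stripe)
                      System.out.println(nodesBlockedForRead(1, stripes));         // R=1   -> 2 nodes (one per stripe)
                  }
              }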

          (The write throughput problem also heavily affects timeseries data, because users currently have to implement their own striping to prevent overloading the range receiving current data.)

          Hide
          Jonathan Ellis added a comment -

          ISTM that "striping" this way breaks conflict resolution – if you have two increments at CL.ONE that update different nodes on the stripe during a partition, then after the partition heals there's no way to tell that you've lost an increment similar to the classic read-update-write race. Having a single node responsible for each shard fixes this problem while still allowing rudimentary failure tolerance, but has the fragility problems described above.

          Hide
          Jonathan Ellis added a comment -

          Is my comment on CASSANDRA-1421 correct, that we don't need the Clock structure for this approach either?

          If so then I think we should get rid of that before 0.7 final, since nobody has a use case for raw version vectors either.

          Hide
          Sylvain Lebresne added a comment -

          If, as I suppose, you're referring to the Clock structure in thrift/avro, then I'm pretty sure you
          are correct. That is, the attached patch does use it, but it is trivial to get rid of it and simply
          ignore the timestamp when it's a write for a counter.

          Hide
          Johan Oskarsson added a comment -

          Admittedly there are tradeoffs to the approach used in 1072+1397, as outlined in previous comments. As long as users are aware of these constraints there are plenty of use cases, as demonstrated by the interest on the mailing list. This is similar to how eventual consistency may rule out some applications but is good enough for plenty of others.

          Are there any specific code changes we can make to the patch in order to make it commit worthy?

          Hide
          Jonathan Ellis added a comment -

          am I right that there is no good reason to inflict Clock on our interface at this point?

          Hide
          Johan Oskarsson added a comment -

          We've been looking at ways to adjust this patch to meet the EC concern above and came up with this compromise: a new increment method in the thrift interface.

          With a new method Cassandra users would not be confused by the slightly different ConsistencyLevel behavior in the current patch. The method name and parameters would be a clear indication as to what it does and separate it from the methods used on standard column families.

          With the interest shown in this feature I hope we can reach a compromise that works for all involved.

          Hide
          Chris Goffinet added a comment -

          I am in favor of Johan's proposal for a compromise.

          Hide
          Johan Oskarsson added a comment -

          Updated patch with separate method in thrift interface for increments, marked as experimental as discussed on dev list.

          Patch has been updated to the latest changes in trunk. Tested with unit and system test as well as tests run against a real cluster.

          Hide
          Jonathan Ellis added a comment -

          as described above, the Clock approach was a false start and should be removed now that we understand that CASSANDRA-580 is a dead end.

          Hide
          Johan Oskarsson added a comment -

          This patch is already huge so it's better to do that in a different ticket after this has gone in. I volunteer to do that work.

          Hide
          Jonathan Ellis added a comment -

          that it's a huge patch is exactly why I don't want to take a "let's just commit it and clean it up later" approach.

          Hide
          Johan Oskarsson added a comment -

          The clock changes are separate from the counters and were introduced in a different ticket, CASSANDRA-1070. I see no reason to add more work that is not directly related to counters in this jira. The clock changes would get into trunk quicker if we didn't, avoiding the extra overhead of a big patch during reviews, merge with trunk, code updates and publication of a new patch.
          If the concern is that we won't attend to the clocks once this patch is in I can promise that we'll look at it straight away.

          Hide
          Jonathan Ellis added a comment - - edited

          A major concern is that the clocks are part of the API and schema. So if we commit this as-is then we have several possible scenarios:

          1. clock change gets in before 0.7.0 and everything is fine
          2. clock change isn't ready for 0.7.0 and we block 0.7.0 for it
          3. clock change isn't ready for 0.7.0, we release anyway, and break compatibility w/ the next release
          4. clock change isn't ready for 0.7.0 so we postpone it until 0.8 (thus having to maintain both versions until 0.7 is obsolete)

          2-4 are not acceptable and while I know you have the best of intentions to finish it right away, the rest of 0.7.0 is shaping up rapidly and it just doesn't look like a good risk to me.

          Hide
          Jonathan Ellis added a comment -

          (To be explicit, I'm fine with this going in 0.7.1 if it misses 0.7.0, as long as it doesn't break compatibility anywhere.)

          Hide
          Johan Oskarsson added a comment -

          I agree that we want to avoid 2-4. But the clock api changes are already in trunk. I don't see why bundling the clock removal in 1072 would help us avoid the scenarios outlined above.

          Hide
          Jonathan Ellis added a comment -

          I was going to reply that it should be obvious that removing Clock from the API is relatively easy before we add a monster patch that introduces the only real dependency on it, but talk is cheap. So I wrote the patch first: CASSANDRA-1501

          Hide
          Johan Oskarsson added a comment -

          Updated patch with the following changes

          • Dependency on clock structure removed
          • Decrement support
          • Encapsulates part of logic in CounterColumnType, set by default_validation_class
          • Updated streaming code to fit into the latest refactoring
          • Thrift API updated to roughly match that of CASSANDRA-1546 (with the addition of supercolumn support)
          • Additional system tests

          Can be split up into smaller jiras, for example the streaming OperationType changes.
          We hope this is a compromise all parties can be happy with.

          Hide
          Sylvain Lebresne added a comment - - edited

          I think this patch has a number of important shortcomings (all of which have been discussed in some comments of CASSANDRA-1546, so sorry for the repetition):

          1. the patch uses IP addresses as node identifiers for the partitions of the counters. This is overly fragile (a change of IP, accidental or not, could corrupt data) and, I'm growing more and more convinced, a bad idea. An obvious solution is to use uuids instead of IPs. However, from that perspective, I believe the approach taken by CASSANDRA-1546 to be a lot simpler (but I could be biased) than the clean context logic of this patch, because the clean context logic requires global knowledge of the node uuid assignments, while the approach of CASSANDRA-1546 does not.
          2. cluster topology changes could result in data corruption if no proper care is taken by the user. Consider a simple cluster with a single node A (RF=1), accepting updates on a counter c. We bootstrap node B, which gets counter c in its range (it is thus streamed to B). And now let's say that node B is decommissioned. Counter c will be streamed back to A "as is". If, after B was bootstrapped, repair has been run on A, this is fine. But if repair wasn't run, it'll result in an (on-disk) corrupted counter, because the newly streamed parts will be merged with the old version. And I don't think that requiring users to run repair at the risk of losing data is the right fix. This is not unrelated to my previous point, in that I believe that with uuids we can fix that by renewing a given node ID on range changes. Again, the approach of CASSANDRA-1546, where we don't need to know the assignment of node ID -> actual node (at least not on the read and/or write path), makes that much easier.
          3. there is a race condition during reads. During reads, a given row can be read twice, because the switch from the current memtable to the memtable pending flush is not atomic. The same is true when a flushed memtable becomes an sstable and at the end of compaction. This is fine for normal reads, but will result in bogus reads for counters. The patch attached to CASSANDRA-1546 proposes a fix for that.
          4. there is no replication on writes, which is worse than merely not supporting CL.QUORUM. This patch does not provide any reasonable durability guarantee. And imho, this is far too important to be simply left as a 'later improvement'.
          Hide
          Kelvin Kakugawa added a comment -

          A set of system tests for #1072.

          A couple points of interest:

          • FIXME tags denote areas that need to be modified: hostnames, shell commands to stop/start/restart nodes on the remote server.
          • requires python 2.6
          • needs to be run from the base directory
          Hide
          Johan Oskarsson added a comment -

          All good points. We have reviewed them and the code, replies below.

          1. Is this the kind of ip address situation you are referring to?
          A cluster of nodes: A (127.0.0.1), B (127.0.0.2), and C (127.0.0.3) have been running and are not fully consistent. They're brought back up w/ shuffled ips, like so: A (127.0.0.2), B (127.0.0.3), and C (127.0.0.1). A has the most up-to-date view of writes to 127.0.0.1, however, C is now in-charge of writes to 127.0.0.1. i.e. any writes to A that C had not seen, previously, have now been lost.
          While this is possible, it's not very likely that a production situation would shuffle the ip addresses of nodes in this way.
          A fix with UUIDs is possible but it's beyond the scope of this jira.

          2. Valid issue, but it does sound like something of an edge case. For a first version of 1072 it seems reasonable that instructions for ops would be sufficient for this problem. If the community then still feels it's a problem we can look at how to improve the code.

          3. To resolve this issue we have borrowed the implementation from CASSANDRA-1546 (with the added deadlock fix).

          4. Brought over implementation written earlier into this patch. Adds a stage and task that does "repair on write".

          We've also refreshed the design doc to reflect these changes.

          Hide
          Kelvin Kakugawa added a comment -

          updated design doc to reflect the refactored design:
          no algorithm change
          implementation refactored

          Hide
          Sylvain Lebresne added a comment -

          Thanks for the answers. I haven't had the time to have a look at the new patch, sorry, but I will as soon as time permits. A few answers to your answers, though.

          1. Is this the kind of IP address situation you are referring to? A cluster of nodes: A (127.0.0.1), B (127.0.0.2), and C (127.0.0.3) have been running and are not fully consistent. They're brought back up w/ shuffled ips, like so: A (127.0.0.2), B (127.0.0.3), and C (127.0.0.1). A has the most up-to-date view of writes to 127.0.0.1, however, C is now in-charge of writes to 127.0.0.1. i.e. any writes to A that C had not seen, previously, have now been lost.

          That's one scenario, but I think you'll actually be very lucky if, in such a scenario, you only "lose a few non-replicated updates". There is much (much) worse.

          Suppose you have your 3 node cluster (and say RF=2 or 3). Node A accepts one or more counter updates and its part of the counter is, say, 10. This value 10 is replicated to B (as part of "repair on write" or read repair). On B, the memtable is flushed, so this value 10 is in one of B's sstables. Now A accepts more update(s), bringing the value to, say, 15. Again, this value 15 is replicated. At this point, the cluster is coherent and the value for the counter is 15. But say somehow the cluster is shut down, there is some IP mixup, and B is restarted with the IP that A had before. Now, any read (on B) will reconcile the two values 10 and 15, merge them (because it now believes that these are updates it has accepted and as such are deltas, while they are not) and yield 25. Very quickly, replication will pollute every other node in the cluster with this bogus value and compaction will make it permanent.
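
          A minimal illustration of this hazard (assumed, simplified reconcile logic, not the patch's code): once B believes A's shard is its own, it sums values that are totals rather than deltas.

              import java.util.List;

              public class IpReuseHazardSketch
              {
                  // Two copies of the "A" shard sitting in B's sstables after replication:
                  // an older repair carrying 10 and a newer one carrying 15.
                  static long reconcileShard(String shardOwner, String localId, List<Long> copies)
                  {
                      if (shardOwner.equals(localId))
                          // own shard: copies are treated as local deltas and summed
                          return copies.stream().mapToLong(Long::longValue).sum();
                      // remote shard: copies are repairs of the same total, keep the highest
                      return copies.stream().mapToLong(Long::longValue).max().orElse(0L);
                  }

                  public static void main(String[] args)
                  {
                      List<Long> aShardCopies = List.of(10L, 15L);
                      System.out.println(reconcileShard("A", "B", aShardCopies)); // 15: correct, B knows these are A's repairs
                      // After the IP mixup, B believes it *is* A:
                      System.out.println(reconcileShard("A", "A", aShardCopies)); // 25: the corrupted value described above
                  }
              }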

          Potentially, any change of a node's IP to an IP that has been used for another node at some point (even a decommissioned one) can be harmful (and dramatically so), unless you know that everything has been compacted nice and clean.

          So while I agree that such IP changes are not supposed to be the norm, they can, and so they will, happen (even in a test environment, where one could be less prudent and such scenarios are thus even more likely to happen; it will piss people off real bad).

          I'm strongly opposed (and always will be) to any change to Cassandra that will destroy data because someone on the ops team has messed up and hit the enter key a bit too quickly. But that's just my humble opinion and it's open source, so anybody else, please chime in and give yours.

          A fix with UUIDs is possible but it's beyond the scope of this jira.

          Because of what's above, I disagree with this. Even more so because I'm not at all convinced that this could be easily fixed afterwards.

          2. Valid issue, but it does sound like something of an edge case. For a first version of 1072 it seems reasonable that instructions for ops would be sufficient for this problem. If the community then still feels it's a problem we can look at how to improve the code.

          Not sure that's an edge case. Right now, when a node is bootstrapped, repair is not run automatically at the end of the bootstrap, in part because most failures happen quickly. Thus good advice is to wait a bit to make sure the new node behaves alright before running repair on the other nodes, so you have a quick rollback if the new node doesn't behave correctly. Bootstrap followed by decommission seems to me bound to happen from time to time (if someone feels like confirming/denying?). That repair has not been run when this happens doesn't seem a crazy scenario at all either. And anyway, as for 1, the risk is to corrupt data (for the same reason: a node will merge values that are not deltas). I don't consider "telling people to be careful" a fix. And because I don't think fixing that will be easy, I'm not comfortable with leaving it for later.

          More generally, the counter design is based on some values being merged (summed) together (deltas) and others being reconciled as usual based on timestamps. This is a double-edged sword. It allows for quite nice performance properties, but it requires being very careful not to sum two values that should not be summed. I don't believe this is something that should be done later (especially when we're not sure it can be done later in a satisfactory way).

          3. To resolve this issue we have borrowed the implementation from CASSANDRA-1546 (with the added deadlock fix).

          Cool and thanks for the deadlock fix.

          Hide
          Kelvin Kakugawa added a comment -

          re-based against trunk

          material modifications:

          • removed counter lock from 1546
          • reverse order of read/write for commutative operations
          • race condition: may potentially miss an MT, but never double counts (compare normal: may reconcile an MT twice, but never misses)

          note: the above is a more graceful race condition than the normal code path; however, it does make eventual consistency more eventual.
          further note: if perfect accuracy is required, a more fine-grained lock strategy should be implemented.

          Jonathan Ellis added a comment -

          I think the bootstrap/decommission problem can be left as "don't do that" for now.

          I think we do need to fix the IP/UUID issue since as Sylvain says this isn't something that's going to be easy to fix later without forcing dump/reload.

          Ryan King added a comment -

          We understand that this is an issue that should be improved, but we don't think it should be a blocker to moving forward.

          We at Twitter are happy with this version and can't afford to spend any more time on it. We'd like to see this committed to trunk soon, but either way we're going to be running this code indefinitely, whether or not it ever makes it into an open source release.

          Simon Reavely added a comment -

          For what it's worth, I would like to add my support to what Ryan is saying... I really, really, really want to see this in trunk soon, despite the limitations.

          Jonathan Ellis added a comment -

          We want to help, I'm just thinking this through.

          It sounds like you agree that using UUIDs is the right approach long term. (Lots of people run Cassandra on EC2 + EBS, so a change in the IPs that the data lives on is not a corner case at all there. The same goes for any time you restore from a snapshot...)

          So if you're going to be running this patch indefinitely, is committing it, knowing that we're going to make an incompatible change to use UUIDs, actually useful to the "get Twitter running mainline Cassandra" goal?

          Johan Oskarsson added a comment -

          The short answer is yes, committing this now would make it much easier for us to run mainline Cassandra.
          So far our upgrades of the counting clusters have involved recreating the data from our source of truth, so once more would not be the end of the world.

          Sylvain Lebresne added a comment -

          I read through the last patch:

          1. I think there is a "bug" with the repair-on-write. As written, before sending a "repair", it cleans the context for all the replicas it will send it to, that is all alive replicas. This will basically clean up everything (except for maybe the count of down or old (decommissioned) replicas). Again, unless I'm missing something obvious, this will basically spend its time sending empty row mutations. For each replica, only the context of this replica should be cleaned before sending it the repair mutation for this to be useful.
          2. Even if "fixed", repair-on-write doesn't help to give consistency guarantees, being (non-optionally) asynchronous. Only CL.ONE is supported for writes (which anecdotally is only checked by an assertion much too late). I see no reason for that limitation, and to me it sure sounds like something we want to provide on day one.
          3. It removes the fix of the race condition. I strongly believe correctness should be our first concern. Making the race condition less harmful is not a fix. That being said, I'm not saying the fix in 1546 cannot be improved and the locking be made more fine-grained.

          Even in the absence of any failure, there is no more guarantee than what CL.ONE has to offer. By giving so few guarantees, 1072 will make counters useful to far fewer people than it could, and for no clear reason.
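          To make point 1 above concrete, here is a minimal, purely illustrative Java sketch of the difference being described. The counter context is modelled as a plain map from replica address to that replica's partial count; the real IncrementCounterContext is a packed byte array, and none of the names below come from the patch.

            import java.net.InetAddress;
            import java.util.HashMap;
            import java.util.Map;

            final class ReplicateOnWriteSketch
            {
                // What the comment says the patch currently does: strip every live replica's
                // shard from the context before sending, which leaves next to nothing to replicate.
                static Map<InetAddress, Long> cleanAllLiveReplicas(Map<InetAddress, Long> context,
                                                                   Iterable<InetAddress> liveReplicas)
                {
                    Map<InetAddress, Long> copy = new HashMap<InetAddress, Long>(context);
                    for (InetAddress replica : liveReplicas)
                        copy.remove(replica);
                    return copy;
                }

                // What the comment argues it should do: for each target replica, strip only that
                // replica's own shard, so it still receives everyone else's partial counts.
                static Map<InetAddress, Long> cleanForTarget(Map<InetAddress, Long> context, InetAddress target)
                {
                    Map<InetAddress, Long> copy = new HashMap<InetAddress, Long>(context);
                    copy.remove(target);
                    return copy;
                }
            }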

          Jonathan Ellis added a comment - - edited

          Is this an accurate summary of the current state of 1072 and CASSANDRA-1546?

          1072-pro: tested in production. Supports counters in subcolumns.

          1072-con: IP-based partitioning is prone to catastrophic data loss if node IPs change (which is fairly common on an EC2/EBS deployment) or on decommission-after-bootstrap-before-repair (which is fairly uncommon ime) or possibly in other scenarios as well; changing 1072 to use UUIDs instead of IPs to fix this would be a substantial undertaking. In the worst case, there is no ConsistencyLevel guarantee higher than ONE even with repair-on-write.

          1546-pro: ConsistencyLevel support is built in; support for optional "marker" allows retry on failure as well. "LocalCounterColumn" approach limits the damage that can be done by bugs compared to relying on post-transfer "cleaning" of non-local data. Supports node IP changes.

          1546-con: essentially untested. Some known corner cases involving node movement need to be addressed. Would require using a custom comparator to emulate counters in subcolumns.

          Ryan King added a comment -

          Yeah, that sounds like a fair summary of the situation.

          Kelvin Kakugawa added a comment -

          We've also tried to test 1546, internally, but at scale its counter-specific read/write code paths add too much contention to the thread stages.

          Consequently, we also invested a reasonable effort into optimizing 1546: we removed a deadlock and found an unnecessary lock around SST compaction. Ultimately, however, we re-did the logic in 1072 to avoid the lock strategy of 1546. 1546's additional, counter-specific read/write paths require more reasoning about how they interact w/ the underlying system, whereas 1072's extension is more limited in scope because it only extends the basic logic of the Column classes.

          Jonathan Ellis added a comment -

          In a perfect world I would rather wait to address the remaining -cons before committing but Kelvin, Johan, and Ryan have been rebasing this for months now and I don't think it's fair to prolong that pain indefinitely, especially if we're agreeing that we don't have to preserve backwards-compatibility at the storage layer when we fix those last issues.

          I would like to try to get to a stable client API before we commit though, for the benefit of other users as well as client authors. There are two fairly minor points I'd like to address:

          • A way to batch increment a counter in a super column (would changing CounterUpdate to contain a Counter instead of a CounterColumn be all we need here?)
          • Remove the timestamp from CounterUpdate (but keep the struct, because we'll want to add an optional uuid context for the planned replay support). The timestamp doesn't make sense to push client-side the way ordinary write timestamps do, since the conflict resolution is much more complicated and internal, so let's generate that server side.
          Jonathan Ellis added a comment - - edited

          Also: Jake thinks Thrift union support is limited to C++, Java, and Ruby. Unless he's wrong, it sounds like we should change Counter to a struct with optional fields.

          T Jake Luciani added a comment -

          Yes, those are the three languages that directly support it. The rest treat it like a struct with 2 optional members.

          Jonathan Ellis added a comment -

          That should be adequate then. Thanks for the clarification.

          Kelvin Kakugawa added a comment -

          update interface:

          • CounterUpdate: remove timestamp
          • CounterMutation: added
          • batch_add: via CounterMutation (instead of CounterUpdate)
          Sylvain Lebresne added a comment -

          I would still make a few changes to this interface:

          1. CounterUpdate is lacking for batch_add; a CounterMutation should be either a CounterUpdate or a Deletion.
          2. And since we don't want to expose timestamps to the client for counters, we should probably have
            a specific CounterDeletion struct with no timestamp. For the same reason the timestamp in remove_counter
            should be removed too.

          About deletion/remove: as it turns out, it doesn't really work and probably never will, in that if new increments
          follow a remove too closely, the remove could be ignored. I'm fine with keeping deletions as is (it's still useful, if only to remove a counter definitively) with documentation explaining the limitation. But knowing it has limitations, we could also decide to keep removes restricted to the remove_counter function and not include them in batch updates. In which case we can get rid of CounterMutation altogether (and avoid creating a new CounterDeletion struct).

          Kelvin Kakugawa added a comment -

          np, I thought ahead. CounterMutation is a struct w/ 2 optional fields: Counter, and Deletion. It basically re-writes CounterMutation as a Mutation.

          Not in favor of CounterDeletion w/o a timestamp, though. Although I do see your point, I'm a little hesitant to make it too implicit. Granted, users have to keep in mind that it's the JVM's timestamp that matters.

          Jonathan Ellis added a comment -

          CounterMutation is a struct w/ 2 optional fields: Counter, and Deletion

          looks reasonable to me.

          Not in favor of CounterDeletion w/o a timestamp

          I think it's confusing from a user's perspective to have it required on delete when it is not on write. If there's no value to letting the user provide timestamps other than the current time then let's leave it out.

          Kelvin Kakugawa added a comment -

          API update:
          removed timestamp from remove_counter and CounterDeletion

          Sylvain Lebresne added a comment -

          np, I thought ahead. CounterMutation is a struct w/ 2 optional fields: Counter, and Deletion. It basically re-writes CounterMutation as a Mutation.

          Sorry, I don't know if I was clear before, but I still think there is some inconsistency between CounterMutation and CounterUpdate. We want to keep CounterUpdate so we can add an optional uuid in there when we add replay facilities. But since a CounterMutation takes a Counter and not a CounterUpdate, we won't be able to have the uuid in there, which will limit the replay mechanism to the plain add call for no good reason (I think adding the uuid to CounterMutation directly is the wrong solution, because it's not clear the replay will apply to deletions, plus I think this would be more confusing anyway). What I'm proposing is simply to have

            struct CounterMutation {
                1: optional CounterUpdate counter,
                2: optional CounterDeletion deletion,
            }
          

          Another (very minor) remark is that the comment for CounterUpdate is outdated (mentions the timestamp).

          Kelvin Kakugawa added a comment -

          Ah, I see. Good point.

          Let me work that out. We'll need to support SC mutation, as well, which CounterUpdate doesn't support.

          Kelvin Kakugawa added a comment -

          So, the challenge is that I mirrored the mutation interface for counters.

          I have two proposals that I'd like your opinion on:
          a) add an optional uuid field to CounterColumn, or
          b) add an optional uuid field to CounterMutation that covers the whole mutation.

          (b) is not very fine-grained. However, if a batch_add call fails, the client should retry the whole batch mutation. Not to mention, uuids are specific to a given counter, so there won't be any fear of collisions across keys. Although, if a batch mutation were to update the same key twice (w/in its update_map), the mutation would have to be collapsed at some point.

          note: if we go w/ (a), then CounterUpdate should be refactored out into just a CounterColumn.
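          Purely as an illustration of proposal (a), here is a rough Java stand-in for what the Thrift-generated CounterColumn might look like with an optional uuid. The uuid field is hypothetical at this point (it is exactly what proposal (a) would add), and the class below is not the actual generated code.

            // Illustrative shape only; field names mirror the discussion, not the patch.
            public class CounterColumnSketch
            {
                public byte[] name;
                public long value;     // the delta on an add, the aggregated total on a read
                public byte[] uuid;    // optional: set by the client so a failed add can be replayed safely

                public boolean isSetUuid()
                {
                    return uuid != null;
                }
            }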

          Sylvain Lebresne added a comment -

          Hum, indeed. In theory I think I prefer (a). It "feels" better and it is indeed more fine-grained (and we won't have to care about what to do if a batch_add updates the same counter multiple times, though it's not really a big problem). The only downside is that we return a CounterColumn from a get and we will never fill in the uuid in that situation (even if the user provides one on add). It's probably ok since the uuid will be optional, but still, this means that for a CounterColumn c that uses a uuid, we won't have c = get(add(c)), which may be slightly surprising to users at first.

          Despite that, I still think my own preference goes to (a) (since I don't see any other clearly better alternative off the top of my head). And I agree that CounterUpdate could go away then.

          Kelvin Kakugawa added a comment -

          Ok, I'll work out (a) and remove CounterUpdate.

          Kelvin Kakugawa added a comment -

          api update: replace CounterUpdate w/ CounterColumn (allow optional uuid field to be added to CounterColumn)

          Sylvain Lebresne added a comment -

          Looks good, but I wasn't able to apply the last patch on trunk (a try against 0.7 wasn't any more successful). Could you rebase, please?

          Kelvin Kakugawa added a comment -

          rebased against trunk@0f89b78a9dd6a89cf56e0828c16f8c1673c83f96

          fixed a regression: CounterColumn needs to override deepCopy()

          Sylvain Lebresne added a comment -

          Thanks for the rebasing. A few comments/remarks:

          1. add and batch_add should throw an InvalidRequestException if CL != CL.ONE.
            Right now this is checked by an assert in StorageProxy which is far too
            late.
          2. Why doesn't the code really stuff the partitionedCounter inside the column
            value? Keeping the value and the partitionedCounter separately seems wasteful. It
            wastes space (the information is stored twice) and it wastes cpu (the
            total is recomputed each time the context is modified, in particular after
            each call to reconcile, while in practice it would be enough to compute it
            just before returning the final value to the client). I admit the cpu part
            is not so clear, since if a counter doesn't change, we avoid recomputing the
            value on each read by keeping it aside. I still think it would be a win
            not to keep the pre-computed value. And anyway, I find this juggling
            between the value and the partitionedCounter adds clutter and is error-prone.
          3. That's not really a remark on the patch itself, but the patch includes the
            fix for CASSANDRA-1830, so I think we'd better commit the #1830 first and
            rebase this afterwards.

          A few other more minor remarks:

          1. Renaming repair_on_write to replicate_on_write would make me happy, in
            that I do hope we can later improve this mechanism so that it helps
            establish the usual consistency guarantees. In which case I believe
            replicate_on_write will be a more precise and less confusing name.</nitpicking>
          2. About the repair on write, I would actually be in favor of renaming it
            further to replicate_counter_on_write (or repair_counter_on_write)
            and/or disallowing it for non counter CFs. Even though there is nothing
            intrinsically wrong with doing repair_on_write on non counter CFs, I think
            it would be a disservice to users to let them think it could be useful
            (unless it does have a use for non counter CFs, but I really don't see any).
          3. A few lines in ColumnSerializer do not follow the coding style.

          Arguably those are either trivially fixable and/or more a matter of opinion
          than anything. The exception is maybe the point on getting rid of keeping
          both the value and the partitionedCounter. I do believe it would be a nice change, but I
          admit this is not crucial and I would be OK with committing as is and keeping
          this as a future refactoring in the interest of having things move forward
          (note however that changing this will break the on-disk format, but I think we
          agreed that we'll probably break it anyway to introduce uuids). Anyway, I
          would still love to get your opinions on the issue.

          Kelvin Kakugawa added a comment -

          Yes, I agree that modifying the value / partitioned counter implementation is interesting. I thought about it, and my original rationale--optimizing for reads--isn't as strong. We could do a lazy optimization that keeps the calculated total in memory after it's been read the first time (invalidating it after an update).

          I think the rest of the comments are reasonable, as well. Although, I did fix 1830 in this patch months ago. (I should have broken it out into a separate patch.) And I'd prefer replicate_on_write, because it may be more general. Although, like you, I don't see a use for it beyond counters, atm.

          I'd be happy to fix the above after commit, though. As long as we modify the value / partitioned counter implementation before 0.7.0 is final.
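          A minimal sketch of the lazy-total idea described above, assuming an abstract array of per-replica partials standing in for the real context; the class and field names are illustrative, not from the patch.

            final class LazyCounterTotal
            {
                private final long[] partials;      // stand-in for the per-replica partials in the context
                private volatile Long cachedTotal;  // computed on first read, dropped on update

                LazyCounterTotal(long[] partials)
                {
                    this.partials = partials;
                }

                long total()
                {
                    Long cached = cachedTotal;
                    if (cached != null)
                        return cached;
                    long sum = 0;
                    for (long p : partials)
                        sum += p;
                    cachedTotal = sum;              // cache after the first read
                    return sum;
                }

                void invalidate()                   // call whenever the underlying context is updated
                {
                    cachedTotal = null;
                }
            }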

          Jonathan Ellis added a comment -

          We can't put this in a stable release series until we can commit to not breaking backwards compatibility as we fix things. We should commit to trunk, then once everything is stabilized we can consider backporting to 0.7.x. (Not to mention that 0.7.0 is deep into the rc series; it's too late to add major features even if this were at no-known-problems.)

          Kelvin Kakugawa added a comment -

          Yes, I'm onboard w/ this. (I'm thinking about what would need to be refactored to support the value / partitioned counter refactor.)

          Kelvin Kakugawa added a comment -

          re-cut patch against trunk@e834012cd8f59d522c460e32f139cf785d1f97ee
          note: apache's git repo is more up-to-date than github.com/apache

          1830 code refactored w/ 1072's version of the fix, i.e. the digest check is broken out into a separate method.

          Sylvain Lebresne added a comment -

          Why refactor 1830 in this patch? Having this broken out into a separate method is purely a matter of taste. I'm not even saying I don't prefer it, but I think we'd better keep each ticket focused. And this ticket has enough on its plate already.

          But that's a detail. And other than that, as said previously, I'm fine with doing the refactor of the partitioned counter into the column value post-commit.

          Before committing, I think it would be nice to do the renaming of repair_on_write to replicate_on_write (I'm fine with just renaming the public api for now), to fix the coding style violations and to throw an InvalidRequestException early in CassandraServer when CL != CL.ONE, if only because those should be really quick to fix.
          But then I'm basically +1 on this (I won't be reachable for the next two weeks, so I figured I'd say it now).

          Kelvin Kakugawa added a comment -

          renamed repair-on-write to replicate-on-write
          fixed coding style in ColumnSerializer
          check for CL.ONE in add() and batch_add()
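          For reference, the kind of early check described in the last bullet might look roughly like the sketch below; the helper class and method names are assumptions, while ConsistencyLevel and InvalidRequestException are the existing Thrift-generated types.

            import org.apache.cassandra.thrift.ConsistencyLevel;
            import org.apache.cassandra.thrift.InvalidRequestException;

            final class CounterWriteValidation
            {
                // Reject unsupported consistency levels at the API boundary (add/batch_add)
                // instead of relying on an assertion deep inside StorageProxy.
                static void validateCounterWriteConsistency(ConsistencyLevel cl) throws InvalidRequestException
                {
                    if (cl != ConsistencyLevel.ONE)
                        throw new InvalidRequestException("counter writes currently only support ConsistencyLevel.ONE");
                }
            }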

          Kelvin Kakugawa added a comment - - edited

          modified to: lazily calculate total on client read + don't serialize value to disk

          Ryan King added a comment -

          Committing to trunk seems like a reasonable approach. Hopefully we can successfully backport to 0.7 then.

          Chris Goffinet added a comment -

          +1 this changeset. I want to commit this to trunk, and will do so. Speak now or forever hold your peace.

          Jonathan Ellis added a comment -

          I think it would be appropriate to let Johan do the honors, but it's up to you.

          +1

          Kelvin Kakugawa added a comment -

          version of the patch, pre-value refactor.

          Johan Oskarsson added a comment -

          Committed CASSANDRA-1072.pre-value_refactor.patch to trunk. Thanks for all the help everyone!

          Hudson added a comment -

          Integrated in Cassandra #637 (See https://hudson.apache.org/hudson/job/Cassandra/637/)
          Adds support for columns that act as incr/decr counters. Patch primarily by Kelvin Kakugawa with select parts from Chris Goffinet, Sylvain Lebresne, Rob Coli, Johan Oskarsson, Adam Samet, Jaakko Laine and more. Review by Jonathan Ellis and Sylvain Lebresne. CASSANDRA-1072.

          ebullient12 added a comment -

          I am implementing counters. Does anyone have a PHP client equivalent of increment_test.py, or any other help in this direction?


            People

            • Assignee:
              Kelvin Kakugawa
              Reporter:
              Johan Oskarsson
              Reviewer:
              Sylvain Lebresne
            • Votes:
              16
              Watchers:
              41
