Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 1.2.0 beta 2
    • Component/s: API, Core
    • Labels: None

      Description

      I discussed this in the context of triggers (CASSANDRA-1311) but it's useful as a standalone feature as well.

        Issue Links

          Activity

          Jonathan Ellis created issue -
          Jonathan Ellis added a comment -

          In the trigger discussion (starting here and continuing here) I proposed a distributed commitlog that the coordinator could use to retry partially successful batches.

          To flesh that out a bit, I think there are two approaches we can take here:

          1. the distributed commitlog ("DCL") approach discussed in the trigger ticket. This is definitely more complex. Assuming DCL RF > 1, once a write to any DCL replica succeeds the commitlog write can be considered successful – and a timeout still means "we don't know, retry." Then you have the complexity of DCL replay – sort of like hinted handoff, in reverse, in the sense that we need data from other nodes that may or may not be up at the same time as us – and of course it's going to basically halve write performance.
          2. a local-only approach, where batches are written to a non-replicated system CF the way hints are now. This would provide adequate durability when we can rely on RAID1/RAID10 local disks; we don't need to preserve this data indefinitely, after all, only until it's persisted to the other replicas. However, this is a non-starter for cloud environments where the provider will just nuke VMs out from under you if there's a problem, and even in non-cloud environments many prefer to deploy on RAID0 instead of paying the space overhead for RAID10.

          So I think we should

          • Start with the distributed commitlog since it is more generally useful, but
          • Make batch atomicity optional, so users who don't need it don't pay any performance penalty over what we have now
          Jonathan Ellis added a comment -

          The DCL should be in its own keyspace, both for hygiene and so replication strategy can be tuned independently. In particular I think RF=1 will be Good Enough for 90% of use cases; remember that our goal here is to handle the case when the coordinator has sent part of a batch out, but not the entire thing. Once the entire batch is sent out to one replica, HH and repair can take care of making sure that gets to the rest of the cluster.

          So it's only the relatively narrow window of sending out the rows in the batch that we need to worry about here – once that is done we don't care about preserving the original batch anymore. The coordinator itself is thus effectively a replica for the useful part of the operation, so a single other machine should be adequate protection.

          Unfortunately we don't have a way to say "store this replica anywhere but locally." So maybe we do need to go to RF=2 to make sure we have another non-local copy.

          Alternatively maybe we could special case this and write a hint elsewhere if we detect that the DCL replica would be local.

          Jonathan Ellis added a comment -

          We'd also want to restrict the DCL to nodes in the same datacenter. No sense in sending this data over the WAN. Since ReplicationStrategy only depends on the partition key, not the coordinator location, we probably need to do something sneaky like having a strategy that generates N replicas in each DC, then special case the DCL in StorageProxy to ignore replicas outside the coordinator's DC.
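
           For illustration, a minimal sketch of that same-DC filtering, with a plain map standing in for the snitch; none of these names are actual Cassandra APIs, just an assumption about how the special-casing in StorageProxy could look:

           import java.net.InetAddress;
           import java.util.ArrayList;
           import java.util.List;
           import java.util.Map;

           // Sketch only: keep just the candidate batchlog endpoints that share the
           // coordinator's datacenter, so DCL traffic never crosses the WAN.
           final class BatchlogEndpointFilter
           {
               static List<InetAddress> sameDcOnly(InetAddress coordinator,
                                                   List<InetAddress> candidates,
                                                   Map<InetAddress, String> dcByEndpoint)
               {
                   String localDc = dcByEndpoint.get(coordinator);
                   List<InetAddress> filtered = new ArrayList<>();
                   for (InetAddress endpoint : candidates)
                       if (localDc.equals(dcByEndpoint.get(endpoint)))
                           filtered.add(endpoint); // drop anything outside the coordinator's DC
                   return filtered;
               }
           }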

           Jeremy Hanna made changes -
           Link: This issue is related to CASSANDRA-1311
           Jonathan Ellis added a comment - edited

          Here's the data model I'm leaning towards:

          CREATE TABLE batchlog (
            coordinator inet,
            shard       int,
            id          uuid,
            data        blob,
            PRIMARY KEY ((coordinator, shard))
          );
          

          (Using CASSANDRA-4179 syntax for composite-partition-key.) As discussed in CASSANDRA-1311, this is going to be a very tombstone-heavy CF since the workload looks like

          1. insert batchlog entry
          2. replicate batch
          3. remove batchlog entry

           So we're going to want to shard each coordinator's entries to avoid the problems attendant to Very Wide Rows. Unlike most such workloads, we don't actually need to time-order our entries; since batches are idempotent, replay order won't matter. Thus, we can just pick a random shard id (in a known range, say 0 to 63) to use for each entry, and on replay we will read from each shard.
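
           As a rough sketch of that sharding scheme (the 0..63 range comes from the paragraph above; the class and method names are placeholders, not the committed implementation):

           import java.util.concurrent.ThreadLocalRandom;

           // Sketch: writes pick a random shard so no single batchlog row gets too wide;
           // replay simply walks every shard, since batch replay order does not matter.
           final class BatchlogShards
           {
               static final int SHARD_COUNT = 64; // shard ids 0..63

               interface ShardReplayer
               {
                   void replayShard(int shard);
               }

               static int randomShard()
               {
                   return ThreadLocalRandom.current().nextInt(SHARD_COUNT);
               }

               static void replayAll(ShardReplayer replayer)
               {
                   for (int shard = 0; shard < SHARD_COUNT; shard++)
                       replayer.replayShard(shard); // order is irrelevant; batches are idempotent
               }
           }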

          Other notes:

           • I think we can cheat in the replication strategy by knowing that part of the partition key is the coordinator address, to avoid replicating to the coordinator itself
          • default RF will be 1; operators can increase if desired
          • operators can also disable [local] commitlog on the batchlog CF, if desired
          • gcgs can be safely set to zero in all cases; worst that happens is we replay a write a second time which is not a problem
           • Currently we always write tombstones to sstables on Memtable flush. We should add a check for gcgs=0 to do an extra removeDeleted pass, which would make the actual sstable contents for the batchlog almost nothing (since in the normal, everything-is-working case the entry gets deleted while still in the memtable); see the sketch after this list.
          • I think we do want to use inetaddr instead of node uuid as the coordinator id here – this gives us a replacement node (w/ the same IP) taking over for a dead one "automatic" ownership of the dead node's batchlog.
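
           A small sketch of that gcgs=0 flush-time purge, using a plain map to stand in for memtable contents; this is illustrative only, not the real flush path or its types:

           import java.util.Iterator;
           import java.util.Map;

           // Sketch: with gc_grace_seconds = 0 every tombstone is immediately purgeable, so an
           // extra removeDeleted pass at flush time keeps batchlog sstables nearly empty.
           final class FlushTimePurge
           {
               // columns maps column name -> deletion time in seconds, or -1 for live columns
               static void removeDeleted(Map<String, Long> columns, int gcGraceSeconds, long nowSeconds)
               {
                   if (gcGraceSeconds != 0)
                       return; // normal tables must keep tombstones around for gc_grace_seconds

                   Iterator<Map.Entry<String, Long>> it = columns.entrySet().iterator();
                   while (it.hasNext())
                   {
                       long deletedAt = it.next().getValue();
                       if (deletedAt >= 0 && deletedAt <= nowSeconds)
                           it.remove(); // purge the tombstone instead of writing it to the sstable
                   }
               }
           }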
           Jonathan Ellis made changes -
           Link: This issue depends on CASSANDRA-4179
           Jonathan Ellis made changes -
           Assignee: Jonathan Ellis [ jbellis ]
           Jonathan Ellis made changes -
           Status: Open → In Progress
          Jonathan Ellis added a comment -

          We can break up implementation as follows:

          1. Add an atomic_batch_mutate method (with the same parameters as batch_mutate) using batchlog/SimpleStrategy in CassandraServer + StorageProxy
          2. Implement batchlog replay
          3. Implement a custom BatchlogStrategy that ensures redundancy at RF=1
          4. Add CQL3 support

          3 and 4 appear straightforward. 1 and 2 have some hairier corners:

          For the batchlog write:

          • We don't want to make the write path more fragile (in the sense that atomic writes will fail, where non-atomic ones would succeed). But, the batchlog shard will probably be on a different machine than the replicas. If that machine is down, we could raise UnavailableException... but better would be to try different shards until we find one whose owner is up.
           • Part of the goal here is to avoid forcing the client to retry on TimedOutException. So if we attempt a batchlog write that times out, we should also retry against another shard instead of propagating TOE to the client (see the sketch after this list).
          • Corollary: we don't need to worry about batchlog hints.
           • Finally, once the batchlog write succeeds, we shouldn't have to make the client retry for timeouts writing to the replicas either; we can do the retry server-side. But we can't just return success, since that would imply that we'd achieved the requested ConsistencyLevel and the data is available to be read. Instead, we should introduce a new exception (InProgressException?) to indicate that the data isn't available to read yet, but the client does not need to retry. (We could use this exception as well for normal writes, where we have at least one replica acknowledge the update in time.)
           • What about RF and CL for the batchlog? If it's convenient, we can allow users to customize batchlog RF, but we should always use CL=1 for reads and writes. (If we need to go lower level instead of re-using the normal write path, I'm fine with hardcoding RF=1.) We don't care about "latest versions," since we're append-only and if we don't see an entry on one replay attempt we'll see it on the next, and we really don't need more durability than one replica since it's only a "staging area" until the batch is sent out to the replicas. The main alternative would be to use the batch's CL for the batchlog write. I don't like that, though, because it's going to introduce extra latency for the batchlog write that you don't want 99% of the time.
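
           A sketch of the retry behaviour described in the first two bullets above; ShardWriter and the exception class are placeholders rather than real StorageProxy types:

           import java.net.InetAddress;
           import java.util.List;

           // Sketch: try batchlog shards whose owner is believed up; on a timeout, retry a
           // different shard rather than surfacing TimedOutException to the client.
           final class BatchlogWriteSketch
           {
               static class UnavailableException extends Exception {}

               interface ShardWriter
               {
                   // returns true on success, false on timeout
                   boolean write(int shard, byte[] serializedBatch) throws UnavailableException;
               }

               static boolean writeWithRetry(List<Integer> shardsWithLiveOwners,
                                             byte[] serializedBatch,
                                             ShardWriter writer) throws UnavailableException
               {
                   for (int shard : shardsWithLiveOwners)
                   {
                       try
                       {
                           if (writer.write(shard, serializedBatch))
                               return true; // batchlog entry durable; proceed to the replicas
                           // timeout: fall through and try another shard instead of throwing
                       }
                       catch (UnavailableException e)
                       {
                           // owner went down between the liveness check and the write; try the next shard
                       }
                   }
                   throw new UnavailableException(); // no live shard owner could take the entry
               }
           }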

          For replay:

           • The main difficulty is that the batchlog shard owners can't be assumed to be alive when we restart. So, we'll need to track replay status for each shard: check on startup and retry periodically until we're successful. (One advantage of restricting the batchlog to RF=1 is that when a read succeeds we know we're done replaying. But if we have RF>1, then we need to retry the read indefinitely in case another replica recovers that had additional entries.)
          Sylvain Lebresne added a comment -

           If I understand that correctly, only the coordinator of a given batch might be able to replay batches. The problem I can see with that is that if the node dies and you never "replace it" (i.e. bring a node with the same IP back up), then you might never replay some batches. Which puts a strong burden on the operator not to screw up. Besides, the batches won't be replayed until a replacement node is brought up, which means that even if we do eventually replay them, it can take an unbounded amount of time.

           So I would also add a mechanism to allow other nodes to replay batches. For instance, when a node A detects that another node B is down, it could check whether it has some batches for B locally and replay them (node B will replay them too when it's back up, but that doesn't matter).

          we need to retry the read indefinitely in case another replica recovered

           For that too we can use the failure detector to track which nodes we've successfully checked since restart (avoids the "indefinitely" part).

          default RF will be 1; operators can increase if desired

          I'll admit I find 1 just a bit too low for a default (especially given it'll be global) and I would prefer at least 2. My reasoning is that:

          1. RF=1 is a tad unsafe as far as durability is concerned.
           2. RF=1 has the problem that the one replica you've picked might time out. Even if you automatically retry another shard (which I'm not in favor of, see below), it will screw up the latency. RF > 1 (with CL.ONE) largely mitigates that issue.
          3. A higher RF won't be slower during the writes (it will actually be faster because of my preceding point) and that is really what we care about. If replay is a bit slower because of it, it's not a big deal (especially given that there will never be much to replay).

          Part of the goal here is to avoid forcing the client to retry on TimedOutException. So if we attempt a batchlog write that times out, we should also retry to another shard instead of propagating TOE to the client.

           I think that what this ticket will provide is an extension of the atomicity that exists for batches to the same key to all batches, and I don't think it gives us much more than that. So I fully expect the retry policy for clients to be unchanged (most of the time client applications want to retry because what they care about is achieving a given consistency level, or because they care that the data is replicated to at least X nodes).

           In other words, I see a timeout as saying "I haven't been able to achieve the requested consistency level in time". This ticket doesn't change that, it only makes a stronger guarantee about the state of the DB in that case (which is good). But I don't see why that would make us start doing retries server-side.

          we shouldn't have to make the client retry for timeouts writing to the replicas either; we can do the retry server-side

           Same as above, I disagree.

          Instead, we should introduce a new exception (InProgressException?) to indicate that the data isn't available to read yet

           As said above I think that this should still be a TimeoutException. However, I do see a point in giving more info on what that timeout means, and I've opened CASSANDRA-4414 for that (which I had meant to do for some time anyway). Having successfully written to the DCL could just be one of the pieces of information we add to the TimeoutException.

          Jonathan Ellis added a comment -

          only the coordinator of a given batch might be able to replay batches

          Right, my (unspecified so far) assumption was we would provide a way to assume responsibility for a removed coordinator's orphaned entries on removetoken/decommission/replacetoken.

          when a node A detects that another node B is down, it could check whether it has some batches for B locally and replay them

          That makes sense, that would be a lot more timely than waiting to replace B. If we have B delete these when it's done we might not even need to worry about removetoken et al.

          I find 1 just a bit too low for a default

           Well, it's more complex than that. If we used the BatchlogStrategy proposed above, it's really "RF=1+", because to lose data we need (1) the coordinator to go down before it can replicate out to the actual data replicas and (2) the batchlog shard host to die unrecoverably. So there is a much narrower window in which hardware failure can cause data loss than in a traditional RF=1 case, where if we lose that node at any time from now on, we lose data. In that case we need at least RF=2 for redundancy, since there is no alternative. But in the batchlog case both the coordinator and the client provide redundancy, across a small window of vulnerability.

          That said, I think if we just use the normal SP read path for replay purposes, we get arbitrary RF support automatically, so I don't think using 1 as a default and allowing it to be tuned as desired will be a problem.

          I fully expect the retry policy for clients to be unchanged

          I think this is (a) an important improvement addressing (b) a significant pain among people who have actually looked close enough to realize what the "official" policy is today, and (c) one that we can fix without much difficulty in the context of atomic batches. We use the local commitlog for both atomicity and durability; we can use the distributed batchlog in the same way. (I note in passing that Megastore uses Bigtable rows as a distributed transaction log in a similar fashion.)

          "Failed writes leave the cluster in an unknown state" is the most frequent [legitimate] complaint users have about Cassandra, and one that affects evaluations vs master-oriented systems. We can try to educate about the difference between UE failure and TOE not-really-failure until we are blue in the face but we will continue to get hammered for it.

          The standard answer to "just retry the operation" isn't really adequate, either. If part of a batch times out, and then the client dies before retry is successful, then we will have no way to recover from the inconsistency (in the ACID sense, not the CAP sense).

          Thus, Hector has implemented a client-side commitlog, which helps, but this is neither something every client should need to reimplement, nor is it as durable as what we can provide on the server (since it's always effectively RF=1), nor do we expect client machines to be as robust as the ones in the Cassandra cluster.

           Now, we cannot eliminate TOE 100% of the time, but we can come very, very close – with the approach I outlined, the only case where we need to hand back a TOE is if the coordinator attempts a batchlog write to a believed-to-be-up node, the write fails, and then the coordinator gets partitioned off so that there are no other live batchlog targets available. Since we cannot continue, and we don't know if the original attempt succeeded or not, we have to return TOE. So we will (1) dramatically reduce TOE, and (2) the TOE we do hand back will not cause inconsistency if the client dies before it can retry.

          I understand the argument that (2) is the really important part, but again, we can deliver (1) without significantly more effort, so I think it's worth doing. It's the difference between "you should still implement client-side retry after each op in case of failure" and "you can probably ignore this the way you do today with the chance of your Oracle installation failing before it gives you the answer."

          Sylvain Lebresne added a comment -

          Well, it's more complex than that.

          I understand that but:

           • with RF=1 we would still write to disk on only 1 node. So if some disk in the cluster has any problem, then it's enough to have one other node go down (it doesn't have to be another hardware failure, it could be a simple OOM or anything really) to break the atomicity "guarantee". Granted, you have to be a bit unlucky, but the odds are far from unimaginable imo. And that's what guarantees are about: protecting you against being unlucky. I think RF=2 makes this orders of magnitude more secure, and if RF=2 had big drawbacks, then ok, why not consider RF=1 as the default, but I don't think that's the case, quite the contrary even.
          • as said in my previous comment, it's not only about durability. It's a latency issue. If you do RF=1, then each time a node dies (is upgraded or whatnot) you know that some portion of batchlog writes on the cluster will suffer from timeouts (even if we retry on the coordinator, the latency will still suffer). That's actually the main reason why I think RF=2 is a much much better default.
          • I don't see much downside to RF=2 compared to RF=1. A little bit more network traffic and more CPU usage maybe, but I think those are largely outweighed by the advantages.

           Overall I do think quite strongly that RF=1 is the wrong default (and having it configurable doesn't make it a better default).

          "Failed writes leave the cluster in an unknown state" is the most frequent [legitimate] complaint users have about Cassandra, and one that affects evaluations vs master-oriented systems. We can try to educate about the difference between UE failure and TOE not-really-failure until we are blue in the face but we will continue to get hammered for it.

           Let's be clear that I completely agree with that. But "failed writes leave the cluster in an unknown state" is fixed by fixing atomicity. And I'm all for fixing batch atomicity, and I even think that for CQL3 we should make batches atomic by default for all the reasons you mentioned (which wouldn't exclude having some escape hatch like "BATCH ... APPLY WITHOUT ATOMICITY GUARANTEE"). But whether we do coordinator-side retry is not directly related imho (and so at best should be considered separately).

           To be precise, the DCL patch will add one more possibility for TOE compared to the current write path, and that's a TOE while writing to the DCL. First, I think that using RF=2 will largely mitigate the chance of getting that TOE in the first place, as said above. But that being said, we could indeed retry another shard if we do still get a TOE, I suppose. The only thing that bothers me a bit is that I think it's useful for the timeout configured by the client to be an actual timeout on the server answer, even if only to say that we haven't achieved what was asked in the time granted (and again, I'm all for returning more information on what a TOE means exactly, i.e. CASSANDRA-4414, so that clients can judge whether what we did achieve during that time is enough that they don't need to retry). However, I suppose one option could be to try the DCL write with a smaller timeout than the client-supplied one, so that we can do a retry while respecting the client timeout.

          Finally, once the batchlog write succeeds, we shouldn't have to make the client retry for timeouts writing to the replicas either; we can do the retry server-side

           My point is that retrying server-side in that case would be plain wrong. On the write path (that's not true for reads, but that is a different subject), a timeout when writing to the replicas means that the CL cannot be achieved at the current time (counters are another exception to that, but they are a whole different problem). So retrying (client- or server-side, for that matter) with the same CL is useless and bad. The only thing that can be improved compared to today is that we can tell the client that while the CL could not be achieved, we did persist the write on some replica, which would remove the retry-with-a-smaller-CL-because-even-if-I-can't-get-my-CL-I-want-to-make-sure-the-write-is-at-least-persisted-on-some-replica dance most clients probably do today. And that is really useful, but it is also a totally separate issue from this ticket (namely CASSANDRA-4414) that doesn't only apply to batches, nor only to the atomic ones.

           As a side note, I wouldn't be completely against discussing the possibility of doing some coordinator-side retry for reads, but that's a different issue.

          Jonathan Ellis added a comment -

          If you do RF=1, then each time a node dies (is upgraded or whatnot) you know that some portion of batchlog writes on the cluster will suffer from timeouts (even if we retry on the coordinator, the latency will still suffer). That's actually the main reason why I think RF=2 is a much much better default

          Good point... I tend to think in terms of optimizing for throughput, but I think you're right.

           Our hands may be tied, though: how do you default to RF=2 without screwing over single-node clusters? I'd be okay with defaulting to RF=1 but logging a WARN on startup if it hasn't been changed.

          But whether we do coordinator-side retry is not directly related imho

          Agreed from a purely theoretical standpoint, but practically it's related in the sense that clients see retries in general as painful. So if we can get to a place where clients don't need to retry except in obscure corner cases (obscure enough that imo ignoring them is reasonable) then so much the better.

           a timeout when writing to the replicas means that the CL cannot be achieved at the current time

          Right, which is where the InProgressException comes in: your write is in progress, you won't necessarily be able to read it yet, but you don't need to retry (unless you need to block for readability).
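
           For concreteness, a sketch of how a client might treat such a hypothetical InProgressException differently from a timeout; neither exception class here is a real driver or Thrift API, this only illustrates the proposed semantics:

           // Hypothetical client-side handling: "in progress" means the batch is durable in the
           // batchlog but not yet readable, so no retry is needed unless the caller must block
           // for readability; a plain timeout still has an unknown outcome.
           final class WriteOutcomeSketch
           {
               static class TimedOutException extends Exception {}
               static class InProgressException extends Exception {}

               interface BatchWriter
               {
                   void atomicBatchMutate(byte[] batch) throws TimedOutException, InProgressException;
               }

               static void submit(BatchWriter writer, byte[] batch, int maxRetries) throws TimedOutException
               {
                   for (int attempt = 0; ; attempt++)
                   {
                       try
                       {
                           writer.atomicBatchMutate(batch);
                           return; // achieved the requested consistency level
                       }
                       catch (InProgressException e)
                       {
                           return; // durable in the batchlog; the server will finish delivery
                       }
                       catch (TimedOutException e)
                       {
                           if (attempt == maxRetries)
                               throw e; // genuinely unknown outcome; the caller decides
                       }
                   }
               }
           }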

          Sylvain Lebresne added a comment -

          Our hands may be tied though, how do you default to RF=2 and not screw over single node clusters?

           Yeah, that's the thing I haven't really figured out yet. Maybe it wouldn't be too hard for each node to check at startup whether there is more than one node and then update the RF to 2, but that's a bit hacky. It's just that the DCL will be an internal thing, and I think it would be really great if 90% of users don't even have to know it exists, let alone have to change the RF. But if we don't have a better solution, the WARN thing might be an option.

          So if we can get to a place where clients don't need to retry except in obscure corner cases

          I agree. What I'm saying is that CASSANDRA-4414 is doing just that and actually solves the problem not only for atomic batches but for writes in general.

          which is where the InProgressException comes in

           Again, I'm really just saying that adding InProgressException is just a subpart of CASSANDRA-4414 (that ticket suggests reusing TOE and adding info to it, but I'm fine discussing the option of adding another exception instead, which would achieve the exact same thing).

          Jonathan Ellis added a comment -

          What I'm saying is that CASSANDRA-4414 is doing just that and actually solves the problem not only for atomic batches but for writes in general.

          Sounds like we're in violent agreement, then. I'll comment over there.

          Jonathan Ellis made changes -
           Link: This issue is blocked by CASSANDRA-4414
          Jonathan Ellis added a comment -

          I think the above overcomplicates things a bit.

          The idea with sharding was to

          1. mitigate wide batchlog rows
          2. spread batchlog load better across the cluster
          3. avoid having to fail a batch write if the batchlog replica for a given coordinator is down

          If we make the batchlog replica in charge of replay, these all go away or get simpler.

           1. the replica can examine batchlog entries and convert them to hints if they are older than 2x the write timeout. We expect very few of these, so the inefficiency (over having the coordinator do it only after restart) is not a Big Deal. This also simplifies replay a great deal (the coordinator no longer has to poll for the BL replicas to come up); see the sketch after this list
          2. since we never have to do a non-local read of the batchlog, we can use a WriteAnywhereStrategy that just picks a random node in the local DC
          3. can be mitigated by making our hypothetical WAS FD-aware, or simply by going to RF=2 (and doing CL.ONE writes).
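
           A rough sketch of the replica-driven replay in point 1, assuming a fixed write timeout; Entry and Replayer are placeholder types, not the real implementation:

           import java.util.Iterator;
           import java.util.List;

           // Sketch: each batchlog replica periodically replays local entries older than twice the
           // write timeout, i.e. entries whose coordinator apparently never finished the batch.
           final class LocalBatchlogReplay
           {
               static final long WRITE_TIMEOUT_MS = 10_000; // illustrative value

               static class Entry
               {
                   final long writtenAtMillis;
                   final byte[] serializedBatch;

                   Entry(long writtenAtMillis, byte[] serializedBatch)
                   {
                       this.writtenAtMillis = writtenAtMillis;
                       this.serializedBatch = serializedBatch;
                   }
               }

               interface Replayer
               {
                   void replayOrHint(byte[] serializedBatch); // deliver to replicas, or write hints for dead ones
               }

               static void replayStale(List<Entry> localEntries, Replayer replayer, long nowMillis)
               {
                   Iterator<Entry> it = localEntries.iterator();
                   while (it.hasNext())
                   {
                       Entry e = it.next();
                       if (nowMillis - e.writtenAtMillis > 2 * WRITE_TIMEOUT_MS)
                       {
                           replayer.replayOrHint(e.serializedBatch);
                           it.remove(); // done with this entry; delete it from the batchlog
                       }
                   }
               }
           }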

          I like having replay be local and automatic a great deal, over having the coordinator do it (which implies having some manual? failover mechanism when the coordinator is down for good).

           Note that we'd want to define the batchlog with LocalStrategy (in the system keyspace); we'd manually invoke WriteAnywhereStrategy from StorageProxy. Thinking about it, we probably wouldn't want an actual Strategy, just similar code, since we don't actually depend on the row key to pick replicas.

          Jonathan Ellis added a comment -

          Rolling our own "replication strategy" in storage proxy also allows us to pick min(2, nodes in cluster) as our replication factor out of the box.
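
           A sketch of what that StorageProxy-side selection could look like, using a plain list of live local-DC endpoints in place of the real gossip/snitch machinery; all names here are illustrative:

           import java.net.InetAddress;
           import java.util.ArrayList;
           import java.util.Collections;
           import java.util.List;

           // Sketch: pick min(2, available nodes) batchlog endpoints at random from the live nodes
           // in the coordinator's DC, falling back to the coordinator only on a single-node cluster.
           final class BatchlogEndpointSelector
           {
               static List<InetAddress> select(InetAddress coordinator, List<InetAddress> liveLocalDcNodes)
               {
                   List<InetAddress> candidates = new ArrayList<>(liveLocalDcNodes);
                   if (candidates.size() > 1)
                       candidates.remove(coordinator); // prefer not to "replicate" to ourselves
                   Collections.shuffle(candidates);
                   int rf = Math.min(2, candidates.size());
                   return new ArrayList<>(candidates.subList(0, rf));
               }
           }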

          Jonathan Ellis made changes -
           Assignee: Jonathan Ellis [ jbellis ] → Aleksey Yeschenko [ iamaleksey ]
           Reviewer: jbellis
          Jonathan Ellis added a comment -

          Split implementation out into four subtasks.

          Jonathan Ellis added a comment -

          Resolving as complete since functional subtasks are done.

          Jonathan Ellis made changes -
           Status: In Progress → Resolved
           Fix Version/s: 1.2.0 beta 2
           Resolution: Fixed
           Transitions
           Open → In Progress: 39d 8h 4m in Open, 1 execution, last by Jonathan Ellis on 04/Jul/12 02:15
           In Progress → Resolved: 106d 21h 44m in In Progress, 1 execution, last by Jonathan Ellis on 18/Oct/12 23:59

            People

            • Assignee: Aleksey Yeschenko
            • Reporter: Jonathan Ellis
            • Reviewer: Jonathan Ellis
            • Votes: 4
            • Watchers: 13
