Cassandra
CASSANDRA-620

Add per-keyspace replication factor (possibly even replication strategy)

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 0.6
    • Component/s: None
    • Labels:
      None

      Description

      (but partitioner may only be cluster-wide, still)

      not 100% sure this makes sense but it would allow maintaining system metadata in a replicated-across-entire-cluster keyspace (without ugly special casing), as well as making Cassandra more flexible as a shared resource for multiple apps

      1. 0013-adjustments-for-new-clusterprobe-tool.patch
        3 kB
        Gary Dusbabek
      2. 0012-ensure-that-unbootstrap-calls-onFinish-after-tables-.patch
        5 kB
        Gary Dusbabek
      3. 0011-remove-replicas-from-ARS.patch
        59 kB
        Gary Dusbabek
      4. 0010-fix-problems-with-bootstrapping.patch
        6 kB
        Gary Dusbabek
      5. 0009-make-TMD-private-in-ARS.patch
        7 kB
        Gary Dusbabek
      6. 0008-push-endpoint-snitch-into-keyspace-configuration.patch
        30 kB
        Gary Dusbabek
      7. 0007-modify-TestRingCache-to-make-it-easier-to-test-speci.patch
        3 kB
        Gary Dusbabek
      8. 0006-added-additional-testing-keyspace.patch
        4 kB
        Gary Dusbabek
      9. 0005-introduce-table-into-pending-ranges-code.patch
        32 kB
        Gary Dusbabek
      10. 0004-fix-non-compiling-tests.-necessary-changes-in-test-c.patch
        12 kB
        Gary Dusbabek
      11. 0003-push-table-names-into-streaming-expose-TMD-in-ARS.patch
        29 kB
        Gary Dusbabek
      12. 0002-cleaned-up-as-much-as-possible-before-dealing-with-r.patch
        33 kB
        Gary Dusbabek
      13. 0001-push-replication-factor-and-strategy-into-table-exce.patch
        30 kB
        Gary Dusbabek
      14. v3-patches.tgz
        43 kB
        Gary Dusbabek
      15. v2-patches.tgz
        41 kB
        Gary Dusbabek
      16. v1-patches.tgz
        29 kB
        Gary Dusbabek

        Issue Links

          Activity

          Hudson added a comment - Integrated in Cassandra #357 (See http://hudson.zones.apache.org/hudson/job/Cassandra/357/ )
          Gary Dusbabek added a comment -

          r906521

          Gary Dusbabek added a comment -

          Thanks Jaakko--I realize this was a big set to review. I'll get it rebased and committed.

          Jaakko Laine added a comment -

          Yes, pending ranges is range->list<InetAddress> mapping. Replication strategy dictates what nodes there are in the list and in which order, whereas replication factor dictates the length of the list. If we have two tables with same replication factor, it would be sufficient to calculate pending ranges once for maximum replication factor for that table, since pending ranges for smaller factor will be a subset of the other. An example to illustrate this:

          Suppose we have TableA and TableB using same strategy, with replication factors 2 and 3 respectively. Suppose there are nodes NodeA, NodeB and NodeC in the cluster and NodeD boots "behind" NodeC. In this situation pending ranges for NodeD would be:

          TableA: B-C, C-D
          TableB: A-B, B-C, C-D

          Notice that pending ranges for TableA is a subset of pending ranges for TableB. Instead of having pending ranges per table, it would be enough to have them per replication strategy in use. We could then just pick replication factor number of nodes from the beginning of the list.
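A minimal sketch of the prefix idea above (illustrative Java only, not Cassandra's actual classes; `pendingForMaxRf` and `pendingFor` are invented names): pending endpoints are computed once for the maximum factor in use with a strategy, and a table with a smaller factor simply takes a prefix of that list.

```java
import java.util.List;

public class PendingPrefix {
    // Endpoints computed once, for the maximum replication factor among
    // tables sharing this replication strategy (hypothetical example data).
    static final List<String> pendingForMaxRf = List.of("NodeA", "NodeB", "NodeC");

    // A table with replication factor rf uses the first rf endpoints.
    static List<String> pendingFor(int rf) {
        return pendingForMaxRf.subList(0, Math.min(rf, pendingForMaxRf.size()));
    }

    public static void main(String[] args) {
        System.out.println(pendingFor(2)); // [NodeA, NodeB]
        System.out.println(pendingFor(3)); // [NodeA, NodeB, NodeC]
    }
}
```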

          Anyway, as said before, this is an optimization which can be done later if needed.

           The patchset looks OK to me.

          Gary Dusbabek added a comment -
          • latest patchset addresses the problems with unbootstrap.
          • I think pending ranges still need to be calculated on a table basis. A pending range is still a mapping of range->list<InetAddress>, right? That list will be different from table to table based on replication strategy and replication factor, even though the ranges themselves will be constant per replication strategy.
          Gary Dusbabek added a comment -

          Archiving v2 patches.

          Jaakko Laine added a comment -

          a few initial comments:

          • Isn't onFinish called too soon? It should be called only after all tables have been transferred. I think the easiest would be to have "pending" in unbootstrap record ranges per table, and when it is completely empty, call onFinish.
          • returning immediately if rangesMM is empty is no longer valid as we're doing this on table basis. Due to different replication factor, some tables might need data transfer and some others not.
          • pending ranges are still calculated on table basis. This might be a lot of waste (if we have 10 tables with same strategy, pending ranges will be calculated 9 times unnecessarily every time a node moves). Of course we may go with this and optimize later.
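The per-table "pending" bookkeeping suggested in the first bullet could look roughly like this (a sketch with invented names, not the actual unbootstrap code): onFinish fires only once every table's set of pending ranges has drained.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class UnbootstrapTracker {
    // table -> ranges still to be transferred (illustrative types).
    private final Map<String, Set<String>> pending = new HashMap<>();
    boolean finished = false;

    void addPending(String table, String range) {
        pending.computeIfAbsent(table, t -> new HashSet<>()).add(range);
    }

    void rangeTransferred(String table, String range) {
        Set<String> ranges = pending.get(table);
        if (ranges != null) {
            ranges.remove(range);
            if (ranges.isEmpty())
                pending.remove(table);
        }
        if (pending.isEmpty())
            onFinish(); // fires only after ALL tables have drained
    }

    void onFinish() { finished = true; }

    public static void main(String[] args) {
        UnbootstrapTracker t = new UnbootstrapTracker();
        t.addPending("TableA", "B-C");
        t.addPending("TableB", "A-B");
        t.rangeTransferred("TableA", "B-C");
        System.out.println(t.finished); // false: TableB still pending
        t.rangeTransferred("TableB", "A-B");
        System.out.println(t.finished); // true
    }
}
```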
          Gary Dusbabek added a comment -

          updated to include minor RingCache change.

          Gary Dusbabek added a comment -

          Attached a newly rebased set of patches that address the points brought up by Jaakko.

          1) Then I think that replicas_ should come out of ARS and instead retrieve the replication factor on a per-table basis when needed. This shook out nicely in the code, except the unit tests.

          2) Made TMD.pendingRanges a ConcurrentMap.

          3) I went ahead and made ARS.tokenMetadata_ private and removed the getter.

          4) Addressed in #673.

          5) modified by chaining callbacks to possibly call a leaveRing callback.

          6) DatabaseDescriptor will complain if any KeySpace entity is missing EndPointSnitch or ReplicaPlacementStrategy. Likewise, StorageService will complain if a bogus table is passed into getReplicationStrategy. TokenMetadata is the other place this could happen. I believe the risk is mitigated through TMD.getPendingRangesMM(). Are there any specific places you're worried about? Many of the methods that now accept a 'table' parameter get that parameter from a call to DD.getNonSystemTables(). Those that don't and are called indirectly from a client should have had the keyspace parameter checked for validity during thrift validation. I think we're pretty safe here.

          Gary Dusbabek added a comment -

          The original patches.

          Jaakko Laine added a comment -

          additional comments:

           (5) unbootstrap should call onFinish only after all tables have been transferred. (also the debug message could be modified to reflect the per-table function)

          (6) In many places getSomething(...) is replaced by getSomething(table, ...). Is it guaranteed that the latter never returns null?

          one tiny thing more:
          nodePicker_ variable in RingCache is not used anymore

          Jaakko Laine added a comment -

          (1) ARS.replica_ cannot be used for this purpose as it might be different for two instances of same replication strategy. For this purpose, a maximum for each type of replication strategy in use should be used (see my previous comment above). This would allow us to calculate pending ranges only once per replication strategy.

          (2) Assigning a new value on top of the other was not a problem. If somebody was still using the old pending ranges, let them do so. Gossip propagation and pending ranges calculation is not very accurate in terms of timing anyway, so if somebody uses the old version for a few microseconds more, that is OK. However, if we change the data structure when somebody else is using it, that is different issue, I think.

           (4) Yeah, basically anything that keeps track of what tables and ranges are needed from where should work. One thing to remember, though, is that stream sources might be different for every table, so it might be easier to just keep track on a table basis of what has been transferred, instead of calculating an inverse list of what ranges from what table are needed from each host. I think the first option would just need a small change to StorageService (just have addBootstrapSource and removeBootstrapSource take "table" as an extra parameter and internally have a hashmap). This would also take care of #673 (are you planning to do all my work?)

          Gary Dusbabek added a comment -

          Jaakko,

           1) As long as the 'replica_' member of ARS can safely be considered a maximum, then there is no reason we can't limit the number of instances. The fact that it is used to populate the list in 'getNaturalEndpoints' made me unsure.

           2) No. This could be addressed by making 'TMD.pendingRanges' a ConcurrentMap (preferred) or by synchronizing SS.calculatePendingRanges(). I suppose this doesn't address the fact that the contents could change while whoever called getPendingRanges() is using the data, but we had that problem before.
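The preferred option could be sketched like this (illustrative names, not the actual TMD code): a ConcurrentHashMap is mutated in place, so readers never see a corrupted structure, though, as noted, they may still observe entries changing while they use the map.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PendingRangesHolder {
    // Mutated in place; put/get are thread-safe without external locking.
    private final Map<String, List<String>> pendingRanges = new ConcurrentHashMap<>();

    public void setPending(String range, List<String> endpoints) {
        pendingRanges.put(range, endpoints);
    }

    public List<String> getPending(String range) {
        return pendingRanges.getOrDefault(range, List.of());
    }

    public static void main(String[] args) {
        PendingRangesHolder tmd = new PendingRangesHolder();
        tmd.setPending("C-D", List.of("NodeD"));
        System.out.println(tmd.getPending("C-D")); // [NodeD]
        System.out.println(tmd.getPending("A-B")); // []
    }
}
```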

           3) StorageService owns it. I had a hard time following this at first too. Every reference to TMD can trace its roots back to the one created in SS. TMD is so close to being a singleton. I can't remember why I changed calculatePendingRanges() to retrieve it from the ARS instead of grabbing the SS member though.

           4) What if I change this so that the message becomes: "send me bootstrap data for this list of tables" instead of: "for each table, send me bootstrap data."? Then, as soon as a remote node is finished, it can send an indication and the bootstrapping node can remove the remote node from its bootstrap set. It seems like that would solve CASSANDRA-673 at the same time, correct?

          Thanks for the feedback!

          Jaakko Laine added a comment -

          Yeah, it is related, but in this case it happens even if none of the tables are empty.

          Jonathan Ellis added a comment -

          BTW, there is a ticket for #4 (an existing issue before this change) – CASSANDRA-673

          Jaakko Laine added a comment -

          Some initial comments/questions (I'll have another look at this tomorrow):

          (1) Do we need to have pending ranges per table? It should be enough to have them per replication strategy. Same applies to related methods in StorageService (restoreReplicaCount, calculatePendingRanges, etc)

          (2) Is setPendingRanges atomic? Previously the whole data structure was replaced in one assign, now it is modified while clients might have handles to it.

          (3) Who "owns" token metadata? StorageService handles metadata to strategy, but later (in calculatePendingRanges) gets it again from strategy. ARS.getTokenMetadata seems to be called only by StorageService.

          (4) Bootstrap sources should be recorded per table. If there are multiple tables that are streamed from the same source, it will be removed after the first one is complete. The node might start serving reads before it has completed bootstrap.

          Gary Dusbabek added a comment -

           Patches move replication strategy, replication factor and endpoint snitch into keyspace configuration. The endpoint snitch isn't strictly necessary, but makes some things easier.

          Jaakko Laine added a comment -

          I think one tokenMetadata is enough. When replication strategies construct endpoint lists, they just get token/endpoint information from token metadata and then pick the nodes they want from that list. I think there is no need to duplicate this information.

          The problem that needs to be addressed is pending ranges, but I think we can do that relatively easily inside "one" token metadata: In order to address multiple replication strategies and replication factors, we'll need to be able to support any combination of those two. The easiest way to address this would probably be to always calculate pending ranges for all replication strategies in use, and for maximum replica count for each replication strategy.

          That is, if we have following replication strategies in use:

          Table1: RackAware, factor 3
          Table2: RackUnaware, factor 2
          Table3: RackAware, factor 2

          We would calculate pending ranges for RackAware using factor 3 and for RackUnaware using factor 2. This would prepare tokenmetadata to serve any pending range query possible and would need only as many separate lists as there are strategies in use. Pending ranges would be stored in ordered list, so when replication strategy (getWriteEndPoints) is considering Table3, it would only look for first two ranges in the list for RackAware (naturally for Table1 full list would be considered)
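The bookkeeping described above can be sketched as follows (a hypothetical helper; `maxFactor` is an invented name, and the example data is the Table1/Table2/Table3 setup from the comment): determine the maximum replication factor per strategy, so pending ranges need to be calculated only once per strategy, at that maximum.

```java
import java.util.HashMap;
import java.util.Map;

public class MaxFactorPerStrategy {
    // For each strategy in use, find the largest replication factor among
    // the tables configured with it.
    static Map<String, Integer> maxFactor(Map<String, String> strategyOf,
                                          Map<String, Integer> factorOf) {
        Map<String, Integer> max = new HashMap<>();
        for (Map.Entry<String, String> e : strategyOf.entrySet())
            max.merge(e.getValue(), factorOf.get(e.getKey()), Integer::max);
        return max;
    }

    public static void main(String[] args) {
        Map<String, String> strategyOf = Map.of(
            "Table1", "RackAware", "Table2", "RackUnaware", "Table3", "RackAware");
        Map<String, Integer> factorOf = Map.of("Table1", 3, "Table2", 2, "Table3", 2);
        Map<String, Integer> max = maxFactor(strategyOf, factorOf);
        System.out.println("RackAware=" + max.get("RackAware"));     // 3
        System.out.println("RackUnaware=" + max.get("RackUnaware")); // 2
    }
}
```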

          Jonathan Ellis added a comment -

          > The replicationStrategy and tokenMetadata members in SS will go away. Each table is going to need its own replication strategy, which implies a TokenMetadata for each.

          We still want one token per node, which means singleton TokenMetadata should still be fine. The pendingranges business will probably need to grow a Table parameter, but the core is mapping Token to IP which won't need to change. (All the higher-level stuff is in ARS already.)

          > Bootstrapping will need to happen per table instead of all tables at once since the range of each node will vary according to the table replication factor.

          Right. That is, we still want it "at once" in the sense that the node doesn't join the ring until it's ready to serve reads for all its ranges, but the range determination will be per-table.

          Gary Dusbabek added a comment -

          I've spent some time looking at this. The biggest obstacle I've seen so far is that introducing RF per keyspace means that you can no longer deduce a range from a token (you'll need token + keyspace).

          Correct me if I'm wrong, but here is what looks like needs to be changed:

          The replicationStrategy and tokenMetadata members in SS will go away. Each table is going to need its own replication strategy, which implies a TokenMetadata for each. TokenMetadata has been treated as a singleton up until now, so I'll need to figure out how to approach that.

          Bootstrapping will need to happen per table instead of all tables at once since the range of each node will vary according to the table replication factor.

          After that, I think I'll have smooth sailing. Is there anything obvious and big I might be missing?

          Gary Dusbabek added a comment -

          My thoughts here are to push the replication factor down into the replication strategy (where it exists in the code already), and then push the replication strategy down into the keyspace.

          I don't think we can easily do the replication factor without bringing in table-specific replication strategies as well, simply for the reason that the AbstractReplicationStrategy member of the StorageService singleton no longer makes sense.


            People

            • Assignee:
              Gary Dusbabek
              Reporter:
              Jonathan Ellis
            • Votes:
              1
              Watchers:
              4
