Details

      Description

      In JBOD, when someone gets a bad drive, the bad drive is replaced with a new empty one and repair is run.
      This can cause deleted data to come back to life in some cases. The same is true for corrupt sstables, where we delete the corrupt sstable and run repair.
      Here is an example:
      Say we have 3 nodes A, B and C, with RF=3 and gc_grace=10 days.
      row=sankalp col=sankalp was written 20 days ago and successfully went to all three nodes.
      Then a delete/tombstone was written successfully for the same row column 15 days ago.
      Since this tombstone is older than gc_grace, it was purged on nodes A and B, because it was compacted together with the actual data. So there is no trace of this row column on nodes A and B.
      Now on node C, say the original data is on drive1 and the tombstone is on drive2. Compaction has not yet reclaimed the data and tombstone.
      Drive2 becomes corrupt and is replaced with a new empty drive.
      Due to the replacement, the tombstone is now gone and row=sankalp col=sankalp has come back to life.
      Now, after replacing the drive, we run repair, and this data is propagated to all nodes.

      Note: This is still a problem even if we run repair every gc_grace.
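
      The scenario above, condensed into a toy, self-contained Java sketch (purely illustrative; none of this is Cassandra code, and the cell names just mirror the example):

{code:java}
import java.util.HashSet;
import java.util.Set;

public class ResurrectionDemo {
    public static void main(String[] args) {
        long gcGraceDays = 10;
        long tombstoneAgeDays = 15;

        // Nodes A and B: data and tombstone were compacted together, and the
        // tombstone is older than gc_grace, so both were purged entirely.
        boolean purgedOnAandB = tombstoneAgeDays > gcGraceDays;

        // Node C: data on drive1, tombstone on drive2; drive2 is swapped for an empty drive.
        Set<String> drive1 = new HashSet<>(Set.of("sankalp:sankalp=data"));
        Set<String> drive2 = new HashSet<>(Set.of("sankalp:sankalp=tombstone"));
        drive2.clear(); // new empty drive replaces the bad one

        // Repair now sees live data on C and nothing on A/B, so the deleted cell spreads back.
        boolean resurrected = purgedOnAandB
                && drive1.contains("sankalp:sankalp=data")
                && !drive2.contains("sankalp:sankalp=tombstone");
        System.out.println("deleted data comes back to life: " + resurrected); // prints true
    }
}
{code}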

        Issue Links

          Activity

          kohlisankalp sankalp kohli added a comment -

          With this, the whole disk_failure_policy stuff is broken. If you blacklist a drive, you can potentially bring data back to life.

           One of the fixes for this is one of my JIRAs from long back:
          CASSANDRA-4784
          If we divide each drive with ranges, then we are sure that the data along with the tombstone will get blacklisted.
          Example: Say a node is handling range 1-10 and 11-20. We can have drive A handle 1-10 and drive B handle 11-20.
           Though this might have problems with load balancing.
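
           A hypothetical illustration of that idea (this is not the CASSANDRA-4784 code; names and ranges are made up): pinning each token range to a single drive means a cell and its tombstone always live, and get lost, together.

{code:java}
import java.util.NavigableMap;
import java.util.TreeMap;

class RangePerDrive {
    // drive owning the range that starts at a given token
    private final NavigableMap<Long, String> driveByRangeStart = new TreeMap<>();

    void assign(long rangeStart, String drive) {
        driveByRangeStart.put(rangeStart, drive);
    }

    String driveFor(long token) {
        // the drive whose range start is the greatest one <= token
        return driveByRangeStart.floorEntry(token).getValue();
    }

    public static void main(String[] args) {
        RangePerDrive layout = new RangePerDrive();
        layout.assign(1L, "driveA");   // drive A handles 1-10
        layout.assign(11L, "driveB");  // drive B handles 11-20
        System.out.println(layout.driveFor(7)); // driveA: the data and its later tombstone both land here
    }
}
{code}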

          benedict Benedict added a comment -

           One possibility here is that we could split the bloom filter and metadata onto a separate disk from their data files, so that if/when a disk fails we have the option of scrubbing any records on the remaining disks that we think were present on the lost disk in a file with min_timestamp < gc_grace_seconds ago.

          Once we've done the scrub (in fact it could probably be "done" instantly by just setting up some filter for compaction + reads until we're fully repaired and have compacted the old data) we can start serving reads again, and can start a repair from the other nodes to receive data for all of the records we're now missing (either through the missing disk or that we're forcefully trashing).

          jbellis Jonathan Ellis added a comment -

          the whole disk_failure_policy stuff is broken

          I would say rather, disk_failure_policy works brilliantly so that if you're using tombstones you can set it to stop the server and rebuild it.

          If we divide each drive with ranges, then we are sure that the data along with the tombstone will get blacklisted.

          That will probably work well enough as long as vnode count >> disk count. Would have the added benefit of reducing fragmentation for STCS.

          Less than zero interest in trying to add sub-vnode "regions" though.

          One possibility here is that we could split bloom filter and metadata onto a separate disk to their data files

          Not really a fan; complicates moving data around significantly without generalizing well beyond a single disk failure. Even for single disk failures it bifurcates the recovery process: if you lose "data" then you scrub/repair; if you lose metadata you rebuild it from data.

          jbellis Jonathan Ellis added a comment -

           (Classifying this as an Improvement since, while the behavior is not optimal in this scenario, it's working as designed.)

          benedict Benedict added a comment - - edited

          if you lose "data" then you scrub/repair; if you lose metadata you rebuild it from data.

           You'd always have to do both with any single disk failure. But I agree it isn't optimal; it is, however, cost-free to maintain, so it's essentially just an optimisation plus an automated process to downgrade the node in the event of failure without having to manually rebuild it.

          Simply redundantly writing out the metadata would change it to a more uniform process, and tolerant to more than one failure, but at increased cost; at which point you might as well redundantly write out tombstones - either as a bloom filter or an extra sstable. The latter could be complicated to maintain cheaply and safely though. For multiple disk failures I'd say, if you have configured the auto-downgrading to happen - it should just trash everything it has and (optionally) repair.

          krummas Marcus Eriksson added a comment -

          Been poking this, wip-patch pushed here: https://github.com/krummas/cassandra/commits/marcuse/6696

           It does the following:

          • Extract an interface out of SSTableWriter (imaginatively called SSTableWriterInterface), start using this interface everywhere
           • Create DiskAwareSSTableWriter, which knows about the disk layout, and start using it instead of the standard SSTW
           • Ranges of tokens are assigned to the disks; this way we only need to check "is the key we are appending larger than the boundary token for the current disk?" If so, create a new SSTableWriter for that disk (see the sketch after this list)
          • Breaks unit tests
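
           A rough sketch of that boundary check (names here are stand-ins, not the branch's actual SSTableWriterInterface/DiskAwareSSTableWriter classes): keys arrive in token order, and we roll over to a new per-disk writer whenever a key crosses the current disk's boundary token.

{code:java}
import java.util.ArrayList;
import java.util.List;

class DiskAwareWriterSketch {
    private final long[] diskBoundaries;      // ascending upper boundary token per disk;
                                              // the last entry is the partitioner's max token
    private final List<List<Long>> perDiskKeys = new ArrayList<>(); // one "SSTableWriter" per disk
    private int currentDisk = 0;

    DiskAwareWriterSketch(long[] diskBoundaries) {
        this.diskBoundaries = diskBoundaries;
        for (int i = 0; i < diskBoundaries.length; i++) perDiskKeys.add(new ArrayList<>());
    }

    void append(long keyToken) {
        // keys are appended in token order, so we only ever move forward through the disks
        while (keyToken > diskBoundaries[currentDisk]) currentDisk++; // switch to the next disk's writer
        perDiskKeys.get(currentDisk).add(keyToken);                   // "append" to the current writer
    }
}
{code}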

          todo:

          • fix unit tests, general cleanups
           • I kind of want to name the interface SSTableWriter and call the old SSTW class something else, but I guess SSTW is the class that most external people depend on, so maybe not
          • Take disk size into consideration when splitting the ranges over disks, this needs to be deterministic though, so we have to use total disk size instead of free disk space.
          • Make other partitioners than M3P work
          • Fix keycache

           Rebalancing of data is simply a matter of running upgradesstables or scrub; if we lose a disk, we will take writes to the other disks

          Comments on this approach?

          jbellis Jonathan Ellis added a comment -

          Can we drop BOP/OPP in 3.0?

          benedict Benedict added a comment -

           Had a quick glance, and have one initial thought: it might be worth forcing compaction to always work on one disk (i.e. always select files from one disk for compaction). It would simplify things slightly, and it seems likely to be the most optimal use of IO; but also, as it stands you could have a scenario where one file each is selected from different disks, which would result in a perpetual compaction loop.

          benedict Benedict added a comment -

          It seems to me it might also be simpler, once this change is made, to just split the range of the memtable and call subMap(lb, ub) and spawn a separate flush writer for each range, which might avoid the need for an SSTableWriterInterface... Might also be a good time to introduce a separate flush executor for each disk.
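
           For illustration, a minimal sketch of that flush split (assumed types and names throughout; the real memtable is not a plain ConcurrentSkipListMap, and the per-disk executors are hypothetical here):

{code:java}
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;

class SplitFlushSketch {
    static void flush(ConcurrentSkipListMap<Long, String> memtable,
                      long[] diskUpperBounds,              // ascending; last = partitioner max token
                      ExecutorService[] perDiskFlushExecutor) {
        long lower = Long.MIN_VALUE;
        for (int disk = 0; disk < diskUpperBounds.length; disk++) {
            // subMap(lb, ub) view of just this disk's token range
            ConcurrentNavigableMap<Long, String> slice =
                    memtable.subMap(lower, true, diskUpperBounds[disk], true);
            perDiskFlushExecutor[disk].submit(() -> writeSstable(slice)); // one flush writer per range
            lower = diskUpperBounds[disk] + 1;               // next range starts past this boundary
        }
    }

    private static void writeSstable(ConcurrentNavigableMap<Long, String> slice) {
        // actual sstable writing omitted
    }
}
{code}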

          benedict Benedict added a comment -

          Last thoughts for the day: only major downside to this approach is that we are now guaranteeing no better than single disk performance for all operations on a given partition. So if there are particularly large and fragmented partitions, they could see read performance decline notably. One possible solution to this would be split by clustering part (if any), instead of partition key, but determine the clustering part range split as a function of the partition hash, so that the distribution of data as a whole is still random (i.e. each partition has a different clustering distribution across the disks). This would make the initial flush more complex, and might require more merging on reads, but compaction could still be easily constrained to one disk. This is just a poorly formed thought I'm throwing out there for consideration, and possibly outside of scope for this ticket.

           Either way, I'm not certain that splitting ranges based on disk size is such a great idea. As a follow-on ticket it might be sensible to permit two categories of disks: archive disks for slow and cold data, and live disks for faster data. Splitting by capacity seems likely to create undesirable performance characteristics, as two similarly performant disks with different capacities would lead to worse performance for the data residing on the larger disks.

          On the whole I'm +1 this change anyway, the more I think about it. I had been vaguely considering something along these lines to optimise flush performance, but it seems we can get this for free along with improving correctness, which is great.

          jbellis Jonathan Ellis added a comment -

          So if there are particularly large and fragmented partitions, they could see read performance decline notably

          Let's state for the record that solving this problem is a non-goal.

          (A single query might see higher latency, but overall we will do better even with large and fragmented partitions since the fragmentation and merging required will be less.)

          benedict Benedict added a comment -

          Let's state for the record that solving this problem is a non-goal.

          Fair enough. Just throwing it out there

          overall we will do better even with large and fragmented partitions since the fragmentation and merging required will be less.

          Possibly. It would depend on data distribution and size of ranges. If you had large-ish dense ranges per-disk, this shouldn't be a problem. But I think either way it's a major complication so at the very least not worth doing now, and since the distribution and split size are not tunable, maybe not ever.

          krummas Marcus Eriksson added a comment -

          Can we drop BOP/OPP in 3.0?

           Hmm, that would be nice. A big PITA would be rewriting all the unit tests that depend on order; created CASSANDRA-6922

          krummas Marcus Eriksson added a comment -

          It seems to me it might also be simpler, once this change is made, to just split the range of the memtable and call subMap(lb, ub) and spawn a separate flush writer for each range, which might avoid the need for an SSTableWriterInterface

          hmm yeah, might be better to not have the SSTWI and handle that outside to get more flexibility, I'll try to do that

          krummas Marcus Eriksson added a comment -

          pushed a new version to https://github.com/krummas/cassandra/commits/marcuse/6696-2

          • removed SSTWInterface, instead created a helper class that is reused in most places
          • multithreaded flush, one thread per disk
          • support multiple flush dirs
          • sort compaction/flush dirs lexicographically to make sure we always put the same tokens on the same disks (even if you rearrange dirs in config etc)
           • avoids compaction loops by making sure we never start STCS compactions with any sstables that don't intersect (which the sstables on different disks won't)
          • RandomP and Murmur3P supported, the rest will dump data on the first disk for now

          TODO:

          • ask user@ for remove-OPP/BOP feedback, otherwise make them work with JBOD, in the old way
          jbellis Jonathan Ellis added a comment -

           Can you add javadoc for splitRanges? Why is it partitioner-dependent?

          krummas Marcus Eriksson added a comment -

          Why is it partitioner-dependent?

           If we own all tokens, we need to know the partitioner's min and max tokens to be able to split them over the disks.

          javadoc added for splitRanges in the repo above

          benedict Benedict added a comment -

          Just a suggestion (not 100% certain it is better, but it seems cleaner to me):

          Once this feature is activated by the user, it might be easier to have an upgrade period during which sstables are migrated using DiskAwareWriter, but after which we know that the constraints hold. This would allow us to mostly leave the code unchanged in a few places (e.g. scrubber, compactiontask) which are already (prior to this ticket) a little on the complex side. It also seems like it would be easier to reason about behaviour in the future if we know these constraints are safely imposed, whereas using DiskAwareWriter leaves you with the impression we're never quite sure if the files obey our constraints or not.

          Really it's not a major issue, but worth considering.

          One other minor thing (more certain about this one though): perDiskExecutor should be an array of executors, one per disk; any configurable parallelism then should affect the number of threads each executor is given. Otherwise could get uneven distribution of work to the disks (especially as we add tasks in disk order, so if multiple tasks get queued at once, we'll get clumping of tasks by disk, reducing throughput on some disks through over-utilisation, and under-utilising the others.)

          benedict Benedict added a comment -

           A further suggestion: whilst we know vnodes don't currently distribute perfectly, this would be much simpler and more robust if we said that each disk simply gets assigned a contiguous 1/#disks portion of the total (global) token range. This way, once we migrate to the new layout we never have to worry about it again. As things stand, any addition or removal of a single node, or change in RF, triggers a need to rewrite the entire cluster. Whilst this does ensure even distribution across the disks, it seems like we leave some major holes in the protection we're offering, and filling them may be error prone (and certainly costly).

          So, my suggestion is that we permit this feature only for vnodes. We can, at the same time, perhaps visit the question of more deterministically allocating vnode ranges so that the cluster is evenly distributed.
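
           A minimal sketch of that contiguous 1/#disks split, assuming the Murmur3 partitioner's full token range (this is not the patch's splitRanges; names are made up):

{code:java}
import java.math.BigInteger;

class GlobalRangeSplit {
    // upper boundary token for each disk when the global range -2^63..2^63-1
    // is cut into equal contiguous portions, one per disk
    static long[] diskUpperBounds(int disks) {
        BigInteger min = BigInteger.valueOf(Long.MIN_VALUE);
        BigInteger span = BigInteger.valueOf(Long.MAX_VALUE).subtract(min); // 2^64 - 1 tokens
        long[] bounds = new long[disks];
        for (int i = 1; i <= disks; i++) {
            bounds[i - 1] = min.add(span.multiply(BigInteger.valueOf(i))
                                       .divide(BigInteger.valueOf(disks)))
                               .longValueExact();            // last disk always ends at Long.MAX_VALUE
        }
        return bounds;
    }
}
{code}

           Because the split depends only on the number of disks, adding or removing nodes in the cluster never changes which local disk a given token maps to.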

          sankalp kohli, what do you think?

          krummas Marcus Eriksson added a comment -

           Benedict, do you mean having a background job move data around after upgrade? Or blocking on startup and rewriting everything?

          Current version would end up with data on the correct disks eventually with compactions, but I agree it would be nice to be able to just care about the disks when flushing and streaming. Manually copying sstables into the datadirs and calling 'nodetool refresh' would also need some care.

          krummas Marcus Eriksson added a comment -

           Btw, not having to care about locations while compacting means we can't really keep having a separate flush directory, since data flushed to a directory would stay there forever. Wdyt, is it worth keeping flush directories and DiskAwareWriter everywhere, or should we drop support for a separate flush dir? With flushing being spread out over all disks, the advantages of having a separate flush dir are not as big.

          benedict Benedict added a comment -

          +1 on dropping separate flush dir. This is a better solution IMO - get full parallelism of the disks available.

          do you mean having a background job move data around after upgrade

          Yes, I think this would be preferable. Blocking at startup would make a rolling upgrade much too painful. If we mark all old sstables as compacting at startup, we can safely rewrite them in the background, and not worry about them violating our assumptions/constraints, since they're not eligible for regular compaction.

          jeromatron Jeremy Hanna added a comment -

          Do you mean dropping support for a separate flush directory for JBOD configurations or generally? Wouldn't it still have significant performance benefits in non-JBOD environments?

          krummas Marcus Eriksson added a comment -

          In those cases I think it would be better for the user to just create a JBOD configuration over those drives

          krummas Marcus Eriksson added a comment -

           pushed a new version to https://github.com/krummas/cassandra/commits/marcuse/6696-3 which:

          • adds nodetool command to rebalance data over disks so that user can do this whenever they want (like after manually adding sstables to the data directories)
          • removes diskawarewriter from everything but streams and the rebalancing command
          • makes the flush executor an array of executors.
          • splits ranges based on total partitioner range and makes this feature vnodes-only
          • supports the old way of doing things for non-vnodes setup (and ordered partitioners)

           There are still some of my config changes left in, as I bet there will be more comments on this

          jbellis Jonathan Ellis added a comment -

          If I have 256 vnodes and 8 disks, will a flush write 256 sstables or 8?

          krummas Marcus Eriksson added a comment -

          8

          jbellis Jonathan Ellis added a comment -

          would it simplify things to make it per-vnode?

          thinking we'd also get more compaction benefit that way... at the expense of doing more random-ish io on flush, but (1) this will be mitigated by larger off-heap memtables in 2.1 and (2) we could tune compaction vs io by adjusting number of vnodes, instead of being stuck w/ the disk count.

          /cc Tupshin Harper

          krummas Marcus Eriksson added a comment -

          I don't think it would simplify things much (this is quite simple already), but doing per-vnode sstables could enable some nice benefits, like turning off the exact vnodes that are affected by a disk failure or a mini auto-repair on corrupt sstables perhaps?

          The drawback I see is that we would end up with very many sstables, making it a real pita to do backups etc.

          tupshin Tupshin Harper added a comment -

          +1. To the extent that we can do sstables per vnode without introducing other performance costs, I am hugely in favor of it. With good OS tuning, I'm not scared of too many sstables. If it is a pain for backup, or other things, you could have an offline sstable consolidator script that would take a batch of sstables and stream them out as a single sstable to a remote location.

          benedict Benedict added a comment -

          The problem here is packing vnodes fairly across the disks: either we need to ensure that all vnodes are of roughly equal size (very difficult), or we probably need to have a dynamic allocation strategy, and the problem with that is that when the token range gets redistributed by node additions/removals, the whole cluster suddenly needs to start kicking off rebalancing of their local disks.

           We could support splitting the total token range into M distinct chunks, where M is preferably some multiple of the number of disks, and then allocate each chunk to a disk in round-robin fashion. This then remains deterministic, and it is I think easier to guarantee an even distribution within a given token range than it is to guarantee all vnodes are of equal size, whilst still supporting a dynamic cluster size. Even here, though, realistically I think we need the number of chunks to be quite a bit smaller than the number of vnodes to guarantee anything approaching balance of these chunks.
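
           A tiny sketch of that M-chunk mapping (hypothetical helper, again assuming the full Murmur3 token range): the global range is carved into M equal chunks and the chunks are dealt to disks round-robin, so the token-to-disk mapping stays deterministic as the cluster changes.

{code:java}
class ChunkRoundRobin {
    static int diskFor(long token, int chunks, int disks) {
        // fraction of the way through the global range -2^63..2^63-1
        // (double precision is plenty for picking a chunk)
        double position = (token - (double) Long.MIN_VALUE) / Math.pow(2, 64);
        int chunk = Math.min(chunks - 1, (int) (position * chunks));
        return chunk % disks;   // chunk i always lands on disk i % disks
    }
}
{code}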

          jbellis Jonathan Ellis added a comment - - edited

          doing per-vnode sstables could enable some nice benefits, like turning off the exact vnodes that are affected by a disk failure or a mini auto-repair on corrupt sstables perhaps?

          CASSANDRA-4784 lists some other benefits, the strongest of which I think are

          1. on disk failure, we can invalidate the affected vnodes and repair them, rather than continuing to serve incomplete data or halting the entire node [similar to what you are saying here]
          2. we can deduplicate ranges for bulk load into another cluster (CASSANDRA-4756)

          /cc sankalp kohli

          jbellis Jonathan Ellis added a comment -

          either we need to ensure that all vnodes are of roughly equal size (very difficult), or we probably need to have a dynamic allocation strategy

           Why is the first option "very difficult"? BOP aside (and the consensus was, we can continue supporting that because its users are willing to live with its limitations), assuming that every vnode is of roughly equal size is a core part of consistent hashing.

          "M distinct chunks" gives you the worst of both worlds.

          jbellis Jonathan Ellis added a comment -

          With good OS tuning, I'm not scared of too many sstables

          We can add subdirectory-per-vnode if necessary, but aren't modern FS capable of dealing with hundreds of thousands of files per directory?

          benedict Benedict added a comment -

           assuming that every vnode is of roughly equal size is a core part of consistent hashing.

          Well the assumption is broken then. I can assure you vnodes are not of equal size, especially not with our current allocation strategy, and getting them to be of equal size is kind of tough. We may be able to improve that, though.

           I'm not sure why what I'm suggesting can't also provide most of these other benefits; however, we can bring the two approaches closer by simply saying all vnodes starting within the first 1/DISK portion of the token range are allocated to the first disk, and so on - and then they're pretty similar. But the unequal size of vnodes means any compaction "tuning" will have limited impact, and will probably induce more random IO.

          tupshin Tupshin Harper added a comment -

          We can add subdirectory-per-vnode if necessary, but aren't modern FS capable of dealing with hundreds of thousands of files per directory?

          Exactly my thinking.

          tupshin Tupshin Harper added a comment -

          Well the assumption is broken then.

          Yes, very true, and I've been thinking for a while now that, while we don't need a strategy to keep all vnodes the exact same size, we would benefit from a background process that gradually splits and combines the largest and smallest outliers to have vnodes tend to converge on the same size.

          jbellis Jonathan Ellis added a comment -

          To clarify: vnodes are not equal in size but they are proportional to token distance, again with the exception of BOP. So we can easily do a knapsack problem across the local disks on first startup.
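
           One way to read the knapsack idea, as a hedged sketch (purely illustrative; a simple greedy balance rather than an exact knapsack, with token distance standing in for vnode size per the proportionality point above):

{code:java}
import java.util.Arrays;

class VnodeKnapsackSketch {
    // assign each vnode (identified by index) to a disk, largest-first onto the
    // currently least-loaded disk, so total token distance per disk stays balanced
    static int[] assign(long[] vnodeSizes, int disks) {
        Integer[] order = new Integer[vnodeSizes.length];
        for (int i = 0; i < order.length; i++) order[i] = i;
        Arrays.sort(order, (a, b) -> Long.compare(vnodeSizes[b], vnodeSizes[a])); // biggest first

        long[] load = new long[disks];
        int[] assignment = new int[vnodeSizes.length];
        for (int v : order) {
            int target = 0;
            for (int d = 1; d < disks; d++) if (load[d] < load[target]) target = d;
            assignment[v] = target;          // put this vnode on the least-loaded disk
            load[target] += vnodeSizes[v];
        }
        return assignment;
    }
}
{code}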

          tupshin Tupshin Harper added a comment -

          Agreed. Seems quite sufficient for this problem.

          benedict Benedict added a comment -

           My issue with that is that adding or removing a node becomes an operation proportional in size to the number of nodes in the cluster.

          jbellis Jonathan Ellis added a comment -

          How's that?

          benedict Benedict added a comment -

           You have to rerun your knapsack algorithm after each change of cluster token allocation to avoid getting a very skewed distribution across the disks. What I'm suggesting is allocating vnodes in a way that is designed to deterministically allow groupings that never need rebalancing.

          tupshin Tupshin Harper added a comment -

          or we probably need to have a dynamic allocation strategy, and the problem with that is that when the token range gets redistributed by node additions/removals, the whole cluster suddenly needs to start kicking off rebalancing of their local disks.

           A node addition will add 256 vnodes to the ring. Unless I misunderstand, this will be DC-local resizing of vnodes, and if the cluster is huge, there will still only be 256 (times RF?) different resize operations that have to take place in that DC. So there is a finite cap on the amount of work that needs to be performed per node addition (and presumably removal), and that cap is actually bounded by vnodes per node, not by cluster size.
          If true, then Jonathan's solution feels good enough, since the upper bound is reasonably constrained. Not saying I wouldn't prefer doing less overall work, though.

          jbellis Jonathan Ellis added a comment - - edited

          What I'm suggesting is allocating vnodes in a way that is designed to deterministically allow groupings that never need rebalancing

          But I don't think yours accomplishes that either. No matter how you allocate token ranges across disk, if a new node "steals" from a range that intersects disk X but not disk Y, you're going to end up with more imbalance post-bootstrap than you had before.

          benedict Benedict added a comment -

          if a new node "steals" from a range that intersects disk X but not disk Y, you're going to end up with more imbalance post-bootstrap than you had before.

          Sure, it will steal an amount, but if the allocation of new vnodes ensures that any stealing happens equally distributed across the cluster then while any single node will cause an imbalance, the total imbalance of the cluster is kept bounded throughout an arbitrary number of node additions. So that you never get perfection, but you're never far from it either. The basic idea is that while you cannot easily guarantee the size of any single vnode, you can guarantee that if you collect any N adjacent vnodes together that their total owned range is within some proportion of the ideal. As N grows the proximity to perfect increases.

          there is a finite cap on the amount of work needed to be performed per node addition

          Sure, but that's a reasonably large cap - for all clusters with fewer than 256 nodes my statement holds true

          tupshin Tupshin Harper added a comment -

          True. I merely mean to say that the problem doesn't get horrible at extreme scale. You could also optimize for rapid additions by deferring rebalancing until all nodes are added (nodetool disablebalancing, nodetool enablebalancing), or some such. Still not arguing for it, though.

          jbellis Jonathan Ellis added a comment -

          I think if you draw it out on paper you'll see that "assign vnodes via knapsack, then steal some token ranges" and "divide into M token ranges, then steal some" work out to about the same imbalance post-bootstrap. Am I misunderstanding what you are proposing?

          benedict Benedict added a comment - - edited

          I may be misunderstanding your proposal: I assume you mean assign the vnodes to each disk via knapsack? In which case your balance per disk is based solely on the knapsack. If the cluster wide vnode allocation is designed specifically to ensure that any given range will maintain the property I gave (i.e. that any adjacent N will be within some proportion of the ideal ownership proportion) then the balance is based on that and will continue to be true no matter how many nodes are added to the cluster, whereas you will have to re-knapsack each time the ownership range changes.

          jbellis Jonathan Ellis added a comment -

          Why do we care about adjacent N? When new nodes join they will choose random tokens.

          benedict Benedict added a comment -

          Adjacent N is another way of saying all vnodes assigned to a given real node but within a contiguous range of the total token range.

          I only care about it because I think it is tractable to create a vnode allocation algorithm that fits this bill (I already got a naive approach working almost well enough when I hacked around for a couple of hours, I'm sure a much more optimal algorithm is within our grasp if we put a bit of thought into it)

          jbellis Jonathan Ellis added a comment -

          Okay, but I think that's clearly a different ticket. In the meantime, sstable-per-vnode has a lot of advantages.

          benedict Benedict added a comment -

          Okay, but I think that's clearly a different ticket. In the meantime, sstable-per-vnode has a lot of advantages.

          Agreed, it's CASSANDRA-7032

          But I guess what I'm saying is let's hold off knapsacking and rebalancing, as that's a lot of added complexity to this ticket, and we can probably fix it more easily with CASSANDRA-7032.

          krummas Marcus Eriksson added a comment -

          OK, working on flushing to per-vnode sstables now

           Splitting the total range into #disks parts, then iterating over the ranges and flushing all vnodes with a start token within the disk boundaries to that disk.

          Then we need to figure out how to handle LCS, perhaps one leveled manifest per vnode is the way to go...

          krummas Marcus Eriksson added a comment -

           Pushed a semi-working sstable-per-vnode version here: https://github.com/krummas/cassandra/commits/marcuse/6696-3 (by no means review-ready)

          • flushes to vnode-separate sstables, spread out over the disks available
           • keeps the sstables separate during compaction, for STCS by grouping the compaction buckets by overlapping sstables, and with LCS by keeping a separate manifest for every vnode.

           Still quite broken, but I think good enough to evaluate whether we want to go this way. The drawback is mainly that it takes a looong time to flush to 768 sstables instead of one (768 = num_tokens=256 and RF=3). Doing 768 parallel compactions is also quite heavy.

          Unless anyone has a brilliant idea how to make flushing and compaction less heavy, I think we need some sort of balance here, maybe grouping the vnodes (8 or 16 vnodes per sstable perhaps?) so that we flush a more reasonable amount of sstables, or even just going with the per-disk approach?

          benedict Benedict added a comment -

          I think we may be able to get a good half-way house by setting a minimum sstable size below which we aggregate vnodes into a single sstable, ensuring we always keep a whole vnode in one table (unless that vnode is larger than the maximum sstable size, in which case we split it, and it alone). This should be cost free and tend rapidly towards separate sstables per vnode for all but the most recent data, which could simply ALL be copied over to any nodes we want to duplicate data to, as the overhead would be approximately constant regardless of the amount of data the node is managing. We could introduce a tool to split out a single token range from those files for users who wanted to avoid this fixed overhead cost.
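
          A rough sketch of that aggregation rule, assuming per-vnode size estimates are available; minSize/maxSize and the vnode keys here are placeholders for illustration only, not the actual writer:

              import java.util.*;

              // Pack whole vnodes into sstables: aggregate small neighbouring vnodes until minSize
              // is reached; a vnode that alone exceeds maxSize gets a group (and later a split) of its own.
              public class VnodeGrouper
              {
                  static List<List<Long>> group(SortedMap<Long, Long> sizeByVnodeStart, long minSize, long maxSize)
                  {
                      List<List<Long>> groups = new ArrayList<>();
                      List<Long> current = new ArrayList<>();
                      long currentSize = 0;
                      for (Map.Entry<Long, Long> vnode : sizeByVnodeStart.entrySet())
                      {
                          if (vnode.getValue() >= maxSize)
                          {
                              if (!current.isEmpty()) { groups.add(current); current = new ArrayList<>(); currentSize = 0; }
                              groups.add(new ArrayList<>(Collections.singletonList(vnode.getKey())));
                              continue;
                          }
                          current.add(vnode.getKey());
                          currentSize += vnode.getValue();
                          if (currentSize >= minSize) { groups.add(current); current = new ArrayList<>(); currentSize = 0; }
                      }
                      if (!current.isEmpty())
                          groups.add(current);
                      return groups;
                  }
              }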

          krummas Marcus Eriksson added a comment -

          this special cases compaction a bit though: we could have sstables that overlap with other sstables of similar size that we can't really compact together (which we probably shouldn't since they overlap too little (CASSANDRA-6474)).

          for LCS i guess we could align the vnode start/end to the sstables' start/end. I.e., in level 1 (10 sstables) each sstable would contain ~100 vnodes, in level 2 (100 sstables) ~10, and in level 3 (1000 sstables) 1 vnode. Then we could flush sstables mapping to the sstables in level 1 to only compact those together.

          benedict Benedict added a comment -

          Or (somewhat handwavy, just to give a basic outline of the idea): we could say each vnode has its own LCS hierarchy - this is optimal from a read perspective - and perhaps have L1 switch to 1 file in size by default (L2 being 10, etc), and then for our flush to L0 we write files equivalent in size to one L1 file, grouping however many vnodes fit in the flush, and then only merge with the individual L1s once the density of the relevant portion of L0 is > ~0.5 per vnode

          krummas Marcus Eriksson added a comment -

          flush to L0 we write files equivalent in size to one L1 file, grouping however many vnodes fit in the flush

          just checking that i get it: one sstable in L1 is one vnode, current default size is 160M, we would flush a 1.6G memtable into 10 L0 files?

          only merge with the individual L1s once the density of the relevant portion of L0 is > ~0.5 per vnode

          could you elaborate?

          benedict Benedict added a comment -

          only merge with the individual L1s once the density of the relevant portion of L0 is > ~0.5 per vnode

          I mean once the amount of data we would flush into the next level is, on average, equal to 50% of the size limit of the lower level. But that is too high (see below)

          current default size is 160M

          I was reading stale docs that set it at 5MB. Somewhere in between seems sensible - 20MB? That way we'd get 1.6GB into 80 files; if we have 768 vnodes and we set the ratio for flushing down into the lower level at 0.1 we'd on average merge straight into L1, but in reality this would only happen for those vnodes with sufficient density, and those without would pause until sufficient density of data appeared.

          The only slight complication to this is what we do if there are then files containing enough data to get merged into one L1, but another portion is much too small to be efficient to merge down - in this case I'd suggest simply remerging out the data that would be inefficient to merge into L0, until it hits our merge threshold (or is >= in size to the data already present in L1, if L1 is not very full). Alternatively we could, for simplicity, simply always merge as soon as the average for any file exceeds our threshold, but I'm not convinced this is a great strategy.
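
          To put rough, purely illustrative numbers on that: a 1.6GB memtable flushed into ~20MB L0 files gives 80 files; spread across 768 vnodes that is on average only ~2MB of new data per vnode, i.e. roughly 0.1 of a 20MB L1 target. With the threshold set at 0.1 the average vnode would be eligible to merge into L1 straight away, while sparsely written vnodes would wait in L0 until enough data had accumulated.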

          jbellis Jonathan Ellis added a comment -

          It looks like what HBase does (as of fairly recently) is flush to a single file, then break it up into sub-regions/stripes/vnodes when compacting (multiple L0 files) with L1+.

          https://issues.apache.org/jira/secure/attachment/12576005/Stripe%20compactions.pdf

          https://issues.apache.org/jira/browse/HBASE-7667

          benedict Benedict added a comment -

          I'd note that it appears what they're doing is really a different compaction strategy - the approach is not dissimilar to what I'm suggesting here for our L0 only, and it may be that we could/should implement it generally, but I think the two are slightly orthogonal tasks (since we're no doubt going to be keeping LCS around). It's worth noting that their stripes are not based on vnodes, but on the distribution of the data present, with merging/splitting as a given range gets too small/big.

          jbellis Jonathan Ellis added a comment -

          HBase doesn't have compaction strategies per se, but you can think of this as an extension of their STCS strategy. Still, I don't see any reason why it can't apply across the board for us.

          It's worth noting that their stripes are not based on vnodes, but on the distribution of the data present, with merging/splitting as a given range gets too small/big.

          Sort of. They have a special case where you can do "size based stripes" for workloads where you have mostly-increasing keys. (Remember that HBase uses an ordered partitioner.) That doesn't apply to us, but the range-based stripes are basically exactly the same as our vnodes here.

          tupshin Tupshin Harper added a comment -

          They are basically splittable and resizable vnodes if you were to use shuffled vnodes with a byte ordered partitioner. Which makes them have more in common with CQL partitions than with vnodes, from a "range of data" point of view. Except that the size of the ranges don't vary with the data model like they do with Cassandra.

          tupshin Tupshin Harper added a comment -

          HBase actually has pluggable compaction strategies these days.

          benedict Benedict added a comment -

          That doesn't apply to us

          It might apply more with the proliferation of composite keys. I would like to see our compaction strategies make more use of this information eventually, and these are ordered.

          That doesn't apply to us, but the range-based stripes are basically exactly the same as our vnodes here.

          So you mean to apply it only as far as the vnode boundaries and then switch to STCS/LCS?

          jbellis Jonathan Ellis added a comment -

          Right, I assumed we're going to be doing STCS/LCS w/in vnode boundaries. No?

          benedict Benedict added a comment - - edited

          Right, I assumed we're going to be doing STCS/LCS w/in vnode boundaries. No?

          Yes, just checking I wasn't misunderstanding what you were saying. I think this sounds roughly in line with what I was suggesting in that case (i.e. +1 this, or some similar variant thereof)

          krummas Marcus Eriksson added a comment -

          summing up the discussion:

          • one "stripe" is one vnode
          • we flush to big files in L0, file per disk or perhaps group a bunch of vnodes together to increase the amount of parallel compactions we can do L0 -> L1

          for STCS:

          • we introduce L0 for STCS
          • when we end up with a given number of overlapping L0 files (4), we compact those together and create per-vnode L1 files.
          • major compaction: include all files in compaction, write #vnodes files

          for LCS:

          • We introduce a leveled manifest per vnode
          • L0 is "global"
          • when doing L0 -> L1 compactions, we end up with one file per involved vnode-stripe in L1, here we can gain a lot by not flushing too big L0 files.
          • we still do STCS within L0 if we get too much data here, making sure we only compact overlapping files

          anything i missed/misunderstood?
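
          As a minimal sketch of the STCS L0 rule in the summary above (find groups of mutually overlapping L0 files and compact a group once it reaches the threshold); the Range record and the threshold handling are illustrative only, not the actual bucketing code:

              import java.util.*;

              public class L0OverlapGrouper
              {
                  record Range(long first, long last) {}   // the token span covered by one L0 sstable

                  // Cluster L0 sstables whose token spans intersect (transitively); a cluster with at
                  // least `threshold` members would be compacted together into per-vnode L1 files.
                  static List<List<Range>> groupsReadyToCompact(List<Range> sstables, int threshold)
                  {
                      List<Range> sorted = new ArrayList<>(sstables);
                      sorted.sort(Comparator.comparingLong(Range::first));
                      List<List<Range>> ready = new ArrayList<>();
                      List<Range> current = new ArrayList<>();
                      long currentLast = Long.MIN_VALUE;
                      for (Range r : sorted)
                      {
                          if (current.isEmpty() || r.first() <= currentLast)
                          {
                              current.add(r);
                              currentLast = Math.max(currentLast, r.last());
                          }
                          else
                          {
                              if (current.size() >= threshold) ready.add(current);
                              current = new ArrayList<>(Collections.singletonList(r));
                              currentLast = r.last();
                          }
                      }
                      if (current.size() >= threshold) ready.add(current);
                      return ready;
                  }
              }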

          jbellis Jonathan Ellis added a comment -

          major compaction: include all files in compaction, write #vnodes files

          Minor note: Doing it per-vnode would mean you don't have to wait for the entire dataset to finish before promoting some tmp to finished, and gets you "partial credit" if interrupted.

          here we can gain a lot by not flushing too big L0 files.

          I'm not sure I follow that point.

          krummas Marcus Eriksson added a comment -

          we can gain a lot by not flushing too big L0 files.

          if we flush to one big file, we would have to involve all L1 sstables when compacting L0 -> L1, if we flush smaller files, we can do more compactions in parallel and we don't have to wait for all ongoing L1 -> L2 compactions to finish before starting L0 -> L1

          tupshin Tupshin Harper added a comment -

          I may be misunderstanding, but this seems to be optimizing for compaction throughput/parallelization at the expense of doing more total compaction activity (number of compactions per mutation over the life of that mutation, a form of write-amplification) by starting with smaller tables.

          If that's not the case, then please ignore, but it is important to note that for the largest scale, highest velocity, longest retained use cases, it's the number of recompactions/write amplification that really hurts.

          krummas Marcus Eriksson added a comment -

          not really, note that we take the memtable, split it into X parts and write those parts to disk:

          for example, say we have 100 vnodes, meaning we have 100 non-intersecting L1 sstables. If we then flush one file that intersects with all those sstables, we would have to include all those files when we compact L0 -> L1. If we instead flush to 10 non-intersecting sstables in L0, we can do those L0 -> L1 compactions independently, and each mutation is still recompacted the same number of times

          does that make sense?

          tupshin Tupshin Harper added a comment -

          It does, thanks.

          jbellis Jonathan Ellis added a comment -

          So maybe, flush to one L0 file per disk?

          krummas Marcus Eriksson added a comment -

          flush to one L0 file per disk

          yep, will do that first, then tweak and benchmark if more files are better

          krummas Marcus Eriksson added a comment - - edited

          Just pushed a version to https://github.com/krummas/cassandra/commits/marcuse/6696-4 - I'll spend some more time writing tests, but I figure it is ready for feedback now at least.

          • Flush to one sstable per disk:
            • Split the total range into #disks parts
            • Flush whole vnodes: if a vnode starts on a disk, it stays there. Note though that if a vnode wraps around the tokenspace, it will be split in 2 parts and be on different disks.
          • SSTables flushed during startup will not get placed correctly since we don't yet know the local ranges.
          • LeveledCompaction needs to know what ranges we have, so calling startup() on the CompactionStrategy has been moved out of the CFS constructor
          • LCS:
            • One manifest per vnode, with a global L0.
            • L1 now aims to contain one sstable
            • Same priorities as before: first STCS in L0, then compactions in L1+, and last L0 -> L1.
            • STCS in L0 will create big per-disk files, not per-vnode ones.
          • STCS:
            • We now have L0 and L1; L1 contains per-vnode sstables, but within the vnode sstables we give no overlap guarantees
            • Compactions in L0 only include L0 sstables, and L1 compactions only include L1 sstables; all compactions end up as per-vnode sstables in L1
            • When we get 4 sstables of similar size in L0, we will compact those and create num_tokens L1 sstables.
            • When one L1 vnode gets 4 sstables of similar size, it will compact those together
            • L0 -> L1 compactions are prioritized over L1 -> L1 ones (though these will run in parallel)
          • Introduces originalFirst to keep track of the original first key of the sstable; we need this when figuring out which manifest the sstable belongs to during replace(..).
          • If we get a new ring version (i.e. we get a new token or lose one), we only reinitialize the LeveledManifestWrapper; this means that we might have sstables that start in one vnode but do not end in it.
          • "nodetool rebalancedata" will iterate over all sstables and make sure they are in the correct places.
          • If a disk breaks/runs out of space we will flush/compact to the remaining disks
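
          A hand-wavy sketch of what that "rebalancedata" pass could look like (the File paths and the diskFor helper are stand-ins, not the real implementation):

              import java.io.File;
              import java.util.*;

              public class RebalanceCheck
              {
                  // Same even split of the token space over disks as at flush time.
                  static int diskFor(long token, long minToken, long maxToken, int diskCount)
                  {
                      double sliceWidth = (double) (maxToken - minToken) / diskCount;
                      return Math.min((int) ((token - minToken) / sliceWidth), diskCount - 1);
                  }

                  // For every sstable, recompute which data directory its first token should live on
                  // and report the ones that need to be moved/rewritten.
                  static Map<File, File> misplaced(Map<File, Long> firstTokenBySSTable,
                                                   List<File> dataDirectories, long minToken, long maxToken)
                  {
                      Map<File, File> toMove = new LinkedHashMap<>();
                      for (Map.Entry<File, Long> e : firstTokenBySSTable.entrySet())
                      {
                          File expected = dataDirectories.get(diskFor(e.getValue(), minToken, maxToken, dataDirectories.size()));
                          if (!expected.equals(e.getKey().getParentFile()))
                              toMove.put(e.getKey(), expected);
                      }
                      return toMove;
                  }
              }
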
          jbellis Jonathan Ellis added a comment -

          Not serving stale data is good, but warning the other nodes when we blacklist a disk, so that they read those vnodes' data from other replicas, would be even better. New ticket for that?

          brandon.williams Brandon Williams added a comment -

          A simple approach would be for the node to increase its severity when it has blacklisted a disk.

          jbellis Jonathan Ellis added a comment -

          Do we have per-vnode severity? We want to blacklist just the affected vnodes.

          brandon.williams Brandon Williams added a comment -

          No, but I guess it wouldn't be too hard to add if we just advertised a list of affected vnodes only.

          benedict Benedict added a comment -

          Linking CASSANDRA-7551 to remember to roll it back when we eventually merge this, as the prior default memtable_cleanup_threshold of 0.4 should be approximately optimal again.

          jbellis Jonathan Ellis added a comment -

          Yuki Morishita, can you pick up review here?

          krummas Marcus Eriksson added a comment -

          this is not really ready for review, i'm going to pick this up again once CASSANDRA-8004 has been committed (that rebase will be epic)

          but Yuki Morishita, please have a look if you have any feedback/input

          krummas Marcus Eriksson added a comment -

          A bit of a status update;

          This is now basically 3 parts;

          1. multi threaded flushing - one thread per disk, splits the owned token range evenly over the drives
          2. one compaction strategy instance per disk
          3. optional vnode aware compaction strategy that you can use if you are using vnodes:
            • keeps 2 levels of sstables, level 0 is newly flushed, bigger sstables, level 1 contains sstables per vnode
            • to avoid getting massive amounts of sstables in L1, we don't compact a vnode into L1 until we estimate that we can reach a configurable sstable size. During an L0 compaction (which contains data from all vnodes) we estimate whether "the next" vnode has enough data for an L1 sstable; otherwise we keep the data for that vnode in L0 until the next compaction.
            • within each vnode we do size tiering

          todo:

          • rebalancing after ring changes and when disks break
          • we can flush before knowing what ranges we own (e.g. during commit log replay) - we might need to persist which tokens this node has (this includes local tokens and others we have due to replication)
          • improving compaction strategy heuristics
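
          For the "one compaction strategy instance per disk" part, a simplified sketch of how an sstable could be routed to the right per-disk strategy; the boundary tokens and the index lookup are illustrative only, the real routing logic differs in detail:

              import java.util.*;

              public class PerDiskStrategyRouter
              {
                  // One compaction strategy slot per data directory; each slot owns the sub-range of
                  // tokens up to and including its boundary. Boundaries are kept sorted ascending.
                  private final long[] diskUpperBoundaries;

                  public PerDiskStrategyRouter(long[] diskUpperBoundaries)
                  {
                      this.diskUpperBoundaries = diskUpperBoundaries.clone();
                      Arrays.sort(this.diskUpperBoundaries);
                  }

                  // Route an sstable by its first token: binary search for the first boundary >= token.
                  public int strategyIndexFor(long sstableFirstToken)
                  {
                      int idx = Arrays.binarySearch(diskUpperBoundaries, sstableFirstToken);
                      if (idx < 0)
                          idx = -idx - 1;   // insertion point
                      return Math.min(idx, diskUpperBoundaries.length - 1);
                  }
              }
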
          nickmbailey Nick Bailey added a comment -

          So I just want to mention that the current approach here isn't going to help us much with CASSANDRA-4756.

          If you don't update your compaction strategy, sstables will contain data from many vnodes, so things aren't much different from now. If you do use the new compaction strategy, things are slightly better in that levels 1 or higher are split per vnode and you could deduplicate that data, but level 0 won't be, so you'll still be forced to overstream anything in level 0.

          We may want to revisit a new approach to CASSANDRA-4756, specifically one that isn't compaction strategy specific.

          nickmbailey Nick Bailey added a comment -

          I'd also like to mention that we should consider what the best way to expose this new information to operators is. Specifically, what vnodes are assigned to what disk? What vnode is an sstable responsible for? It should be possible to get that information without running sstablemetadata against every sstable file.

          jjordan Jeremiah Jordan added a comment -

          multi threaded flushing - one thread per disk, splits the owned token range evenly over the drives

          It might be nice to have this be configurable. This one flush per drive still results in L0 having sstables that overlap with almost all of L1 on a per drive basis. If you flush to X ranges per drive, then you can get some of the benefits of more parallel L0->L1 promotion even if you only have one drive.

          krummas Marcus Eriksson added a comment -

          It might be nice to have this be configurable. This one flush per drive still results in L0 having sstables that overlap with almost all of L1 on a per drive basis. If you flush to X ranges per drive, then you can get some of the benefits of more parallel L0->L1 promotion even if you only have one drive.

          It might, or you just set up multiple data directories for the same drive. We can improve this later, please create a ticket that depends on this.

          krummas Marcus Eriksson added a comment -

          Specifically, what vnodes are assigned to what disk? What vnode is an sstable responsible for? It should be possible to get that information without running sstablemetadata against every sstable file.

          yes, we could in theory create sub directories per vnode for example, then we would get the sstables very easily. But, again, we can do this after we commit this, please create a new ticket that depends on this

          krummas Marcus Eriksson added a comment -

          Ryan McGuire Philip Thompson I think I'm going to need some test-infrastructure help here - I want to be able to run all dtests (and ccm) with multiple data directories, can you guys help out?

          philipthompson Philip Thompson added a comment -

          I think the easiest way would be to give you a patched ccm that defaults to multiple data directories

          krummas Marcus Eriksson added a comment -

          Sure, I can do that myself, but it would be nice to have it in the future as well

          philipthompson Philip Thompson added a comment -

          Well it can easily be done right now through the existing ccmlib API. The only issue is that if it isn't the default, we would need to set it on a test-by-test basis. Unless you think going forward it should ALWAYS be the default? I can do that easily as well.

          krummas Marcus Eriksson added a comment -

          We should perhaps randomize it on a per-dtest case in the future (or perhaps do that in some other test system), but for now, a branch with multiple directories as default will be ok

          yukim Yuki Morishita added a comment -

          Besides the code review going on in Marcus' branch on github, I have one question.

          For non-vnode, the current implementation splits local ranges from start to end evenly over disks. It looks like it assumes that local ranges are close to each other.
          But isn't there a situation where a node's local ranges are very sparse (maybe NTS with multiple DCs/racks)?
          In that case, disks can be unbalanced.
          Should we calculate more precise ownership for the ranges and assign them evenly to disks?
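
          One way to do that, as a greedy illustration only (real ranges are Token ranges, can wrap around, and ownership estimation is more involved than a simple span): weight each local range by its size and always hand the next-largest range to the least-loaded disk.

              import java.util.*;

              public class OwnershipBalancer
              {
                  record LocalRange(long left, long right) { long span() { return right - left; } }

                  // Greedily assign ranges to disks so that the summed span per disk stays roughly even,
                  // instead of just cutting the first-to-last token interval into equal slices.
                  static List<List<LocalRange>> assign(List<LocalRange> ranges, int diskCount)
                  {
                      List<List<LocalRange>> perDisk = new ArrayList<>();
                      for (int i = 0; i < diskCount; i++)
                          perDisk.add(new ArrayList<>());
                      long[] load = new long[diskCount];

                      List<LocalRange> sorted = new ArrayList<>(ranges);
                      sorted.sort(Comparator.comparingLong(LocalRange::span).reversed());
                      for (LocalRange r : sorted)
                      {
                          int target = 0;
                          for (int i = 1; i < diskCount; i++)
                              if (load[i] < load[target]) target = i;
                          perDisk.get(target).add(r);
                          load[target] += r.span();
                      }
                      return perDisk;
                  }
              }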

          benedict Benedict added a comment -

          Since CASSANDRA-7032 will likely make it in ahead of this, we should consider making this feature require a migration in which each disk is directly assigned a collection of vnodes. On migration we can do our best to spread the existing vnodes evenly onto each disk, and if the end result is too uneven for any given node, we can assign some new tokens to the disks until it all works out.

          krummas Marcus Eriksson added a comment -

          https://github.com/krummas/cassandra/commits/marcuse/6696-11

          I've broken out the compaction strategy part in a separate ticket to decrease the size of this review (and to get a clear separation of the two issues).

          dtest updates are here: https://github.com/krummas/cassandra-dtest/commits/marcuse/6696 and they require this ccm: https://github.com/krummas/ccm/commits/multi-data-dirs - for the tests to pass we also need CASSANDRA-10421

          Benedict could you have a look at the memtable flushing parts as I'm sure you will have improvement suggestions there?

          Yuki Morishita one thing that might be an issue is the fact that during streaming we now create several sstables - this might break the ProgressInfo as we key by filename. Do you have any suggestions on how to fix that?

          Ryan McGuire I think I need a bit of help with the testing here - I want to make sure nothing explodes with a big node (200+G) on 2.1/2.2/3.0 that upgrades to this with the various compaction strategies

          carlyeks Carl Yeksigian added a comment -

          Overall, this feature looks really good. The biggest concern I have is that currently partition size is only limited by the disk size; with this, the whole token range containing the largest partition has to fit on a single disk, which is a change we should be calling out in NEWS.

          • What purpose does perDiskFlushExecutor serve for us? In the past, the number of flush executors has determined how many concurrent executors we can run, while this seems like it would restrict the parallelism. Also, if we were using a different partitioner, we would only ever be running through a single flush executor.
          • If we have an imbalanced distribution of data, "rebalance disks" isn't going to do anything for us, so it seems misnamed; we are just formulaically reassigning the sstables based on token range size / # sstables.
          • In CompactionManager.rebalanceDisks, we will throw an assertion error instead of returning null if the partition doesn't define splitting behavior.
          • Todo in CompactionStrategyManager.groupSSTablesForAntiCompaction; doesn't look implemented, but we probably want to make sure that we keep each disk's group of sstables separate.
          • If we run a rebalance, it is possible that we'll move all of disk 1 to disk 2 before any of disk 2's sstables move somewhere else, causing out of space issues. Might be better to actively mix up the order of disks from which we are pulling sstables.
          • In CompactionAwareWriter.getWriteDirectory, we aren't checking to make sure that we have enough disk space to run the compaction (see the sketch after this list).
          • Should we provide a default implementation of getMaximumToken, since this will be introduced mid-3.0 cycle, and mark it for removal at 4.0?
          • Added methods in SSTableTxnWriter don't seem used; they should be removed, or SSTableTxnWriter should inherit from SSTableMultiWriter.
          • RangeAwareSSTableWriter should be called something else. For CASSANDRA-10540, we'll want to add a new one which splits the incoming stream by each token range, this one is splitting by disk assignments.
          • RangeAwareSSTableWriter seems to be violating the transactional contract (there is also a TODO in commit). precommit and commit do nothing for the finished writers.
          • We should make sure that we have a splitter defined for the partition. We should make sure that the splitter is defined, otherwise we'll be creating a leveling where we are sending a huge number of sstables back to level 0 for overlap.

          nits:

          • perDiskFlushExecutors should be initialized to an array of ExecutorService instead of JMXEnabledThreadPoolExecutor, formatting space
          • write sorted contents log message should include the range, as we'll be writing the same memtable many times
          • comment why we need maybeReload in CompactionStrategyManager.handleNotification
          • comment how getCompactionStrategyIndex handles going from randomly partitioned sstables to sorting into the correct strategy - took me a little bit to realize that it should only fall into the binary search case if there is already data saved
          • in CompactionAwareWriter, "panicLocation" needs a different name; maybe "defaultLocation" would be better since this is expected to happen on, for instance, system tables
          • getMaximumToken needs a javadoc
          • spurious line deleted in RandomPartitioner at L219
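
          Regarding the getWriteDirectory point above, a hypothetical sketch of the missing space check; the method and its callers are made up for illustration and the real CompactionAwareWriter/Directories API differs:

              import java.io.File;
              import java.util.List;

              public class WriteDirectoryPicker
              {
                  // Pick the first candidate data directory that can hold the expected compaction output.
                  static File pickWithSpaceCheck(List<File> candidates, long expectedWriteSize)
                  {
                      for (File dir : candidates)
                          if (dir.getUsableSpace() > expectedWriteSize)
                              return dir;
                      throw new RuntimeException("Not enough disk space for " + expectedWriteSize + " bytes of compaction output");
                  }
              }
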
          brianmhess Brian Hess added a comment -

          Will this change do anything for the case where there is only one disk? Will each sstable contain a single token range (or at least not all token ranges)?

          krummas Marcus Eriksson added a comment -

          Brian Hess no, it will be the same if you only have a single disk, CASSANDRA-10540 will split sstables based on local tokens

          carlyeks Carl Yeksigian added a comment -

          Yuki Morishita: I've looked at this whole changeset, but it would be great to make sure that the streaming portion is correct; could you review that part?

          krummas Marcus Eriksson added a comment -

          What purpose does perDiskFlushExecutor serve for us? In the past, the number of flush executors has determined how many concurrent executors we can run, while this seems like it would restrict the parallelism.

          Idea is that we have one thread per disk writing, but I guess the thread count should be DatabaseDescriptor.getFlushWriters() per disk and the flushExecutor thread count should be 1 - we want to quickly hand off to the single flushExecutor when flushing and then run the per disk writing in the perDiskFlushExecutor. Do you have any other suggestion on how to model this?
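
          A bare-bones sketch of that executor layout (one single-threaded hand-off executor plus one writer pool per data directory; writersPerDisk stands in for memtable_flush_writers - illustration only, not the actual code):

              import java.util.concurrent.*;

              public class PerDiskFlushExecutors
              {
                  // Hand-off executor stays single-threaded; the per-disk pools do the actual writing.
                  final ExecutorService flushCoordinator = Executors.newSingleThreadExecutor();
                  final ExecutorService[] perDiskFlushExecutors;

                  public PerDiskFlushExecutors(int diskCount, int writersPerDisk)
                  {
                      perDiskFlushExecutors = new ExecutorService[diskCount];
                      for (int i = 0; i < diskCount; i++)
                          perDiskFlushExecutors[i] = Executors.newFixedThreadPool(writersPerDisk);
                  }

                  // Quickly hand the flush off, then run one write task per disk in parallel and wait for all.
                  public Future<?> flush(Runnable[] perDiskWriteTasks)
                  {
                      return flushCoordinator.submit(() -> {
                          Future<?>[] parts = new Future<?>[perDiskWriteTasks.length];
                          for (int i = 0; i < perDiskWriteTasks.length; i++)
                              parts[i] = perDiskFlushExecutors[i].submit(perDiskWriteTasks[i]);
                          for (Future<?> part : parts)
                          {
                              try { part.get(); }
                              catch (Exception e) { throw new RuntimeException(e); }
                          }
                      });
                  }
              }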

          RangeAwareSSTableWriter should be called something else. For CASSANDRA-10540, we'll want to add a new one which splits the incoming stream by each token range, this one is splitting by disk assignments.

          We won't split on incoming streams based on token range in CASSANDRA-10540 - remote node will most likely already have sstables split based on its local ranges and those should match any ranges we own, so we can simply write it to disk, then the new sstable will get added to the correct compaction strategy (if it fits, otherwise it does a round in "L0")

          We should make sure that we have a splitter defined for the partition. We should make sure that the splitter is defined, otherwise we'll be creating a leveling where we are sending a huge number of sstables back to level 0 for overlap.

          What do you mean? Splitter for a partition?

          krummas Marcus Eriksson added a comment -

          fixed the rest of the comments and pushed to https://github.com/krummas/cassandra/commits/marcuse/6696-11

          renamed rebalance to redistribute, but if anyone has a better suggestion for this, let me know

          carlyeks Carl Yeksigian added a comment -

          Forgot to add myself as a watcher, so I didn't see the comments.

          Idea is that we have one thread per disk writing, but I guess the thread count should be DatabaseDescriptor.getFlushWriters() per disk and the flushExecutor thread count should be 1 - we want to quickly hand off to the single flushExecutor when flushing and then run the per disk writing in the perDiskFlushExecutor. Do you have any other suggestion on how to model this?

          I think the change to have flushWriters per disk makes sense, but we should set the default to 1 instead of # of disks; we should also update the comment in the cassandra.yaml.

          We won't split on incoming streams based on token range in CASSANDRA-10540 - remote node will most likely already have sstables split based on its local ranges and those should match any ranges we own, so we can simply write it to disk, then the new sstable will get added to the correct compaction strategy (if it fits, otherwise it does a round in "L0")

          Makes sense to me.

          What do you mean? Splitter for a partition?

          Reading that took me a while to figure out what I was trying to say; it was about sstableofflinerelevel. Looking over it again, I see that it is handling the different ranges correctly, so we can ignore that.

          Other than the slight changes around flush writers, the rest of it looks good to me.

          krummas Marcus Eriksson added a comment -

          Pushed the flush writers fix, and when re-running the tests I noticed a problem with ScrubTest and fixed that. Also fixed a dtest multi data directory failure I had missed.

          Test runs:
          http://cassci.datastax.com/view/Dev/view/krummas/job/krummas-marcuse-6696-11-testall
          http://cassci.datastax.com/view/Dev/view/krummas/job/6696_dtest

          yukim Yuki Morishita added a comment -

          Marcus Eriksson As you pointed out, the progress display will be messed up.
          Since the total bytes received for each boundary cannot be determined beforehand right now, displaying a constant name is the way to go. For that, keyspace and table names are enough imo.
          Of course, if we only have one disk, then we can do it the way we do now (showing the whole path).

          Other than that, streaming part seems good to me.

          krummas Marcus Eriksson added a comment -

          Pushed 2 new commits to the repo - the first one fixes a bug where streaming would never finish if we streamed to several sstables (CASSANDRA-10949).

          The second commit fixes the progress reporting by introducing a writer id that we key on instead of the file name; this means that the file name will change in netstats, but the progress will be correct.
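          A minimal sketch of that second commit's idea, assuming progress entries keyed by a stable writer UUID rather than the current file name; the ProgressInfo shown here is a simplified stand-in, not the real class.

          {code:java}
          import java.util.Map;
          import java.util.UUID;
          import java.util.concurrent.ConcurrentHashMap;

          // Illustrative sketch only: progress is keyed on the writer id, so the byte count
          // stays correct even when the writer moves on to a new sstable file.
          class StreamProgressTracker
          {
              static final class ProgressInfo
              {
                  final UUID writerId;   // stable key for the lifetime of the writer
                  String currentFile;    // may change as new sstables are opened
                  long bytesTransferred;

                  ProgressInfo(UUID writerId) { this.writerId = writerId; }
              }

              private final Map<UUID, ProgressInfo> progressByWriter = new ConcurrentHashMap<>();

              synchronized void update(UUID writerId, String fileName, long bytes)
              {
                  ProgressInfo info = progressByWriter.computeIfAbsent(writerId, ProgressInfo::new);
                  info.currentFile = fileName;      // the name shown in netstats may change...
                  info.bytesTransferred += bytes;   // ...but the accumulated progress stays correct
              }
          }
          {code}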

          http://cassci.datastax.com/view/Dev/view/krummas/job/krummas-marcuse-6696-11-testall
          http://cassci.datastax.com/view/Dev/view/krummas/job/6696_dtest

          Philip Thompson could you have a look at the dtests/ccm changes as well so we can merge them at the same time? https://github.com/krummas/cassandra-dtest/commits/marcuse/6696 and https://github.com/krummas/ccm/commits/multi-data-dirs

          carlyeks Carl Yeksigian added a comment -

          Changes LGTM. Do we have any idea why HintsCatalogTest.loadCompletenessAndOrderTest is failing?

          yukim Yuki Morishita added a comment -

          Carl Yeksigian that test should have been fixed in CASSANDRA-10950.

          carlyeks Carl Yeksigian added a comment -

          Yuki Morishita Ah, thanks; I'm still catching up on the changes.

          yukim Yuki Morishita added a comment - edited

          Marcus Eriksson I still prefer just returning a 'keyspace name/table name' pair in RangeAwareSSTableWriter#getFilename over adding a UUID to ProgressInfo. Even with the id, nodetool netstats will still show a constantly changing file name with inaccurate bytes.
          My suggested change is here.
          SSTableMultiWriter#getFilename is also used in the debug log when flushing of SSTable(s) completes, and because RangeAwareSSTableWriter can write several SSTables when flushing, I think displaying just the ks/table name there too is no more confusing than displaying only the last written file name. (edit: This looks like no problem here, my bad)
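          For reference, a tiny sketch of that alternative: have the range-aware writer report a constant keyspace/table identifier from getFilename instead of the last written file. The class below is a stand-in for illustration, not the actual RangeAwareSSTableWriter.

          {code:java}
          // Illustrative sketch only: a constant name keeps netstats/progress output stable
          // even though the writer switches between sstable files on different disks.
          class RangeAwareWriterNameSketch
          {
              private final String keyspace;
              private final String table;

              RangeAwareWriterNameSketch(String keyspace, String table)
              {
                  this.keyspace = keyspace;
                  this.table = table;
              }

              public String getFilename()
              {
                  return keyspace + "/" + table;
              }
          }
          {code}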

          krummas Marcus Eriksson added a comment -

          ok, merged, and new builds triggered:
          http://cassci.datastax.com/view/Dev/view/krummas/job/krummas-marcuse-6696-11-testall
          http://cassci.datastax.com/view/Dev/view/krummas/job/6696_dtest
          yukim Yuki Morishita added a comment -

          +1

          krummas Marcus Eriksson added a comment -

          committed, thanks guys!


            People

            • Assignee: krummas Marcus Eriksson
            • Reporter: kohlisankalp sankalp kohli
            • Reviewer: Carl Yeksigian
            • Tester: Ryan McGuire
            • Votes: 6
            • Watchers: 37
