Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      This sounds cool, we should reimplement it in open source Cassandra:

      http://www.acunu.com/2/post/2012/07/incremental-repair.html

        Activity

        Jonathan Ellis added a comment - edited

        This is how the original Dynamo paper describes maintaining merkle trees. The problem that Acunu doesn't mention is that this forces you to read and rehash all the rows sharing the tree leaf with row X, whenever any row X is updated. So you are trading sequential i/o for random i/o... not a good move, unless you assume SSD or a small dataset (and even then, you're rehashing many rows on each update, not just one, so it's far from clear that this is a good trade).
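
        For illustration, a minimal sketch of that read-on-write cost (hypothetical code, not the actual Dynamo or Cassandra implementation): recomputing a Merkle tree leaf means re-reading and re-hashing every row that shares the leaf with the updated row.

            import java.security.MessageDigest;
            import java.security.NoSuchAlgorithmException;
            import java.util.List;

            // Hypothetical sketch: the Dynamo-style scheme recomputes a whole leaf on every update.
            class LeafRehashSketch {
                static byte[] recomputeLeafHash(List<byte[]> rowsSharingLeaf) throws NoSuchAlgorithmException {
                    MessageDigest md5 = MessageDigest.getInstance("MD5");
                    for (byte[] row : rowsSharingLeaf)  // every row in the leaf, each one a random read
                        md5.update(row);
                    return md5.digest();                // new 16-byte leaf hash
                }
            }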

        I'm a bigger fan of the continuous repair approach enabled by CASSANDRA-3912, and discussed in CASSANDRA-2699 (although I think 2699 overcomplicates things).

        Mike Bulman added a comment -

        fwiw, continuous repair is on the roadmap for DataStax OpsCenter, so taking advantage of incremental repair will be extremely simple from an end user standpoint.

        Nicolas Favre-Felix added a comment -

        Jonathan,

        I wrote the blog post linked in this ticket; the incremental repair process we've implemented is not doing any random I/O on insert as you suggest.

        Instead, we maintain a Merkle Tree (MT) in memory and update it with every single column insert in ColumnFamilyStore.apply(). We use column.updateDigest(digest) on all the changes in order to create a hash per column update and then XOR this hash with the existing one in the Merkle Tree bucket for the corresponding row.
        This Merkle Tree is created with the column family (one per range), initialized with zeros, and persisted to disk with regular snapshots.
        The commutative properties of XOR make it possible to update the MT incrementally without having to read on write.
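
        As a toy illustration of that property (hypothetical code, with longs standing in for 16-byte MD5 digests): because XOR is commutative and associative, a leaf ends up in the same state no matter what order the updates arrive in, so each write can be folded in without reading what is already there.

            // Toy illustration of the XOR-folding property; longs stand in for MD5 digests.
            class XorFoldDemo {
                public static void main(String[] args) {
                    long h1 = 0x1111L, h2 = 0x2222L, h3 = 0x3333L;   // hashes of three column updates
                    long leafInOrder    = ((0L ^ h1) ^ h2) ^ h3;     // applied as the writes arrive
                    long leafOtherOrder = ((0L ^ h3) ^ h1) ^ h2;     // any other arrival order
                    // Same leaf state either way, so an update is just leaf ^= hash(update),
                    // with no read of the previously written data.
                    System.out.println(leafInOrder == leafOtherOrder);   // true
                }
            }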

        When an incremental repair session starts, the CFS swap out their existing MTs for new empty ones that will receive subsequent updates.

        There are a few downsides to this approach:

        • It is possible for the incremental MTs to miss a few inserts that happen when the replicas involved swap out their MTs for new ones. An insert will be in the previous MT for node "A" but in the fresh one for node "B", for instance. This leads to either a very small amount of extra streaming or some unrepaired changes. For this reason, we still recommend that users run either a full repair or a "tombstone-only repair" at least once every GCGraceSeconds.
        • There is some overhead to keeping these MTs in memory. We actually maintain only the leaves as a single ByteBuffer instead of creating all the intermediate nodes like the MerkleTree class does. To avoid using too much RAM, we allocate a fixed amount of memory per CF and divide it into a number of smaller buffers (one per range) in order to give the same guarantees regardless of the number of ranges per CF.
        • There is a small cost per insert, about half of which is due to the hash function (MD5).

        We are looking into making our patch available to the community and would welcome suggestions to solve or improve on these limitations.

        Jonathan Ellis added a comment -

        The commutative properties of XOR make it possible to update the MT incrementally without having to read on write.

        Thanks for the clarification, Nicolas. That sounds like a reasonable approach.

        To avoid using too much RAM, we allocate a fixed amount of memory per CF and divide it into a number of smaller buffers (one per range) in order to give the same guarantees regardless of the number of ranges per CF

        Meaning, you give each CF less than 64k ranges * 16 bytes / range?

        T Jake Luciani added a comment -


        Is there a startup cost associated with the approach? I.e., how do you know the initial hash?

        Nicolas Favre-Felix added a comment -

        Meaning, you give each CF less than 64k ranges * 16 bytes / range?

        Right, that would be too much. At the moment, we give each CF 256 KB to be split into all of its ranges. For num_tokens=256, that's 1 KB per range on average - we do not yet scale this number according to the range size.

        A node with num_tokens = 1 owning a single range would allocate 256 KB in a single direct ByteBuffer. Moving to num_tokens = 256 gives the ColumnFamilyStore 256 ranges, and allocates a 1 KB ByteBuffer per range. In both cases the keys in any given range are covered by as many "leaf bytes" on average, regardless of the number of ranges.
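
        As a back-of-the-envelope sketch of that sizing (the 256 KB budget and num_tokens values are the figures quoted above; the code itself is hypothetical):

            // Hypothetical sizing sketch: a fixed per-CF budget split evenly across its token ranges.
            class RepairBufferSizing {
                static final int PER_CF_BUDGET_BYTES = 256 * 1024;    // 256 KB per column family

                static int bytesPerRange(int numTokens) {
                    return PER_CF_BUDGET_BYTES / numTokens;           // the per-range leaf buffer size
                }

                public static void main(String[] args) {
                    System.out.println(bytesPerRange(1));    // 262144: one 256 KB direct ByteBuffer
                    System.out.println(bytesPerRange(256));  // 1024: a 1 KB ByteBuffer per range
                }
            }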

        Is there a startup cost associated with the approach? I.e., how do you know the initial hash?

        We do have to reload $num_tokens ByteBuffers when creating the ColumnFamilyStore, for a total of 256KB per CF with our current defaults. This is not something we've measured but I suspect that the cost is fairly small, as it is now for the cache snapshots: it is O(number of CFs), not O(N) like the old cache preloads.

        Jonathan Ellis added a comment -

        The commutative properties of XOR make it possible to update the MT incrementally without having to read on write

        Hang on, let's flesh this out.

        I have an md5 hash (or part of one, see below) per row in a MerkleTree TreeRange. I xor all these together to get my initial state, S. To update row A to row A', I need to take S xor hash(A) xor hash(A').

        So I still need to read-on-write to compute hash(A), I just don't have to rehash everything else in the same TreeRange.

        (I can imagine breaking this down into xoring individual columns, which would mean we would only need to read modified columns and not the entire row, but the principle is the same.)

        For num_tokens=256, that's 1 KB per range on average

        I see, you mean vnode ranges. What I meant was MT TreeRanges... an MT can have 64k TRs. Ideally you will have 16 bytes (md5 size) per TR. You can throw away some bytes at the cost of false negatives, i.e., with a single byte per TR, two replicas will think they have the same data even when they do not 1/256 of the time.

        But if you have 64k 1-byte treeranges, how do you fit that into 1KB? Do you reduce the TR granularity further? 64k already feels too low... although this is mitigated somewhat by vnodes.

        do have to reload $num_tokens ByteBuffers when creating the ColumnFamilyStore

        And sync the BB saving with CF flushes so CL replay matches up, I imagine.

        Nicolas Favre-Felix added a comment -

        Indeed, I've used "range" to mean Range<Token>, the range of tokens owned by a node; I should have made this clearer.
        We are not using the MerkleTree class or its TreeRange objects, but updating a single ByteBuffer directly instead of creating the whole tree with its hundreds of internal objects. This is equivalent to updating the leaves alone, without propagating the hash upwards in the tree. Yes, that means comparing two trees is O(leaf count).

        I xor all these together to get my initial state, S. To update row A to row A', I need to take S xor hash(A) xor hash(A').

        If you've already xor'd all these together, S does include the hash of your existing row A. Updating A to A' hashes A' and returns S' = S xor hash(A'); row A's total contribution to S' is then hash(A) xor hash(A').

        In practice, this is how it works step by step:

        1. Load existing buffers when the ColumnFamilyStore is created: per Range<Token>, load an existing buffer or create a new one initialized with zeros.
        2. ColumnFamilyStore.apply() is called with columns X and Y in row A. For instance, row A could have token 0x10, falling in the range (0x00, 0x20]. The incremental repair ByteBuffer for this range is 1 KB in size.
        3. Create a new digest and run Column.updateDigest() on X and Y successively. We end up with H = hash(X) xor hash(Y); H is 16 bytes long.
        4. Calculate O, the offset in the ByteBuffer that corresponds to row A's token: in this case, it's around 512 since 0x10 is close to the middle of the range (0x00, 0x20].
        5. For each byte i of H, we set buffer[O+i] = buffer[O+i] xor H[i].
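
        Put together, a hypothetical sketch of those five steps for a single range (the types, names, and offset math here are simplified stand-ins, not the actual patch):

            import java.nio.ByteBuffer;
            import java.security.MessageDigest;
            import java.security.NoSuchAlgorithmException;

            // Hypothetical sketch of steps 1-5 for one token range, e.g. (0x00, 0x20] with a 1 KB buffer.
            class RangeDigestSketch {
                private final ByteBuffer leaves;          // step 1: new buffer, zero-initialized by allocateDirect
                private final long rangeStart, rangeEnd;  // exclusive start, inclusive end

                RangeDigestSketch(long rangeStart, long rangeEnd, int sizeBytes) {
                    this.rangeStart = rangeStart;
                    this.rangeEnd = rangeEnd;
                    this.leaves = ByteBuffer.allocateDirect(sizeBytes);
                }

                // step 2: called for each write, standing in for the hook in ColumnFamilyStore.apply()
                void apply(long rowToken, byte[][] columns) throws NoSuchAlgorithmException {
                    byte[] h = new byte[16];                          // step 3: H = hash(X) xor hash(Y)
                    for (byte[] column : columns) {
                        MessageDigest digest = MessageDigest.getInstance("MD5");
                        digest.update(column);                        // stands in for Column.updateDigest()
                        byte[] c = digest.digest();
                        for (int i = 0; i < h.length; i++)
                            h[i] ^= c[i];
                    }

                    // step 4: map the token to an offset; 0x10 in (0x00, 0x20] with a 1 KB buffer gives 512
                    int offset = (int) ((rowToken - rangeStart) * leaves.capacity() / (rangeEnd - rangeStart));
                    offset = Math.min(offset, leaves.capacity() - h.length);  // keep all 16 bytes in bounds

                    for (int i = 0; i < h.length; i++)                // step 5: XOR H into the buffer
                        leaves.put(offset + i, (byte) (leaves.get(offset + i) ^ h[i]));
                }
            }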

        During the repair session, the replicas send out their existing ByteBuffers for the range being repaired and replace them with empty ones that will receive subsequent inserts.

        And sync the BB saving with CF flushes so CL replay matches up, I imagine.

        Yes. If you terminate Cassandra at this stage, the ByteBuffer is written to disk and will contain [0,0.... a few bytes of hash(X) xor hash(Y) around the middle ... 0,0,0,0].

        Jonathan Ellis added a comment -

        If you've already xor'd all these together, S does include the hash of your existing row A. Updating A to A' hashes A'

        Right, I'm saying that given S-including-hash-of-A, what you want is to take A out when you add A'. Otherwise, if you have a node that (correctly) has A', but doesn't know about A (maybe it was compacted out before it got to build S), then it won't agree on the same state S even though it has the same data.

        Nicolas Favre-Felix added a comment -

        A node wouldn't have missed A because of a compaction but because A was not inserted there, since S is not built from the existing data on disk but incrementally with each change: S really represents the combined history of all the changes performed on the token range since the last repair session.
        So nodes don't have to scan their data to build S, they simply start with S=0 when incremental repair is first enabled regardless of their initial differences and start again with S=0 after each incremental repair session.

        But it is indeed possible for two replicas to have the same data but differing values for S, for instance if a replica gets A and A' whereas another misses A but gets A': this would lead to some unnecessary streaming even though they both have the latest value A'. This could be avoided by removing A from S as you suggest, but the cost of doing random I/O after each write is too expensive, as you pointed out earlier.
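
        A toy illustration of that divergence (hypothetical code, with longs standing in for MD5 digests): both replicas end up holding A', but their change histories, and therefore their S values, differ, so the range would be streamed even though the data already matches.

            // Toy illustration: same final data, different S, because S encodes change history.
            class DivergenceDemo {
                public static void main(String[] args) {
                    long hashA = 0xAAAAL, hashAPrime = 0xBBBBL;    // stand-ins for 16-byte MD5 digests

                    long sReplica1 = 0L ^ hashA ^ hashAPrime;      // received both A and the update to A'
                    long sReplica2 = 0L ^ hashAPrime;              // missed A, received only A'

                    // Both replicas now store A', yet their S values disagree,
                    // so this range would be repaired (streamed) unnecessarily.
                    System.out.println(sReplica1 == sReplica2);    // false
                }
            }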

        We are open to suggestions on how to improve this process and get this feature upstreamed with these issues addressed or understood as inherent limitations.

        Jonathan Ellis added a comment -

        we still recommend that users run either a full repair or a "tombstone-only repair" at least once every GCGraceSeconds

        What is a tombstone-only repair?

        If we still need repair every gcgs, I'm not sure how much win there is here, given that with "durable HH" (CASSANDRA-2034) you only need AES repair when nodes die (or lose disks) permanently.

        Could be interesting to replace TreeRange with your optimized ByteBuffer (or BigLongArray – CASSANDRA-3432) though, with or without full "incremental mode" later. I'd be glad to review a patch along those lines as a first step.

        Nicolas Favre-Felix added a comment -

        What is a tombstone-only repair?

        This is a separate feature that we provide, which runs the AES on tombstones exclusively. We provide it for users who repair in order to avoid reappearing tombstones rather than to reduce entropy. The main point is that repair and tombstone repair are both guaranteed to result in a synchronized cluster, which is not the case for incremental repair.

        Thanks for the review offer, I will extract and clean up the patch and submit it to this ticket.

        Stefan Fleiter added a comment -

        Since it has gone very quiet here:
        Is it still planned to integrate this feature into Apache Cassandra?

        Jonathan Ellis added a comment -

        I am still happy to review a patch to optimize AES memory usage, but per the above discussion the in-memory trees themselves do not sound very useful (since you still need full repair as well).

        Jonathan Ellis added a comment -

        resolving as wontfix; let's move the memory optimization into another ticket.

        Stefan Fleiter added a comment -

        Won't the in-memory trees still be useful, especially for doing repairs under heavy load?
        With Apache Cassandra's Anti-Entropy, finding the inconsistencies adds a big additional load on the servers, while with continuous Anti-Entropy only the actual inconsistencies add load.
        A cheap continuous Anti-Entropy that repairs more than 99% of all inconsistencies automatically and can stay active even when the cluster is under heavy load seems beneficial to me.
        This is especially the case if QUORUM reads/writes cannot be used, or for a cluster recovering after an outage of several nodes lasting longer than max_hint_window_in_ms.
        In some scenarios, getting things 100% correct can wait longer than repairing most of the inconsistencies.

        Jonathan Ellis added a comment -

        A cheap continuous Anti-Entropy that repairs more than 99% of all inconsistencies automatically and can stay active even when the cluster is under heavy load seems beneficial to me.

        You already get this with hinted handoff.

        Stefan Fleiter added a comment -

        This solution seems conceptually simpler and more robust to me than hinted handoff.

        • No additional write load is added
        • No knowledge of the other nodes' state is necessary
        • Repair can be controlled by each node itself
        • If a repair gets interrupted it can be restarted easily
        • Existing Hinted Handoff problems are numerous
          • A search for open Cassandra Hinted Handoff bugs results in a list of over 500 issues

        So a usage scenario for "In-memory merkle trees for repair" might be to let this replace HH entirely.

        Jonathan Ellis added a comment -

        It is possible for the incremental MTs to miss a few inserts that happen when the replicas involved swap out their MTs for new ones

        Replacing HH with something less robust is not very attractive to me.


          People

          • Assignee: Unassigned
          • Reporter: Marcus Eriksson
          • Votes: 1
          • Watchers: 8
