BOOKKEEPER-237

Automatic recovery of under-replicated ledgers and their entries

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: 4.0.0, 4.1.0
    • Fix Version/s: None
    • Labels: None

      Description

      As per the current design of BookKeeper, if one of the BookKeeper servers dies, there is no automatic mechanism to identify and recover the under-replicated ledgers and their corresponding entries. This could lead to losing successfully written entries, which would be a critical problem in sensitive systems. This document describes a few proposals to overcome these limitations.

      1. Recording of underreplication of ledger entries (Sub-task, Closed, Ivan Kelly)
      2. Detection of under replication (Sub-task, Closed, Ivan Kelly)
      3. Rereplicating of under replicated data (Sub-task, Closed, Uma Maheswara Rao G)
      4. Provide automatic mechanism to know bookie failures (Sub-task, Closed, Rakesh R)
      5. Ability to disable auto recovery temporarily (Sub-task, Closed, Rakesh R)
      6. bookkeeper does not put enough meta-data in to do recovery properly (Sub-task, Closed, Ivan Kelly)
      7. Periodic checking of ledger replication status (Sub-task, Closed, Ivan Kelly)
      8. Provide LedgerFragmentReplicator which should replicate the fragments found from LedgerChecker (Sub-task, Closed, Uma Maheswara Rao G)
      9. Prepare bookie vs ledgers cache and will be used by the Auditor (Sub-task, Closed, Rakesh R)
      10. Provide distributed lock implementation which will be used by Replication worker while replicating fragments. (Sub-task, Resolved, Uma Maheswara Rao G)
      11. Exceptions for replication (Sub-task, Closed, Ivan Kelly)
      12. Manage auditing and replication processes (Sub-task, Closed, Vinayakumar B)
      13. Delay the replication of a ledger if RW found that its last fragment is in underReplication. (Sub-task, Closed, Uma Maheswara Rao G)
      14. Document about Auto replication service in BK (Sub-task, Closed, Uma Maheswara Rao G)
      15. LedgerManagers should consider 'underreplication' node as a special Znode (Sub-task, Closed, Uma Maheswara Rao G)
      16. ReplicationWorker may not get ZK watcher notification on UnderReplication ledger lock deletion. (Sub-task, Closed, Uma Maheswara Rao G)
      17. ZkLedgerUnderreplicationManager.markLedgerUnderreplicated() is adding duplicate missingReplicas while multiple bk failed for the same ledger (Sub-task, Closed, Rakesh R)
      18. Clean up LedgerManagerFactory and LedgerManager usage in tests (Sub-task, Closed, Rakesh R)
      19. replicateLedgerFragment should throw Exceptions in error conditions (Sub-task, Closed, Uma Maheswara Rao G)
      20. It should not be possible to replicate a ledger fragment which is at the end of an open ledger (Sub-task, Closed, Ivan Kelly)
      21. Ledger entries should be replicated sequentially instead of parallel. (Sub-task, Closed, Uma Maheswara Rao G)
      22. Let's add Thread name for ReplicationWorker thread. (Sub-task, Closed, Uma Maheswara Rao G)
      23. Integration Test - Perform bookie rereplication cycle by Auditor-RW processes (Sub-task, Closed, Rakesh R)
      24. LedgerChecker returns underreplicated fragments for an closed ledger with no entries (Sub-task, Closed, Ivan Kelly)
      25. Hierarchical zk underreplication manager should clean up its hierarchy when done to allow for fast acquisition of underreplicated entries (Sub-task, Closed, Ivan Kelly)
      26. Store hostname of locker in replication lock (Sub-task, Closed, Ivan Kelly)
      27. SingleFragmentCallback should be created with the fragment first entry id, not the first stored id (Sub-task, Resolved, Uma Maheswara Rao G)
      28. Lock does not guarantee any access order and not giving chance to longest-waiting RW (Sub-task, Resolved, Rakesh R)
      29. Make auditor Vote znode store a protobuf containing the host that voted (Sub-task, Closed, Ivan Kelly)
      30. Expose command options in bookie scripts to disable/enable auto recovery temporarily (Sub-task, Closed, Rakesh R)
      31. Provide an option to start Autorecovery along with Bookie Servers (Sub-task, Closed, Uma Maheswara Rao G)
      32. Ensure that the auditor and replication worker will shutdown if they lose their ZK session (Sub-task, Closed, Ivan Kelly)
       

        Activity

        Rakesh R added a comment -

        Attached a design doc draft outlining the observations. Look forward to your comments. Hopefully, this will help in structuring the discussions going forward.

        Flavio Junqueira added a comment -

        This is great, Rakesh, I'll have a close look at the document. For now, let me drop a couple of quick comments here based on the JIRA description:

        1. One concern I have with automatically restoring bookies is generating instability in a cluster under intermittent network problems. If there are lots of false suspicions because the network is in bad shape, then we may end up shuffling a lot of data around unnecessarily.
        2. In principle we shouldn't lose data because of a faulty bookie. We would need multiple crashes to actually lose data.
        Rakesh R added a comment -

        Thanks Flavio for the interest.

        Yes, I agree with you, and again it depends on the criticality of the data (tolerated failures). The proposed recovery mechanism would tolerate (ensemble - quorum) failures and trigger when (ensemble - quorum) + 1 bookies are reported as faulty. We could still improve this by making it an admin-configurable item.
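        The failure-tolerance arithmetic above can be sketched as follows (a minimal illustration; `ensemble` and `quorum` stand for the ledger's ensemble size and quorum size):

        ```python
        def tolerated_failures(ensemble: int, quorum: int) -> int:
            # The scheme tolerates (ensemble - quorum) bookie failures.
            return ensemble - quorum

        def recovery_triggered(ensemble: int, quorum: int, failed: int) -> bool:
            # Recovery kicks in once one more bookie fails than can be tolerated.
            return failed >= tolerated_failures(ensemble, quorum) + 1

        # e.g. ensemble=5, quorum=3: tolerate 2 failures, trigger on the 3rd.
        ```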

        Ivan Kelly added a comment -

        This doc is quite in line with what I had been thinking. It needs to be broken into smaller parts, though; as it is now, it's quite hard to digest.

        Firstly, the change to replication is really a special case of what can be done with BOOKKEEPER-208. With BK-208, we have the concept of a write quorum and an ack quorum. The write quorum is the set of bookies to which an entry will be sent. The ack quorum is the number of bookies that must acknowledge the entry before it is acked to the client application. What is proposed in your doc seems to be this, but with the write quorum always set to the ensemble size.
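        The write-quorum idea can be sketched as a placement rule (illustrative only; the round-robin modulo rule below is an assumption about how striping assigns entries, not BookKeeper's exact code):

        ```python
        def write_set(entry_id, ensemble, write_quorum):
            """Bookies that receive entry_id under round-robin striping."""
            n = len(ensemble)
            return [ensemble[(entry_id + i) % n] for i in range(write_quorum)]

        # With write_quorum == ensemble size, every bookie receives every
        # entry, which matches the scheme proposed in the design doc.
        ```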

        For the "Accountant", I think a better name would be "Auditor", as accountants have a habit of cooking the books . I think detection should be separated into two subparts, one performed by the accountant, which is elected. The other performed by the bookies themselves.

        • The accountant check should examine the ZooKeeper metadata only. For each ledger, it should check whether all the bookies are available; if not, the ledger should be marked as underreplicated. As well as running under the conditions you propose in 1.6, it should also run periodically.
        • The bookie should check whether it can read the first and last entry of each ledger fragment it is supposed to have [1]. If it finds that it cannot read a ledger, it should mark the ledger as underreplicated. This is basically an fsck. It should run periodically.
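        The accountant's metadata-only check described above could look roughly like this (a sketch with an assumed metadata shape; in the real system, metadata access would go through the LedgerManager):

        ```python
        def find_underreplicated(ledgers, live_bookies):
            """ledgers: {ledger_id: [ensembles]}, each ensemble a list of
            bookie addresses. Returns ids of ledgers whose metadata
            references a bookie that is not currently alive."""
            under = []
            for ledger_id, ensembles in ledgers.items():
                if any(b not in live_bookies for ens in ensembles for b in ens):
                    under.append(ledger_id)
            return under
        ```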

        I don't think it's necessary to maintain a mapping of bookies to ledgers. To rereplicate a ledger, it is necessary to read the whole ledger metadata anyway, so a mapping of which bookies should contain the ledger is straightforward to build on the fly.
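        Building that bookie-to-ledger mapping on the fly is a simple inversion of the ledger metadata (sketch; the dictionary shape of the metadata is an assumption):

        ```python
        def bookie_to_ledgers(ledgers):
            """Invert {ledger_id: [ensembles]} into {bookie: {ledger_ids}}."""
            mapping = {}
            for ledger_id, ensembles in ledgers.items():
                for ens in ensembles:
                    for bookie in ens:
                        mapping.setdefault(bookie, set()).add(ledger_id)
            return mapping
        ```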

        The rereplication process should be distributed across all bookies. Each bookie should run a replication process which takes the first available unreplicated ledger off the queue and rereplicates it. I don't think we should take load balancing into consideration yet, as it complicates matters a lot.

        [1] This won't necessarily be the first and last entry of the fragment, due to striping.

        Rakesh R added a comment -

        Thanks Ivan for the comments.

        The ack quorum is the number of bookies who must acknowledge the entry before it is acked to the client application

        I have gone through BOOKKEEPER-208; is the 'write quorum' using the RRDS algorithm for interleaving?
        Also, will it reform the ensemble if any bookie in the ensemble is slow/dead and times out?

        But I was thinking of avoiding the interleaving of entries and the ensemble reformation on write-entry failures, to keep recovery simple. The idea is that all the ensemble bookies will hold replicas in the best case, with the ack quorum also set to the ensemble size. Here, assured replicas = quorum size.
        Please see the doc section '1.4.4 Example: How it works'

        For any ledger:
        best case: total replicas = ensemble size
        worst case: total replicas = quorum size, which is the assured/majority replica count.

        Also, I feel this approach can handle intermittent network problems, as by default it will tolerate (ensemble - quorum) failures.

        How will the client read?
        For an in-progress ledger, the BK client looks to the entire ensemble for reading.
        For a closed ledger, the BK client looks to the CLOSED ensemble for reading.

        I don't think its necessary to maintain a mapping of bookies to ledgers. To rereplicate a ledger, it is necessary to read the whole ledger metadata for the ledger, so a mapping of which bookies should contain the ledger is straightforward to build on the fly.

        I was trying to avoid multiple ZK calls to read ledger metadata during detection. Only for the failed ledgers would the Accountant read the ledger metadata and initiate the rereplication.

        Also, the map contains the _inprogressreplica information, so a new Accountant would know about the already-initiated rereplications.

        The rereplication process should be distributed across all bookies. Each bookie should run a replication process which takes the first available unreplicated ledger off the queue and rereplicates it.

        Yeah, that's good. If I understand correctly, instead of assigning work to a bookie, the Accountant would keep the list of under-replicated ledgers. A bookie (which is not an existing replica holder for that ledger) would take an unreplicated ledger and, after finishing, send an ack to the Accountant. I feel we should define proper locking here to avoid concurrency issues.

        Show
        Rakesh R added a comment - Thanks Ivan for the comments. The ack quorum is the number of bookies who must acknowledge the entry before it is acked to the client application I have gone through BookKeeper-208, 'write quorum' is using the RRDS algo for interleaving ? Also will reform the ensemble if any Bookie in this ensemble is slow/dead and timedout ? But, I was thinking to avoid the interleaving of entries and ensemble reformation on write entry failures to make recovery simple. Here the idea is, all the ensemble Bookies will be having the replicas in best case. Ack quorum also set to the ensemble size. Here, assured replica = quorum size. Please see the doc section '1.4.4 Example: How it works' For any ledger, in best case, total replicas = ensemble size in worst case, total replicas = quorum size, which is the assured/majority replicas. Also, I feel able to handle intermittent network problems as by default this approach will tolerate (ensemble - quorum) failures. How client will read? For an inprogress ledger, bkclient looks to the entire ensemble for reading. For a closed ledger, bkclient looks to the CLOSED ensemble for reading. I don't think its necessary to maintain a mapping of bookies to ledgers. To rereplicate a ledger, it is necessary to read the whole ledger metadata for the ledger, so a mapping of which bookies should contain the ledger is straightforward to build on the fly. I was trying to avoid multiple ZK calls to read ledger metadata for the detection. Only for the failed ledgers, Accountant would read the ledger metadata and initiate the rereplication. Also, the map contains the _inprogressreplica information, so the new Accountant would be knowing about the already initiated rereplicas. The rereplication process should be distributed across all bookies. Each bookie should run a replication process which takes the first available unreplicated ledger off the queue and rereplicates it. Yeah, its good. 
If I understand correctly, instead of assigning work to the Bookie, Accountant would keep the under replicated ledgers. Bookies (which is not an existing replica holder for that ledger) would takes the unreplicated ledger and after finish send ack to the Accountant. I feel, we should define proper locking here to avoid concurrency?
        Hide
        Ivan Kelly added a comment -

        I have gone through BOOKKEEPER-208; is the 'write quorum' using the RRDS algorithm for interleaving?

        Yes, but if the write quorum size is the same as the ensemble size, the entry will be sent to all bookies.

        Also, will it reform the ensemble if any bookie in the ensemble is slow/dead and times out?

        In the case of a slow/dead bookie, the client can continue writing without issue. When the connection to the slow/dead bookie times out, the client will try to replace the bookie.

        But I was thinking of avoiding the interleaving of entries and the ensemble reformation on write-entry failures, to keep recovery simple. The idea is that all the ensemble bookies will hold replicas in the best case, with the ack quorum also set to the ensemble size. Here, assured replicas = quorum size.
        Please see the doc section '1.4.4 Example: How it works'

        For any ledger,
        in best case, total replicas = ensemble size
        in worst case, total replicas = quorum size, which is the assured/majority replicas.

        This will behave the same if write quorum is the same as ensemble size, and ack quorum is the same as "assured replicas".

        Also, I feel this approach can handle intermittent network problems, as by default it will tolerate (ensemble - quorum) failures.

        For intermittent network problems, this can be handled by setting the socket timeout to a large value.

        How will the client read?
        For an in-progress ledger, the BK client looks to the entire ensemble for reading.

        For a non-closed ledger, a read is sent once to all bookies in the ensemble to get the lastEntryConfirmed. Once we have this, we read as if we were reading from a closed ledger in which lastEntryConfirmed is the last entry.

        For a closed ledger, the BK client looks to the CLOSED ensemble for reading.

        For a closed ledger, it follows a round-robin reading pattern, as it does today. For each entry, it tries to read from a single bookie. If the read fails, it tries the next bookie in the round-robin.
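        That round-robin read pattern can be sketched as follows (illustrative only; the real client's bookie-ordering heuristics are more involved, and the starting-position rule here is an assumption):

        ```python
        def read_order(entry_id, ensemble):
            """Order in which bookies are tried for entry_id: start at the
            entry's round-robin position, then fall through the rest of
            the ensemble on failure."""
            n = len(ensemble)
            start = entry_id % n
            return [ensemble[(start + attempt) % n] for attempt in range(n)]
        ```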

        I was trying to avoid multiple ZK calls to read ledger metadata for the detection. Only for the failed ledgers, Accountant would read the ledger metadata and initiate the rereplication.

        ZK is a read-optimised system, so I think doing multiple reads on it is more efficient than maintaining another list.

        Also, the map contains the _inprogressreplica information, so the new Accountant would be knowing about the already initiated rereplicas.

        ...

        Yeah, that's good. If I understand correctly, instead of assigning work to a bookie, the Accountant would keep the list of under-replicated ledgers. A bookie (which is not an existing replica holder for that ledger) would take an unreplicated ledger and, after finishing, send an ack to the Accountant. I feel we should define proper locking here to avoid concurrency issues.

        Each rereplicating ledger can have a lock on it. When a bookie goes to rereplicate a ledger, it writes an ephemeral sequential znode as a child of the underreplicated ledger's znode. Then it checks whether it has the lowest sequence number. If not, it deletes its znode (as this indicates someone else is rereplicating that particular ledger). If it has the lowest sequence number, it has just acquired the lock. It then checks what rereplication needs to happen on the ledger, rereplicates, and finally deletes its lock and then the ledger's rereplication znode. I think this removes the need for _inprogress znodes as well.
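        The acquisition decision in this scheme boils down to "lowest sequence number wins". A sketch of just that check, independent of the ZooKeeper client API (znode names of the form `lock-0000000003` are an assumption for illustration):

        ```python
        def holds_lock(my_znode, sibling_znodes):
            """True iff my ephemeral-sequential znode has the lowest
            sequence number among all lock znodes under the ledger's
            rereplication node."""
            seq = lambda name: int(name.rsplit("-", 1)[1])
            return seq(my_znode) == min(seq(z) for z in sibling_znodes)

        # A bookie that loses the race deletes its znode and moves on
        # to another underreplicated ledger.
        ```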

        Show
        Ivan Kelly added a comment - I have gone through BookKeeper-208, 'write quorum' is using the RRDS algo for interleaving ? Yes, but if write quorum size is the same as ensemble size, the message will be sent to all Also will reform the ensemble if any Bookie in this ensemble is slow/dead and timedout ? In the case of a slow/dead bookie, the write can continue writing without issue. When the connection to the slow/dead bookie times out, the client will try to replace the bookie. But, I was thinking to avoid the interleaving of entries and ensemble reformation on write entry failures to make recovery simple. Here the idea is, all the ensemble Bookies will be having the replicas in best case. Ack quorum also set to the ensemble size. Here, assured replica = quorum size. Please see the doc section '1.4.4 Example: How it works' For any ledger, in best case, total replicas = ensemble size in worst case, total replicas = quorum size, which is the assured/majority replicas. This will behave the same if write quorum is the same as ensemble size, and ack quorum is the same as "assured replicas". Also, I feel able to handle intermittent network problems as by default this approach will tolerate (ensemble - quorum) failures. For intermittent network problems, this can be handled by setting the socket timeout to a large value. How client will read? For an inprogress ledger, bkclient looks to the entire ensemble for reading. For a non closed ledger, a read is sent once to all bookies in the ensemble to get the lastEntryConfirmed. Once we have this we read as if we are reading from a closed ledger in which lastEntryConfirmed is the last entry. For a closed ledger, bkclient looks to the CLOSED ensemble for reading. For a closed ledger it follows a roundrobin reading pattern as it does today. For each entry, it tries to read from a single bookie. If the read fails, it tries the next bookie in the roundrobin. 
I was trying to avoid multiple ZK calls to read ledger metadata for the detection. Only for the failed ledgers, Accountant would read the ledger metadata and initiate the rereplication. ZK is a read optimised system, so I think doing multiple reads on it is more effecient than maintaining another list. Also, the map contains the _inprogressreplica information, so the new Accountant would be knowing about the already initiated rereplicas. ... Yeah, its good. If I understand correctly, instead of assigning work to the Bookie, Accountant would keep the under replicated ledgers. Bookies (which is not an existing replica holder for that ledger) would takes the unreplicated ledger and after finish send ack to the Accountant. I feel, we should define proper locking here to avoid concurrency? Each rereplicating ledger can have a lock on it. When a bookie goes to rereplicate a ledger, it writes a ephemereal sequential znode as a child of the rereplicated ledger znode. Then it checks if it has the lowest sequence number. If not, it deletes the znode (as this indicates someone else is rereplicating that particular ledger). If it has the lowest sequence number, it has just acquired the lock. Then it checks what rereplication needs to happen on the ledger, it rereplicates, and finally it deletes its lock and then the ledger rereplication znode. I think this removes the need for _inprogress znodes also.
        Hide
        Flavio Junqueira added a comment -

        There are a lot (really a lot) of good observations in this document, but I feel that it will be difficult to converge with so much detail at a time. It might be a good idea to agree on the high-level observations first, and here are the two key ones I've been able to extract:

        1. The first part of the document focuses on the difficulty of guaranteeing that all entries of a ledger are fully replicated;
        2. The second part proposes an accountant abstraction.

        For the first part, I got stuck on two points. First, in the regular case I wouldn't expect many changes to the ensemble of a ledger, so my feeling is that the example on page 2 is a corner case, and I'm not sure we should optimize for such cases. Second, the same example points out that entries can become underreplicated with so many consecutive replacements, but at the same time the same bookies pop up later in future ensembles. Are you assuming that the memory of a bookie is gone once it is removed from an ensemble? If not, then there is no need to re-establish the degree of replication. If the memory of the bookie is wiped out, then we should consider just reconstructing the ledger fragments of the faulty bookie using the recovery tool. Why doesn't it work if we operate at the bookie level?

        About the second part, I like the idea of a mechanism to make sure that ledgers are properly replicated. However, I'm not entirely convinced that we need a new entity in the system. Perhaps we can have each bookie run an accountant thread instead and use a simpler mechanism. Here is one proposal. Using ZK, we can create a chain of bookies, where each bookie watches the previous bookie in the sequence of sequential znodes. Let's call the watching bookie the buddy of the watched bookie. If a bookie crashes, its buddy receives a notification, and the buddy is responsible for replicating the content of the crashed bookie. After a crash, we of course need to restore the chain by finding other buddies. Also, there are some corner cases related to multiple failures that we would need to think about more carefully. The bottom line is that a distributed solution might be more robust than a centralized one, and it does not require a new independent entity or a specialized bookie.
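        The buddy-chain idea amounts to: sort the bookies' sequential znodes and have each bookie watch its predecessor. A sketch of the assignment (the znode naming and the choice to close the chain into a ring are assumptions for illustration):

        ```python
        def buddy_assignments(znodes):
            """Map each bookie's znode to the predecessor it watches
            (so each bookie's watcher is its 'buddy'). Sorted order of
            the sequential znodes defines the chain; the first watches
            the last, closing the chain so every bookie has a watcher."""
            chain = sorted(znodes)
            return {chain[i]: chain[i - 1] for i in range(len(chain))}
        ```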

        I also have some thoughts about the newly suggested schedules. I like the general idea of having different schedules, especially the one that errors an operation on the ledger upon a crash instead of changing the ensemble automatically. But I'll postpone my thoughts on them, if that's ok. This is already long...

        Show
        Flavio Junqueira added a comment - There are a lot (really a lot) of good observations in this document, but I feel that it will be difficult to converge with so much detail at a time. It might be a good idea to agree on the high-level observations first, and here are the two key ones I've been able to extract: The first part of the document focuses on the difficulty of guaranteeing that all entries of a ledger are fully replicated; The second part proposes an accountant abstraction. For the first part, I got stuck on two points. First, in the regular case, I wouldn't expect many changes to the ensemble of a ledger, so my feeling is that the example on page 2 is a corner case, so I'm not sure we should optimize for such cases. Second, the same example points out that entries can be become underreplicated with so many consecutive replacements, but at the same time the same bookies pop up later in future ensembles. Are you considering that the memory of a bookie is gone once it is removed from an ensemble? If not, then there is no need to re-establish the degree of replication. If the memory of the bookie is wiped out, then we should consider just reconstructing the ledger fragments of the faulty bookie using the recovery tool. Why doesn't it work if we operate at the bookie level? About the second part, I like the idea of proposing a mechanism to make sure that ledgers are properly replicated. However, I'm not entirely convinced that we need a new entity in the system. Perhaps we can have bookie running an accountant thread instead and use a simpler mechanism. Here is one proposal. Using ZK, we can create a chain of bookies, where each bookie watches the previous bookie in the sequence of sequential znodes. Let's call the watcher bookie the buddy of the watched bookie. If a bookie crashes, its buddy receives a notification and the buddy is responsible for replicating the content of the crashed bookie. 
After a crash, we of course need to restore the chain by finding other buddies. Also, there are some corner cases related to multiple failures that we would need to think about more carefully. The bottom line it that a distributed solution might be more robust than a centralized one, and it does not require a new independent entity or a specialized bookie. I also have some thoughts about the new suggested schedules. I like the idea in general of having different schedules, especially the one that errors an operation to the ledger upon a crash instead of changing the ensemble automatically. But, I'll postpone my thoughts on them, if it is ok. This is already long...
        Hide
        Ivan Kelly added a comment -

        However, I'm not entirely convinced that we need a new entity in the system. Perhaps we can have bookie running an accountant thread instead and use a simpler mechanism.

        This is what I imagined the accountant to be anyway. Each bookie would run one.

        I think we need to break this into sub-JIRAs.

        1. detection of under replication
        2. recording of under replication
        3. rereplicating of under replicated data

        If you guys agree, I'll create separate JIRAs to discuss each.

        Show
        Ivan Kelly added a comment - However, I'm not entirely convinced that we need a new entity in the system. Perhaps we can have bookie running an accountant thread instead and use a simpler mechanism. This is what i imagined accountant to be anyhow. Each bookie would run one. I think we need to break this into sub jiras. detection of under replication recording of under replication rereplicating of under replicated data If you guys agree, I'll creates separate JIRAs to discuss each.
        Hide
        Uma Maheswara Rao G added a comment -

        I think we need to break this into sub-JIRAs.

        1. detection of under replication
        2. recording of under replication
        3. rereplicating of under replicated data

        If you guys agree, I'll create separate JIRAs to discuss each.

        We may also need to add an item for detecting over-replication and clearing the over-replicated data when bookies restart.

        Show
        Uma Maheswara Rao G added a comment - I think we need to break this into sub jiras. 1.detection of under replication 2.recording of under replication 3.rereplicating of under replicated data If you guys agree, I'll creates separate JIRAs to discuss each. Also may need to add the item for detecting over replication and clearing the over replicated data on restarts of Bookies.
        Hide
        Ivan Kelly added a comment -

        @Uma Will do, I think a revisit of garbage collection in general would be good.

        Show
        Ivan Kelly added a comment - @Uma Will do, I think a revisit of garbage collection in general would be good.
        Hide
        Rakesh R added a comment -

        Thanks again, Flavio, for the detailed info.

        I wouldn't expect many changes to the ensemble of a ledger, so my feeling is that the example on page 2 is a corner case, so I'm not sure we should optimize for such cases

        I feel we should consider all the corner cases, since WALs are too costly. Also, we would then be able to showcase BK as an efficient WAL tool.

        Second, the same example points out that entries can be become underreplicated with so many consecutive replacements, but at the same time the same bookies pop up later in future ensembles. Are you considering that the memory of a bookie is gone once it is removed from an ensemble? If not, then there is no need to re-establish the degree of replication
        .....
        Why doesn't it work if we operate at the bookie level?

        Yeah, that's correct: when the bookie comes back (either rejoins or is restarted), the data will still be in the bookie's storage. The only exceptional case is when a bookie holds a few ledgers that were successfully written, but the current ledger write times out; the client would then reform the ensemble and continue writing.
        Here, only this ledger needs to be considered under-replicated, as it may end up with partial entries, so we cannot operate purely at the bookie level.

        Here is one proposal. Using ZK, we can create a chain of bookies, where each bookie watches the previous bookie in the sequence of sequential znodes. Let's call the watcher bookie the buddy of the watched bookie. If a bookie crashes, its buddy receives a notification and the buddy is responsible for replicating the content of the crashed bookie. After a crash, we of course need to restore the chain by finding other buddies. Also, there are some corner cases related to multiple failures that we would need to think about more carefully.
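The "buddy" selection step of this proposal can be sketched as pure logic (a hypothetical helper, not actual BookKeeper or ZooKeeper client code; wrapping around for the first znode, so the chain forms a ring, is one possible choice):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the buddy chain: each bookie watches the sequential
// znode immediately before its own, so a single crash generates exactly one
// notification. The first znode wraps around and watches the last one.
public class BuddyChain {
    public static String buddyToWatch(List<String> znodes, String self) {
        List<String> sorted = new ArrayList<>(znodes);
        Collections.sort(sorted);   // sequential znodes sort by sequence number
        int idx = sorted.indexOf(self);
        if (idx < 0) {
            throw new IllegalArgumentException("unknown znode: " + self);
        }
        // Watch the immediate predecessor; the first znode watches the last.
        return sorted.get((idx - 1 + sorted.size()) % sorted.size());
    }
}
```

In a real implementation the watch would be re-evaluated against the current child list whenever a notification arrives, since the predecessor itself may have crashed.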

        It's good to see new ideas. I have a few concerns:

        1. As you pointed out, we need to consider multiple crashes.
          Assume the bookie chain BK1->BK2->BK3->BK4->BK5. Say BK2 & BK3 die; BK4 doesn't know about BK2. It would be even more painful with many consecutive failures.
        2. What if the current ledger write is timing out, as mentioned above?
          Consider, for example, intermittent network fluctuations.
        3. The watcher bookie might be a replica holder of that ledger.
          Assume the bookie chain BK1->BK2->BK3->BK4->BK5. Say BK2 fails; BK3 would not be able to replicate the content, as it may be an existing replica holder.

        The bottom line is that a distributed solution might be more robust than a centralized one, and it does not require a new independent entity or a specialized bookie.

        I feel under-replica detection should be centralized. The central entity would listen for under-replicated ledgers and raise an alarm, and whichever bookie doesn't hold the ledger entry would take it from the queue and re-replicate it. This would also help avoid concurrency issues due to multiple crashes.

        I like the idea in general of having different schedules, especially the one that errors an operation to the ledger upon a crash instead of changing the ensemble automatically.

        I would like to know more about this. IMHO, we should avoid re-forming the ensemble within a ledger and instead throw a specific exception back to the client, so that it would close the ledger and create a new one. The client would still get ensemble reformation/dynamic bookies at the ledger level. My idea is to simplify the ledger parsing needed for detecting under-replicated ledger entries and identifying target replica bookies.

        Flavio Junqueira added a comment -

        I feel we should consider all the corner cases, since WALs are too costly to lose. Also, we would be able to showcase BK as an efficient WAL tool.

        Agreed, I'm not saying we should discard such cases, I'm just saying that I don't expect these to be the regular case. If you buy that these don't constitute the regular case, then we may not want to focus on such corner cases when it comes to optimize for performance. More concretely, if typically for a ledger, there is one ensemble change or none, we really just need to copy the entries of the faulty bookie. We do need to make sure that network fluctuations do not cause system instability, though.

        The only exceptional case is when a bookie has a few ledgers which were successfully written, but the write to the current ledger times out; the client would then re-form the ensemble and continue writing.
        Shouldn't only this ledger be considered under replicated, since it may end up with partial entries, rather than marking under replication at the bookie level?

        Writes that haven't been acknowledged are errored out and sent to the bookie replacement in the new ensemble, they don't get under replicated.

        Here, I have a few concerns:

        As you pointed out, we need to consider multiple crashes.
        Assume the bookie chain BK1->BK2->BK3->BK4->BK5. Say BK2 & BK3 die; BK4 doesn't know about BK2. It would be even more painful with many consecutive failures.
        What if the current ledger write is timing out, as mentioned above?
        Consider, for example, intermittent network fluctuations.
        The watcher bookie might be a replica holder of that ledger.
        Assume the bookie chain BK1->BK2->BK3->BK4->BK5. Say BK2 fails; BK3 would not be able to replicate the content, as it may be an existing replica holder.

        There are two ways I see to improve the naive scheme I proposed:

        1. Each bookie can keep a snapshot of the bookie list at the time it decided which other bookie to watch. This way once a bookie receives a crash notification, it can verify which bookies are gone and replicate accordingly. In your first bullet, BK1 knows that it has to replicate both BK2 and BK3.
        2. Bookies can have multiple pointers and watch multiple nodes. For example, BK4 could watch both BK3 and BK5.
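Improvement 1 can be sketched as follows (a hypothetical helper, assuming the chain order is the snapshot order): on a crash notification, the notified buddy walks backwards through its snapshot, collecting every consecutive predecessor that is no longer alive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of improvement 1: using the snapshot of the bookie list
// taken when the watch was set, the notified bookie collects the run of
// consecutive dead predecessors it must re-replicate.
public class SnapshotRecovery {
    public static List<String> deadPredecessors(List<String> snapshot,
                                                Set<String> live,
                                                String self) {
        List<String> dead = new ArrayList<>();
        int idx = snapshot.indexOf(self);
        // Walk backwards from self, stopping at the first live predecessor.
        for (int i = 1; i < snapshot.size(); i++) {
            String prev = snapshot.get((idx - i + snapshot.size()) % snapshot.size());
            if (live.contains(prev)) {
                break;
            }
            dead.add(prev);
        }
        return dead;
    }
}
```

For the concern above (chain BK1..BK5, BK2 & BK3 down), BK4 would compute [BK3, BK2] and know it has to replicate both.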

        I feel under-replica detection should be centralized.

        If I understand your scheme correctly, then it is not exactly centralized. An accountant could be any bookie and all bookies would bid for accountantship. With ZK leader election, you guarantee that only one takes over the role at a time. It does put the burden of the accountant on a single machine at a time, and I wonder if we can spread the responsibility across the available machines to balance load.

        On a side note, I can't recall right now, but I think the accountant is stateless, correct?

        I would like to know more about this. IMHO, we should avoid re-forming the ensemble within a ledger and instead throw a specific exception back to the client, so that it would close the ledger and create a new one. The client would still get ensemble reformation/dynamic bookies at the ledger level. My idea is to simplify the ledger parsing needed for detecting under-replicated ledger entries and identifying target replica bookies.

        It would be nice to have a mechanism to inform the application of changes to the system state, like ensemble changes. Right now we rely on error codes of operations, and in some cases, like ensemble changes, it is transparent.

        Some applications might not want to have entries spread across multiple bookies. They could for example turn off striping and prefer not to create another ledger instead of having an ensemble change.

        Rakesh R added a comment -

        If I understand your scheme correctly, then it is not exactly centralized. An accountant could be any bookie and all bookies would bid for accountantship. It does put the burden of the accountant on a single machine at a time, and I wonder if we can spread the responsibility across the available machines to balance load.

        Here, the Accountant is lightweight: internally it is one daemon inside the elected bookie. It would use ZK watchers to learn of bookie failures and timeouts from clients (like the ZK leader does). Also, I feel the level of concurrency would be reduced.

        On a side note, I can't recall right now, but I think the accountant is stateless, correct?

        Yes, the Accountant is stateless. When it identifies any under-replicated ledgers, it records them in the corresponding ZK node, and watchers in turn send a re-replication notification to peer bookies. It can also withstand Accountant failures and re-election.

        Bookies can have multiple pointers and watch multiple nodes.

        Here, who will be creating the groups? We also need to consider group reformation on failures.
        Also, we should design multiple groups and pointers to withstand multiple crashes. Instead, can we make it simple by choosing one node for monitoring?

        Some applications might not want to have entries spread across multiple bookies. They could for example turn off striping and prefer not to create another ledger instead of having an ensemble change.

        If I understand correctly, you are suggesting an option to turn off striping and to prefer creating another ledger over an ensemble change. The recovery logic would still need to consider ensemble reformation.

        Why I am thinking of avoiding ensemble reformation for each bookie that goes down:

        1. When a slow replica goes down and the client re-forms the ensemble, from which entry will the new ensemble be formed?
        2. When a bookie goes down, all the ledgers on that bookie can be assigned to another bookie if no reformation is allowed, with the bookie as the unit of replication. Otherwise, I would have to go one more level down and parse each ensemble within a ledger, treating the ensemble as the unit of replication; the tracking (re-replication) would also need to be at that level.
        Rakesh R added a comment -

        @Ivan +1 for opening sub jiras.
        Do I need to create the sub jiras?

        Ivan Kelly added a comment -

        @Rakesh, I'll create them soon.

        Flavio Junqueira added a comment -

        I'm assuming that the discussion about whether we should have a single accountant entity still belongs in this jira, so I'll keep it here.

        Here, the Accountant is lightweight: internally it is one daemon inside the elected bookie. It would use ZK watchers to learn of bookie failures and timeouts from clients (like the ZK leader does). Also, I feel the level of concurrency would be reduced.

        I'm getting to realize that the main difference between what you're proposing and my half-baked proposal is that I'm trying to get rid of master accountant election and have each bookie individually figuring out what it has to replicate in the case of a crash. I believe that's the key difference.

        Here, who will be creating the groups? We also need to consider group reformation on failures.

        Also, we should design multiple groups and pointers to withstand multiple crashes. Instead, can we make it simple by choosing one node for monitoring?

        My proposal is based on the recipe we have proposed and used to avoid the herd effect with zookeeper leader election. A naive way to do leader election with zk is to have everyone watching the leader znode. If the leader crashes, then everyone receives a notification, which is unnecessary in some cases.

        An alternative way is the following. When a client bids for leadership, it creates an ephemeral and sequential znode. To decide which znode to watch, a node gets the list of ephemerals and watches the one immediately before according to the sequence numbers. In this setting, upon a crash only one notification is generated.

        Here we can use a similar approach, each bookie watches say the predecessor and the successor, and rebuilds the links upon receiving notifications. I'm proposing predecessors and successors but in reality we can create links in any way you want. The important observation is that we can do it in a distributed manner.
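The znode-to-watch decision in this recipe can be sketched as a pure function (a hypothetical helper, not actual ZooKeeper client code): the holder of the lowest sequential znode is the leader and watches nothing; every other client watches only its immediate predecessor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the herd-effect-avoiding election recipe: each
// client watches only the ephemeral sequential znode immediately before its
// own, so a crash generates a single notification. The lowest znode leads.
public class ElectionRecipe {
    // Returns the znode to watch, or null if `self` is the lowest (leader).
    public static String znodeToWatch(List<String> ephemerals, String self) {
        List<String> sorted = new ArrayList<>(ephemerals);
        Collections.sort(sorted);
        int idx = sorted.indexOf(self);
        return idx <= 0 ? null : sorted.get(idx - 1);
    }
}
```

The buddy-chain variant differs only in how the links are drawn (for example, also watching the successor); the distributed nature of the decision is the same.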

        If I understand correctly, you are suggesting an option to turn off striping and to prefer creating another ledger over an ensemble change. The recovery logic would still need to consider ensemble reformation.

        My observation is that some applications might prefer not to have automatic ensemble healing. I was not proposing to remove the current scheme, not even change the default. I was just considering another option.

        One alternative that has been proposed and I found interesting is the one of notifying the application of exceptional events, like ensemble changes. Such a mechanism can also give the application the opportunity of closing the ledger if it chooses to.

        Rakesh R added a comment -

        I'm getting to realize that the main difference between what you're proposing and my half-baked proposal is that I'm trying to get rid of master accountant election and have each bookie individually figuring out what it has to replicate in the case of a crash. I believe that's the key difference.

        Also, we should design multiple groups and pointers to withstand multiple crashes. Instead, can we make it simple by choosing one node for monitoring?

        I'm attaching my thoughts (Auto Recovery Detection - distributed chain approach.doc) on how a chaining-based distributed approach would work. I hope you are also thinking about a similar approach. Please review.

        Rakesh R added a comment -

        @Flavio @Ivan

        Handling multiple crashes:
        Soln-1: Each bookie watches all the others and acts if its peer has failed, otherwise ignores the event; this has the herd effect (everyone receives a notification).
        Soln-2: A central node sees the failures and notifies the peer bookie.

        I feel Soln-2 is better, and it is the approach used in 'Auto Recovery Detection - distributed chain approach.doc'.

        Flavio Junqueira added a comment - edited

        Hi Rakesh, I've had a look at the document you uploaded. I like the approach in general, and I'd like to ask you for some clarifications:

        1. Just to confirm, elements in the myId list have to be deleted manually, yes? If a node is decommissioned, then I suppose we will want to delete from the list.
        2. In step 2 of the monitor (managing the chain), it says that the auditor notifies some other bookie that it needs to handle re-replication. How exactly does this notification happen? Bookies currently don't talk to each other directly. We would need to do this communication through zookeeper if we want to keep bookies decoupled.
        3. In the description of replicators, it says that nodes will compete for re-replicating the entries of a ledger. I like this approach because a bookie may refrain from bidding in case it is overwhelmed. I couldn't understand, though, how the lock is created. The description says L00001_ip:port, but it is not clear if ip:port corresponds to the lock holder, in which case the lock znode wouldn't be unique.

        Also, this proposal is similar to what I discussed with Roger offline. The general idea that Roger proposed was to separate assignment of work from actually doing the work. Assigning the work is not a heavy task so it is ok to be done by a single process.

        Roger, do you have anything to add?

        Rakesh R added a comment -

        Thanks, Flavio, for the comments. I hope the following gives a better idea.

        Just to confirm, elements in the myId list have to be deleted manually, yes? If a node is decommissioned, then I suppose we will want to delete from the list.

        Yes. Since these are persistent nodes, they need to be deleted manually, or we could consider automatic deletion after the full re-replication of the failed/decommissioned bookie. Here again there are race conditions to be avoided: say the garbage collector (GC, for deleting inactive ids) decides to delete the node, and meanwhile the failed bookie rejoins and acquires its 'MyId'; if the GC goes ahead with the 'MyId' deletion, it will cause inconsistencies.

        In step 2 of the monitor (managing the chain), it says that the auditor notifies some other bookie that it needs to handle re-replication. How exactly does this notification happen? Bookies currently don't talk to each other directly. We would need to do this communication through zookeeper if we want to keep bookies decoupled.

        Oh, it seems this was not explained clearly in the doc. Yes, I'm using ZK-based communication.
        As mentioned in the doc, a bookie will first create/acquire its MyId and then add child watchers to it. So when the Auditor identifies a failed bookie and adds its id under the observer's MyId, this in turn notifies the observer bookie.

        Please see the following example:
        The monitor chain is 01 <- 02 <- 03 <- 04 <- 05 <- 06 <- 01 (the <- symbol denotes the re-replica observer).
        Say 3, 5 and 6 have gone down and 1, 2 and 4 are alive.
        The Auditor will add each failed bookie's id under its observer bookie:
        4/3, 6/5, 1/6.

        On child notification, 4 will start re-replication of 3.
        On child notification, 1 will start re-replication of 6; 1 will also need to check whether any children are present under 6 and, if so, take care of those nodes too. There is a chance of duplicates here: say 1 has started acting on 5 when 6 immediately comes back and also starts acting, since no central coordination exists.

        I'm thinking that a bookie will start parsing and re-replicating on:

        1. every child notification, and
        2. bookie startup, when it will check whether any node exists under its MyId for re-replication.
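The transitive hand-off in the example (1 handling 6, and also anything queued under 6, such as 5) can be sketched as pure logic (a hypothetical helper, not actual BookKeeper code; `children` stands in for the znode children the Auditor placed under each MyId):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the MyId child-watch scheme: `children` maps a
// bookie's MyId to the failed ids assigned under it. On a notification, a
// bookie handles its own children and, transitively, any children queued
// under the failed bookies it takes over (since those are also down).
public class MyIdAssignments {
    public static List<String> toReplicate(Map<String, List<String>> children,
                                           String self) {
        List<String> result = new ArrayList<>();
        Deque<String> pending = new ArrayDeque<>(
                children.getOrDefault(self, Collections.<String>emptyList()));
        while (!pending.isEmpty()) {
            String failed = pending.poll();
            result.add(failed);
            // The failed bookie may itself have assignments it never handled.
            pending.addAll(children.getOrDefault(failed, Collections.<String>emptyList()));
        }
        return result;
    }
}
```

With the assignments 4/3, 6/5, 1/6 from the example, bookie 1 would compute [6, 5] and bookie 4 would compute [3].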

        The description says L00001_ip:port, but it is not clear if ip:port corresponds to the lock holder, in which case the lock znode wouldn't be unique

        How re-replication works?
        Re-replication cycle is shown below:

        1. All bookies will be watching children of '/ledgers/underreplicas'
        2. On a child watch notification, read the children; the child znode name format is LedgerName_ip:port,
          where ip:port is the failed bookie which has the 'LedgerName' entries.
        3. All the live bookies will try to create an ephemeral znode 'lock' (ZK distributed locking) under the znode 'LedgerName_ip:port'
        4. Whoever succeeds, say BK_X, will start re-replication
        5. All the others will start watching '/ledgers/underreplicas/LedgerName_ip:port/lock'
        6. When BK_X finishes re-replication of LedgerName_ip:port, it will update the ledger metadata, and then delete LedgerName_ip:port if re-replication is fully over. Otherwise (if it was not able to fully re-replicate; please refer to the example in the doc), it will remove the 'lock' under '/ledgers/underreplicas/LedgerName_ip:port' and the others will get the notification.
        7. On the lock release/delete notification, the others will again compete with each other, and this cycle continues until re-replication is complete.
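        The locking in steps 3-6 can be sketched in Java. This is only a model: a ConcurrentHashMap's putIfAbsent stands in for ZooKeeper's atomic, fail-if-exists create of the ephemeral 'lock' znode, and the class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

// Model of the 'lock' znode protocol under /ledgers/underreplicas.
// putIfAbsent is atomic, so exactly one bookie wins the lock,
// just as only one ephemeral znode create succeeds in ZooKeeper.
public class UnderreplicaLock {
    private final ConcurrentHashMap<String, String> znodes = new ConcurrentHashMap<>();

    // Try to create '/ledgers/underreplicas/<child>/lock' owned by bookieId.
    // Returns true iff this bookie won the lock and should re-replicate.
    public boolean tryLock(String child, String bookieId) {
        String lockPath = "/ledgers/underreplicas/" + child + "/lock";
        return znodes.putIfAbsent(lockPath, bookieId) == null;
    }

    // Release the lock so the other bookies' watches fire and they compete again.
    public void unlock(String child) {
        znodes.remove("/ledgers/underreplicas/" + child + "/lock");
    }
}
```

        In the real protocol the ephemeral znode also disappears automatically if the lock holder's session expires, which this in-memory model does not capture.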

        Assume 3's IP:PORT is 10.18.40.13:2167.
        Taking the above example, consider that 3 has failed and 4 identifies that ledger 'L00001' has entries on 3.
        4 will create a znode like '/ledgers/underreplicas/L00001_10.18.40.13:2167'.

        Since all the live bookies are watching the children of '/ledgers/underreplicas', every one will get the notification and try to acquire the lock for doing the re-replication. Only one bookie (say BK5) will be able to create the ephemeral znode 'lock' under '/ledgers/underreplicas/L00001_10.18.40.13:2167', and it will try re-replication. All the other bookies will watch '/ledgers/underreplicas/L00001_10.18.40.13:2167/lock' to see the status of the re-replication. If, after the first round, some entries still remain to be re-replicated, the first replicator (BK5) will update the ledger metadata (if any) and release the lock. This cycle continues until re-replication is fully over.

        Uma Maheswara Rao G added a comment -
        Now, if the GC is going ahead with 'MyId' deletion, will cause inconsistencies.
        

        Yes. Anyway, we planned to revisit the GC considering these approaches once the approach is finalized. I think we should have some timeout here: if the node does not rejoin within that time, the GC will go ahead and clean the data, as replication might already have progressed by then.

        Uma Maheswara Rao G added a comment -

        Here is one more point to address: for example, suppose two bookies go down from the ensemble and the predecessor nodes take the replication action. There will then be a chance that they both choose the same target for replicating the same entries, right? We have to consider target choosing as well: since they both work independently, neither may know that the other has already chosen a particular bookie as the replication target for particular entries.

        Uma Maheswara Rao G added a comment -

        For work assignment, how about competing for the replication work? We are already using this approach in HBase for distributed log splitting. The idea is as follows:

        The current distributed chain of watchers can identify the failed nodes and record them at some place in ZK. All bookies can watch that node. Whenever a new failed node is added, the bookies will get a notification and can start competing for the work; the winner takes the replication work. They can also update the state of the replication under the acquired lock node. If the cluster restarts, the bookies can again participate in the competition for the failed nodes' replication work. Whenever replication completes, they can delete the lock entry and the failed bookie entry from ZK. In fact, in HBase we have master coordination, but here we will depend on distributed watching to identify failed bookies.
        @Rakesh/Flavio, what are your thoughts on this?

        Uma Maheswara Rao G added a comment -

        Ah, I have seen the 'How re-replication works?' section in the above comment. It works the same way.

        Ivan Kelly added a comment -

        I think the chaining mechanism over-complicates things. In fact, I don't think we should be bookie-focused at all. Rather, we should focus on the ledgers and on keeping them fully replicated. If we detect under-replication for a ledger, we have detected the loss of the bookie anyhow.

        I propose an alternative approach.

        Each bookie has a Recovery worker running.
        Bookies elect an Auditor among themselves.

        Auditor

        • Scans the full list of ledgers periodically.
        • Builds an in-memory bookie -> ledger index
        • Watches /ledgers/available
        • Periodically scans all ledgers

        On bookie failure:

        • Get the ledgers for the failed bookie from the index.
        • Scan each of these ledgers.

        Scanning a ledger will return a number of LedgerFragmentReplicas, each corresponding to a missing ledger fragment replica.
        These are stored in /ledgers/underreplicated/L<ledgerid>-E<startentry>-R<replicaindex>

        The Recovery worker on each bookie reads the list from /ledgers/underreplicated/, picks an entry, locks it, and re-replicates it.
        If a recovery worker crashes half way, its lock will evaporate, and a new recovery worker will be able to do the replication.
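        The /ledgers/underreplicated/L&lt;ledgerid&gt;-E&lt;startentry&gt;-R&lt;replicaindex&gt; naming scheme could be handled by a small helper like the sketch below (the class and its methods are hypothetical, not part of BookKeeper):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for the proposed znode naming scheme:
// /ledgers/underreplicated/L<ledgerid>-E<startentry>-R<replicaindex>
public class FragmentZnode {
    private static final Pattern NAME = Pattern.compile("L(\\d+)-E(\\d+)-R(\\d+)");

    public final long ledgerId;
    public final long startEntry;
    public final int replicaIndex;

    public FragmentZnode(long ledgerId, long startEntry, int replicaIndex) {
        this.ledgerId = ledgerId;
        this.startEntry = startEntry;
        this.replicaIndex = replicaIndex;
    }

    // Name under which the missing replica is recorded
    public String toZnodeName() {
        return String.format("L%d-E%d-R%d", ledgerId, startEntry, replicaIndex);
    }

    // Parse a child name read back from /ledgers/underreplicated/
    public static FragmentZnode parse(String name) {
        Matcher m = NAME.matcher(name);
        if (!m.matches()) {
            throw new IllegalArgumentException("not a fragment znode: " + name);
        }
        return new FragmentZnode(Long.parseLong(m.group(1)),
                Long.parseLong(m.group(2)),
                Integer.parseInt(m.group(3)));
    }
}
```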

        Ivan Kelly added a comment - - edited

        In fact, this scheme could be made 100% distributed, by having an Auditor on each bookie, and having that auditor only monitor the ledgers which it knows that bookie contains. What may be tricky here is avoiding having all bookies in an ensemble run a scan on a single ledger at the same time.

        Flavio Junqueira added a comment -

        Ivan, your proposed approach sounds like a more detailed version of what I posted earlier after a discussion with Roger:

        https://issues.apache.org/jira/browse/BOOKKEEPER-237?focusedCommentId=13281048&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13281048

        Having the central auditor would make it simpler to balance the load across the bookies, but I don't completely dislike your distributed proposal. In general, I prefer symmetric approaches as I have stated before.

        Ivan Kelly added a comment -

        In the completely distributed scheme, the auditor:

        • builds a list of the fragments it participates in
        • from this list, builds a bookie -> fragment index
        • watches /ledgers/available
        • when a bookie fails, scans all fragments of that bookie [1]
        • periodically scans fragments

        [1] This can be the naive first approach. There will be duplication of scanning here across all the bookies in the fragment's ensemble, but these reads should be fast, as the first read should pull the entries into the cache. We can think about removing the duplication at a later date if we want.

        Flavio Junqueira added a comment -
        • builds a list of fragments it is participating in
        • from this list, build a bookie -> fragment index

        I'm not sure what you mean here. Do you mean to say a list of ledgers the bookie is participating in? Perhaps a concrete example would help. Say we have an e3-q2 ledger. Will each bookie be watching all the others? If so, for a large number of ledgers, we might end up having all bookies watching everyone else, since we are likely to have every pair of bookies together in at least one ledger.

        I was also thinking that once we detect the crash, we need to decide where to rebuild each ledger fragment. An elected auditor might enable a simpler way to ensure that the load is evenly balanced across bookies. In a distributed manner, it might not be simple and I would have to think about an algorithm if no one else has one at hand.

        Rakesh R added a comment -

        @Ivan, @Flavio
        Please go through the latest patch in BOOKKEEPER-272 for detecting bookie failures; I would like to hear your suggestions.

        Ivan Kelly added a comment -

        @Flavio,
        A ledger is made of fragments; a fragment has a start id and an ensemble of bookies. A bookie is participating in a fragment if it is in this ensemble of bookies. Say we have bookies bA, bB, bC, bD and bE, and ledgers 1-5, each with one fragment. The ledger fragments are:
        F1: Ledger 1 - Entry1 - bD, bE, bC
        F2: Ledger 2 - Entry1 - bE, bA, bC
        F3: Ledger 3 - Entry1 - bD, bB, bC
        F4: Ledger 4 - Entry1 - bA, bB, bE
        F5: Ledger 5 - Entry1 - bE, bC, bD

        bA gets the list of fragments it participates in, F2 & F4; from this it builds the fragment index:
        bB -> F4
        bC -> F2
        bE -> F2, F4

        bA watches /ledgers/available for bookies disappearing.
        bE disappears.
        bA sees that bE has disappeared and runs a check on F2 and F4. It finds that the bE replica is missing for each, so it adds an underreplicated znode for each of them.
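        The walkthrough above can be sketched as a small index. This is an illustrative sketch only; the class and method names are hypothetical.

```java
import java.util.*;

// Hypothetical sketch of the per-bookie auditor's index: which fragments
// must be re-checked when a given bookie disappears from /ledgers/available.
public class FragmentIndex {
    private final Map<String, Set<String>> byBookie = new HashMap<>();

    // Record that every bookie in the ensemble participates in this fragment
    public void addFragment(String fragmentId, List<String> ensemble) {
        for (String bookie : ensemble) {
            byBookie.computeIfAbsent(bookie, b -> new TreeSet<>()).add(fragmentId);
        }
    }

    // Fragments to scan when 'bookie' fails
    public Set<String> fragmentsFor(String bookie) {
        return byBookie.getOrDefault(bookie, Collections.emptySet());
    }
}
```

        For bA in the example, only F2 and F4 are added, so the index maps bE to {F2, F4}: exactly the fragments bA must scan when bE disappears.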

        re: rebuilding, the loop of the recovery worker on each bookie can look like:

        while (true) {
           // take the lock on one fragment from /ledgers/underreplicated
           pickUnderreplicatedFragmentFromList();
           // copy the missing replica's entries, then release the lock
           rereplicate();
        }

        A single bookie will only be re-replicating a single fragment at a time. As all bookies will be running the recovery worker, this automatically balances the load.

        @Rakesh
        I was actually going through your patch when I came up with this. Will go back to looking at it now. I think there's a good bit of crossover.

        Flavio Junqueira added a comment -

        It sounds like we are not agreeing on terminology. A ledger fragment is the sequence of entries that an individual bookie stores for a given ledger. Say we have an e3-q2 ledger. If the writer writes entries A, B, C, we have the following fragments:

        • For bookie 1: A, C
        • For bookie 2: A, B
        • For bookie 3: B, C
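        A simplified, purely illustrative model of the round-robin striping behind this example: entry i is written to the q bookies starting at position i mod e. The class below is hypothetical and ignores ensemble changes.

```java
import java.util.*;

// Simplified model of round-robin striping for an e<e>-q<q> ledger:
// entry i goes to the q bookies starting at position (i mod e).
// Bookies are numbered 1..e to match the example above. Illustrative only.
public class Striping {
    public static Map<Integer, List<String>> fragments(List<String> entries, int e, int q) {
        Map<Integer, List<String>> perBookie = new TreeMap<>();
        for (int b = 1; b <= e; b++) {
            perBookie.put(b, new ArrayList<>());
        }
        for (int i = 0; i < entries.size(); i++) {
            for (int j = 0; j < q; j++) {
                int bookie = ((i + j) % e) + 1;   // 1-based bookie number
                perBookie.get(bookie).add(entries.get(i));
            }
        }
        return perBookie;
    }
}
```

        With entries A, B, C and e=3, q=2 this yields exactly the three fragments listed above: bookie 1 holds A, C; bookie 2 holds A, B; bookie 3 holds B, C.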

        This is the terminology we have been using from the beginning. If there is any documentation that could be misleading, it would be good to fix it. Now, I don't think we have agreed upon terminology to refer to the segment of a ledger mapping to a given ensemble. I personally have been calling it an ensemble view, referring to the ensemble composition and the ledger segment. "View" is terminology borrowed from distributed-computing work on virtual synchrony and group communication; the concept here is comparable.

        Ivan Kelly added a comment -

        I don't like ensemble view. There's nothing in it to attach it to the concept of being part of a ledger, unless we call it ledger ensemble view, which is a bit too long. How about ledger slice?

        Flavio Junqueira added a comment -

        Ledger slice alone does not capture the idea that there is a sequence of ensembles over time. Views are supposed to capture this idea of changes to the group of processes executing a distributed computation. It is true, though, that "ensemble view" doesn't say anything about the ledger content itself, and consequently an expression like "ledger slice" or "ledger segment" complements the notion of views. I have a slight preference for segment.

        Ivan Kelly added a comment -

        Segment is good. Let's go with that.

        Uma Maheswara Rao G added a comment -

        Latest doc 'updated to the current state' attached here as well for user reference.

        Uma Maheswara Rao G added a comment -

        Also updated the current state in Wiki

        Uma Maheswara Rao G added a comment -

        Can we close this JIRA as implemented? All sub tasks are in closed state now!

        Flavio Junqueira added a comment -

        Awesome job, guys! Resolving as implemented.


          People

          • Assignee:
            Rakesh R
            Reporter:
            Rakesh R
          • Votes:
            1
            Watchers:
            14
