HBASE-2357

Coprocessors: Add read-only region replicas (slaves) for availability and fast region recovery

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: master, regionserver
    • Labels:
      None

      Description

      I don't plan on working on this in the short term, but the idea is to extend region ownership to have two modes. Each region has one primary region server and N slave region servers. The slaves would follow the master (probably by streaming the relevant HLog entries directly from it) and be able to serve stale reads. The benefit is twofold: (a) it provides the ability to spread read load, and (b) it enables very fast region failover/rebalance, since the memstore is already nearly up to date on the slave RS.

          Activity

          Andrew Purtell added a comment -

          This has been effectively superseded by HBASE-10070

          Jason Rutherglen added a comment -

          In a discussion about read replicas, I don't think the push model works, because it's difficult for the master to determine how far along a slave is in downloading a stream of events. Instead the slaves can read off the queue (per region)? A slave is behind when its sequence ID is behind the last item in the queue?

          I think what's nice is HBase seems to have built-in conflict resolution. However, on the slave, will a Put use a local timestamp or the one from the master?
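
As a rough illustration of the pull model described in this comment, here is a toy sketch in Python. It is not HBase code: `RegionEditQueue`, `Replica`, and the tuple layout of an edit are hypothetical names made up for the example. It assumes each edit gets a monotonically increasing sequence ID on the primary and, as one possible answer to the timestamp question, that the replica keeps the primary's timestamp rather than assigning its own.

```python
from dataclasses import dataclass, field


@dataclass
class RegionEditQueue:
    """Toy per-region queue held by the primary (hypothetical, not an HBase class)."""
    edits: list = field(default_factory=list)  # (sequence_id, edit) pairs
    last_seq: int = 0

    def append(self, edit):
        # Assign the next sequence id on the primary.
        self.last_seq += 1
        self.edits.append((self.last_seq, edit))
        return self.last_seq

    def read_after(self, seq, limit=100):
        """Return up to `limit` edits whose sequence id is greater than `seq`."""
        return [(s, e) for s, e in self.edits if s > seq][:limit]


@dataclass
class Replica:
    applied_seq: int = 0
    store: dict = field(default_factory=dict)

    def pull(self, queue, limit=100):
        # The replica asks for everything after the last edit it applied.
        for seq, (row, value, ts) in queue.read_after(self.applied_seq, limit):
            # Assumption: keep the primary's timestamp, not a local one.
            self.store[row] = (value, ts)
            self.applied_seq = seq

    def lag(self, queue):
        # The replica is behind when its sequence id trails the queue's last item.
        return queue.last_seq - self.applied_seq
```

Because the replica tracks its own position, the primary never has to work out how far each replica has read; it only has to answer "give me edits after sequence ID X".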

          Jason Rutherglen added a comment -

          > That is not how HBase RPC works. One connection between the endpoints (in this case regionserver and regionserver) is established upon demand, reaped when idle too long, and multiplexed over in the meantime.

          Ok, great. The replication master will need to examine ZK and find out which slaves to connect to over RPC.

          > And spill the queues under heap pressure presumably. Or give up on a too laggy slave and have it killed to avoid blowing out heap in the alternative

          Spilling would probably add too much complexity (e.g., where would it spill to?). I think we need to define how a slave gets too far behind, and then assume it'll need to refresh itself when it does (on a per-region basis, or does the entire RS need to recover?).

          > No we need to think of the HMaster as always on the verge of going away to be supplanted by ZooKeeper mediated distributed actions.

          Ok, good to know. Before implementing we should hammer this part of the design out.
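
One possible way to define "too far behind" without spilling is sketched below in Python, purely as an illustration. The names `BoundedEditLog` and `replica_action` are hypothetical, and the policy itself is an assumption rather than anything agreed in this thread: the primary keeps only the newest edits in memory, and a replica whose next needed edit has already been dropped must refresh that region instead of catching up.

```python
from collections import deque


class BoundedEditLog:
    """Keeps only the newest `capacity` edits in memory; older ones are dropped."""

    def __init__(self, capacity):
        self.entries = deque(maxlen=capacity)  # (sequence_id, edit) pairs
        self.last_seq = 0

    def append(self, edit):
        self.last_seq += 1
        # A full deque silently evicts its oldest entry on append.
        self.entries.append((self.last_seq, edit))

    def oldest_seq(self):
        # With nothing retained, the "oldest" edit is the next one to be written.
        return self.entries[0][0] if self.entries else self.last_seq + 1


def replica_action(log, applied_seq):
    """Decide what a replica that has applied edits up to `applied_seq` should do."""
    if applied_seq >= log.last_seq:
        return "up-to-date"
    if applied_seq + 1 >= log.oldest_seq():
        return "catch-up"  # every missing edit is still retained
    return "refresh"       # gap in history: rebuild this region's replica
```

This keeps the decision per region, which matches the first of the two options raised above; whether the whole RS should recover instead is left open.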

          Andrew Purtell added a comment -

          CP Endpoints operate over HBase RPC.

          > Sounds like the basic design is there are N slaves that connect to one master using a socket based protocol. There will be a socket connection open per-region per slave.

          That is not how HBase RPC works. One connection between the endpoints (in this case regionserver and regionserver) is established upon demand, reaped when idle too long, and multiplexed over in the meantime.

          > The Coprocessor will place edits into a per region queue

          And spill the queues under heap pressure presumably. Or give up on a too laggy slave and have it killed to avoid blowing out heap in the alternative.

          > Or should it function in the HMaster?

          No, we need to think of the HMaster as always on the verge of going away, to be supplanted by ZooKeeper-mediated distributed actions.

          Jason Rutherglen added a comment -

          Sounds like the basic design is there are N slaves that connect to one master using a socket based protocol. There will be a socket connection open per-region per slave. The Coprocessor will place edits into a per region queue, and a separate thread will write the edits onto the slave socket connections.

          How will this look in Zookeeper? Or should it function in the HMaster?

          Andrew Purtell added a comment -

          I'd agree. If a slave finds a replica too far behind or desynchronized due to error, the replica should be torn down.

          General recovery is then replacing a failed replica on a new slave elsewhere.

          Jason Rutherglen added a comment -

          @Andrew What happens if the queue is drained and the client is well behind? I think it then falls into general recovery?

          Andrew Purtell added a comment -

          > I think the MySQL approach is the slave(s) connect to the master, then read the transaction log starting from a given sequence id. The Coprocessor doesn't enable that?

          Coprocessors can register arbitrary RPC endpoints, so yes, slaves can contact the CP on the master to drain their respective queues in a pull model.

          Jason Rutherglen added a comment -

          > A solution is for the master to stream edits to slaves from Put, Delete, etc. post hooks via synchronous replication (or ZAB transaction). Could also be via asynchronously drained replication queues that don't block the current client operation unless full, but one should worry about increasing heap pressure

          Maybe we should call this 'push' based Coprocessor replication. A queue would probably be necessary because, if a slave server goes down, we'd want to mitigate the errant network calls. Would the push be multi-threaded?

          I think the MySQL approach is the slave(s) connect to the master, then read the transaction log starting from a given sequence id. The Coprocessor doesn't enable that?

          Andrew Purtell added a comment -

          I do have mixed feelings. Slaves would need to access foreign store files for regions that are not open on the RS. So then tailing HLogs, more foreign files, at the slave is not unreasonable. But that is a major violation of assumptions that store files are private. Sharing store files will require a coordination dance between master and slaves upon compaction and flushes. Sharing active HLogs is more evil given the master may become involved.

          Also, the trouble with watching the WAL, either on the slave side at the file or on the master side with WALObserver, is that .writeToWAL(false) edits will go unnoticed until flush. I'd like to reevaluate whether this limitation could be acceptable. Your thoughts? A solution is for the master to stream edits to slaves from Put, Delete, etc. post hooks via synchronous replication (or ZAB transaction). Could also be via asynchronously drained replication queues that don't block the current client operation unless full, but one should worry about increasing heap pressure.
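
To make the asynchronous option concrete, here is a minimal Python sketch of a bounded replication queue that only blocks the caller when it is full. It is a toy model, not HBase code: `ReplicationBuffer`, `post_mutation`, and the `send` callback are hypothetical names, and `post_mutation` merely stands in for whatever a post-Put/Delete hook would call. The `max_pending` bound is what caps the heap pressure mentioned above.

```python
import queue
import threading


class ReplicationBuffer:
    """Bounded queue drained by one background thread (hypothetical sketch)."""

    def __init__(self, send, max_pending=1000):
        # maxsize bounds memory use; put() blocks only once the queue is full.
        self._pending = queue.Queue(maxsize=max_pending)
        self._send = send
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def post_mutation(self, edit, timeout=None):
        """Stand-in for a post-mutation hook: returns at once unless the queue is full."""
        self._pending.put(edit, timeout=timeout)

    def _drain(self):
        while True:
            edit = self._pending.get()
            if edit is None:  # sentinel pushed by close()
                return
            self._send(edit)  # e.g. ship the edit to a replica

    def close(self):
        # Ask the worker to stop after it has sent everything queued so far.
        self._pending.put(None)
        self._worker.join()
```

A single drain thread preserves edit order; a multi-threaded push would need some per-region ordering guarantee on top of this.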

          Andrew Purtell added a comment -

          > Andrew, looping on the HLog sounds good.

          -1

          Directly accessing HFiles from a coprocessor should be discouraged; I've seen general agreement on this in the discussions where it comes up. We created the WALObserver interface for exactly the purpose of capturing (and/or altering) the stream of edits going to the WAL.

          Jason Rutherglen added a comment -

          Andrew, looping on the HLog sounds good. I guess the next thing to conclude is how replication is defined in Zookeeper? Should we implement something similar to HBASE-1295 or change that system to accommodate master -> slave(s)?

          Andrew Purtell added a comment -

          > Another way to implement this functionality is for the slave(s) to loop on the HLog.Reader?

          Yes.

          > Are there any potential problems with that?

          Like with my above "first cut" proposal to scan HLogs upon roll, it would miss anything not .writeToWAL(true).

          > I'm not sure how the Coprocessor implementation would look, would the master push entries out?

          Yes, it would either stream updates out of all hooks for mutations or run a consensus protocol in parallel with WAL commit out of the same.

          > Isn't that somewhat problematic, eg, when a slave goes down, an entry isn't sent or is skipped?

          With simple streaming, when a slave goes down its replica becomes invalid and should simply be discarded. So then I suppose there will be a period of time after that happens when a new slave is allocated and is behind until the master sends over all the memstore. With ZAB, a transaction log and updates from peers are part of the protocol.

          Jason Rutherglen added a comment -

          Another way to implement this functionality is for the slave(s) to loop on the HLog.Reader? Are there any potential problems with that?

          I'm not sure how the Coprocessor implementation would look, would the master push entries out? Isn't that somewhat problematic, eg, when a slave goes down, an entry isn't sent or is skipped?

          Jason Rutherglen added a comment -

          @Andrew The ZAB would be very cool, as then there wouldn't be a need for too much logic when a master fails? However I wonder about the write performance, as it means additional network overhead (to each node) per write?

          > Initial thoughts on this is a region slave can get notice from the region owner via zk that a log has rolled and process the new edits from there

          What is the expected latency between a write and then reading the new value(s) from the slave? I'm not sure if this means writing a series of WAL edits to a file, then waiting for the file to reach a given limit, and then the slave reads from the newly flushed log in HDFS? If this is the case, perhaps we'll want to implement replication that is more immediate (like MySQL)?

          Andrew Purtell added a comment -

          @Jason No, no design doc yet. I mean to do one when I can get a suitable block of time for this.

          ZAB is not necessary for basic read replicas that sync "eventually", i.e. basic MySQL-like master-slave. That would be the first step of course, since most would only need that. Initial thoughts on this are that a region slave can get notice from the region owner via zk that a log has rolled and process the new edits from there. The only significant detail to work through, I think, is that slaves will be under different memory pressure for their mix of regions than the owner. So for this, possibly shadow/temporary flush file storage for slaves that are managing shadow memstores, while sharing the permanent store files with the owner. Also need some zk-mediated coordination around splitting and compaction. Preferably the owner can do splits and compactions leaving the shared store files alone to the last possible moment, then do a change notification via zk and an HDFS rename. And, when all slaves have stopped sharing old storefiles, then garbage collection.

          ZAB would be for a next step, getting cliques to all see and agree upon edits coming in, in effect master-master-master replication. This is blue sky stuff. Regions would have higher availability than single region server hosting, yet all clients would have a consistent view of the data contained therein at any moment. However, a region would need to be deployed to 2N+1 regionservers, where N is the number of expected concurrent node failures, or it would not be writable while it lacks quorum.
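
The 2N+1 sizing rule can be checked with a couple of lines of Python. This is only arithmetic under the usual majority-quorum assumption; `replicas_for` and `has_quorum` are names made up for the example.

```python
def replicas_for(tolerated_failures):
    """Clique size needed to stay writable with this many concurrent failures."""
    return 2 * tolerated_failures + 1


def has_quorum(live, total):
    """A strict majority of the clique must be reachable for writes to proceed."""
    return live > total // 2
```

For example, tolerating one failure means a 3-node clique: with 2 of 3 nodes alive the region stays writable, and with only 1 of 3 it does not.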

          Jason Rutherglen added a comment -

          @Andrew Can ZooKeeper be used (as is) to elect a master (i.e., why is ZAB necessary)? Is there a solidified design for this issue? I think simply using the MySQL replication paradigm is sufficient for the first implementation?

          Andrew Purtell added a comment -

          Looks like someone may have extracted ZAB some time around the 3.1.0 timeframe: https://svn.cs.hmc.edu/svn/linkedin08/zab-multibranch/

          Andrew Purtell added a comment -

          I just committed to doing this (eventually) up on Quora so I guess I better own it.

          Lars George added a comment -

          Ah this is nice! I had asked this many times and insinuated something like that to avoid that dreaded "region is a goner for a while until redeployed" in high availability environments. Using a consensus protocol brings us into the realm of a Dynamo-like RegionServer architecture, with all the pros and cons, the main con being that if strict consistency is asked for then you pay a performance penalty. That is the case with any other open source project implementing "R+W>N". Can't we employ ZooKeeper for this somehow?
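
For readers unfamiliar with the "R+W>N" rule mentioned here, the following Python sketch shows what it buys. It is a generic illustration of quorum overlap, not anything specific to HBase or Dynamo, and the function names are hypothetical: with N replicas, reading from R and writing to W guarantees that every read set shares at least one replica with every write set exactly when R + W > N.

```python
from itertools import combinations


def overlap_guaranteed(n, r, w):
    """The R+W>N rule: any R readers and any W writers must share a replica."""
    return r + w > n


def always_intersect(n, r, w):
    """Brute-force check of the same property over every possible choice of replicas."""
    nodes = range(n)
    return all(
        set(read_set) & set(write_set)
        for read_set in combinations(nodes, r)
        for write_set in combinations(nodes, w)
    )
```

The performance penalty comes from the same inequality: to read from fewer replicas you must write to more of them, and vice versa.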

          I love it!

          Todd Lipcon added a comment -

          cool! looking forward to seeing it!

          Andrew Purtell added a comment -

          We are going to attempt this with coprocessors.

          Andrew Purtell added a comment -

          Minor clarification:

          > But with a consensus protocol, read load can be spread as is the intent of this issue and yet the data is still strongly consistent

          at any time on any region server hosting the region (or a replica), so an ICV (incrementColumnValue) is atomic no matter what region server any client is talking to, for example.

          Andrew Purtell added a comment -

          Writes would be blocked by the slowest of the clique but if this scheme is allowing (strongly consistent!) read load to be more spread out, then in theory anyway the probability of hot accesses to a particular region server starving the write side is lowered accordingly. We could mock it and see what happens and/or try to work through some of the particulars formally. Like Ryan I wonder how slow updates might get. Consider if we run ZAB on a 3-node clique and hflush in parallel to commit with a barrier on completion of both. Who wins the race? How often would hflush take longer? Could be a substantial percentage, especially in a mixed HBase and HDFS (plain mapreduce or Hive or Pig or Cascading or...) loaded environment. It's not clear that hflush would not dominate, is my point.

          What I don't like about log shipping is that the read replicas are not going to be useful to someone who is using HBase for its strong consistency and needs it, with the exception of use cases where one could accept consistent results looking back from the timestamp of the last replication. (But that timestamp could be different on each slave, so master and slaves might all have different views!) But with a consensus protocol, read load can be spread as is the intent of this issue and yet the data is still strongly consistent.

          So I might humbly suggest that both ideas have pros and cons and neither warrants a -1 nor a +1 at this point, IMO.

          Todd Lipcon added a comment -

          well, this whole thing is a crazy idea, I don't anticipate working on it until a lot of other much more important things are done.

          As for your specific brand of crazy idea, I think log shipping is a well proven and simple method that should really cover the majority of use cases. Consensus is tricky to get right, and while using an underlying well tested protocol like ZAB helps, it still is nowhere near easy. It also means that writes on one node are blocked by slaves. So I'm -1 on that, but only as much as one can be -1 on a crazy idea while proposing another crazy idea.

          ryan rawson added a comment -

          that is so crazy it just might work!

          i wonder how slow updates might get?

          Andrew Purtell added a comment -

          Stream edits with no freshness guarantee or use ZAB or Paxos over small (N=3) cliques? The latter can do away with the WAL as an option or the leader can maintain the WAL as part of the write transaction. This would still allow (a) and (b) but strengthen the consistency of both. It's not clear if there would be a significant write penalty beyond what we already take with durable WAL (hflush), especially if the WAL is only used if all members of a clique fail, so the consensus protocol and hflush can happen in parallel. Crazy idea?


            People

            • Assignee:
              Unassigned
              Reporter:
              Todd Lipcon
            • Votes:
              3
              Watchers:
              24
