Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/zk/ZkCloseRegionConsensus.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/zk/ZkCloseRegionConsensus.java (revision ) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/zk/ZkCloseRegionConsensus.java (revision ) @@ -0,0 +1,153 @@ +package org.apache.hadoop.hbase.regionserver.consensus.zk; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; + +/** + * ZK-based implementation of {@link CloseRegionConsensus}. + */ +public class ZkCloseRegionConsensus implements CloseRegionConsensus { + private static final Log LOG = LogFactory.getLog(ZkCloseRegionConsensus.class); + + private final static int FAILED_VERSION = -1; + + private final ZooKeeperWatcher watcher; + + public ZkCloseRegionConsensus(ZooKeeperWatcher watcher) { + this.watcher = watcher; + } + + /** + * In ZK-based version we're checking for bad znode state, e.g. if we're + * trying to delete the znode, and it's not ours (version doesn't match). + */ + @Override + public boolean checkClosingState(HRegionInfo regionInfo, CloseRegionDetails crd) { + ZkCloseRegionDetails zkCrd = (ZkCloseRegionDetails) crd; + + try { + return zkCrd.isZk() && !ZKAssign.checkClosingState(watcher, + regionInfo, ((ZkCloseRegionDetails) crd).getExpectedVersion()); + } catch (KeeperException e) { + e.printStackTrace(); + } + return false; + } + + /** + * In ZK-based version we do some znodes transitioning. + */ + @Override + public void setClosedState(HRegion region, CloseRegionDetails crd) { + ZkCloseRegionDetails zkCrd = (ZkCloseRegionDetails) crd; + String name = region.getRegionInfo().getRegionNameAsString(); + ServerName serverName = zkCrd.getServerName(); + + if (zkCrd.isZk()) { + if (setClosedState(region, zkCrd)) { + LOG.debug("Set closed state in zk for " + name + " on " + serverName); + } else { + LOG.debug("Set closed state in zk UNSUCCESSFUL for " + name + " on " + + serverName); + } + } + } + + /** + * Transition ZK node to CLOSED + * @param region HRegion instance being closed + * @param zkCrd details about region closing operation. + * @return If the state is set successfully + */ + private boolean setClosedState(final HRegion region, ZkCloseRegionDetails zkCrd) { + final int expectedVersion = zkCrd.getExpectedVersion(); + + try { + if (ZKAssign.transitionNodeClosed(watcher, region.getRegionInfo(), + zkCrd.getServerName(), expectedVersion) == FAILED_VERSION) { + LOG.warn("Completed the CLOSE of a region but when transitioning from " + + " CLOSING to CLOSED got a version mismatch, someone else clashed " + + "so now unassigning"); + region.close(); + return false; + } + } catch (NullPointerException e) { + // I've seen NPE when table was deleted while close was running in unit tests. + LOG.warn("NPE during close -- catching and continuing...", e); + return false; + } catch (KeeperException e) { + LOG.error("Failed transitioning node from CLOSING to CLOSED", e); + return false; + } catch (IOException e) { + LOG.error("Failed to close region after failing to transition", e); + return false; + } + return true; + } + + /** + * ZK-based implementation. Has details about whether the state transition should be + * reflected in ZK, as well as expected version of znode. + */ + public static class ZkCloseRegionDetails implements CloseRegionConsensus.CloseRegionDetails { + + // Update zk on closing transitions. Usually true. Its false if cluster + // is going down. In this case, its the rs that initiates the region + // close -- not the master process so state up in zk will unlikely be + // CLOSING. + private boolean zk; + + /** + * Expected version of the tracking znode. + */ + private int expectedVersion = FAILED_VERSION; + + /** + * Server name the handler is running on. + */ + private ServerName serverName; + + public ZkCloseRegionDetails() { + } + + public ZkCloseRegionDetails(boolean zk, int expectedVersion) { + this.zk = zk; + this.expectedVersion = expectedVersion; + } + + public boolean isZk() { + return zk; + } + + public void setZk(boolean zk) { + this.zk = zk; + } + + public int getExpectedVersion() { + return expectedVersion; + } + + public void setExpectedVersion(int expectedVersion) { + this.expectedVersion = expectedVersion; + } + + @Override + public ServerName getServerName() { + return serverName; + } + + @Override + public void setServerName(ServerName serverName) { + this.serverName = serverName; + } + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/CloseRegionConsensus.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/CloseRegionConsensus.java (revision ) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/CloseRegionConsensus.java (revision ) @@ -0,0 +1,46 @@ +package org.apache.hadoop.hbase.regionserver.consensus; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.regionserver.HRegion; + +/** + * Consensus operations for close region handlers. + */ +@InterfaceAudience.Private +public interface CloseRegionConsensus { + + /** + * Called before actual region closing to check that we can do close operation + * on this region. + * @param regionInfo region being closed + * @param crd details about closing operation + * @return true if caller shall proceed and close, false if need to abort closing. + */ + boolean checkClosingState(HRegionInfo regionInfo, CloseRegionDetails crd); + + /** + * Called after region is closed to notify all interesting parties / "register" + * region as finally closed. + * @param region region being closed + * @param crd details about closing operation + */ + void setClosedState(HRegion region, CloseRegionDetails crd); + + /** + * Interface for region closing tasks. Used to carry implementation details in + * encapsulated way through Handlers to the consensus API. + */ + static interface CloseRegionDetails { + /** + * Sets server name on which closing operation is running. + */ + void setServerName(ServerName serverName); + + /** + * @return server name on which closing op is running. + */ + ServerName getServerName(); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1da464c3bcc66f027540c15d8784c3989577fd35) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision ) @@ -105,6 +105,8 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus; +import org.apache.hadoop.hbase.regionserver.consensus.zk.ZkCloseRegionConsensus; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -2368,10 +2370,18 @@ CloseRegionHandler crh; final HRegionInfo hri = actualRegion.getRegionInfo(); + + CloseRegionConsensus consensus = new ZkCloseRegionConsensus(this.getZooKeeper()); + ZkCloseRegionConsensus.ZkCloseRegionDetails zkCrd = + new ZkCloseRegionConsensus.ZkCloseRegionDetails(); + zkCrd.setZk(zk); + zkCrd.setExpectedVersion(versionOfClosingNode); + zkCrd.setServerName(this.getServerName()); + if (hri.isMetaRegion()) { - crh = new CloseMetaHandler(this, this, hri, abort, zk, versionOfClosingNode); + crh = new CloseMetaHandler(this, this, hri, abort, consensus, zkCrd); } else { - crh = new CloseRegionHandler(this, this, hri, abort, zk, versionOfClosingNode, sn); + crh = new CloseRegionHandler(this, this, hri, abort, consensus, zkCrd, sn); } this.service.submit(crh); return true; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (revision 1da464c3bcc66f027540c15d8784c3989577fd35) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (revision ) @@ -18,8 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver.handler; -import java.io.IOException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -30,8 +28,7 @@ import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.zookeeper.KeeperException; +import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus; /** * Handles closing of a region on a region server. @@ -45,73 +42,61 @@ // have a running queue of user regions to close? private static final Log LOG = LogFactory.getLog(CloseRegionHandler.class); - private final int FAILED = -1; - int expectedVersion = FAILED; - private final RegionServerServices rsServices; - private final HRegionInfo regionInfo; // If true, the hosting server is aborting. Region close process is different // when we are aborting. private final boolean abort; - - // Update zk on closing transitions. Usually true. Its false if cluster - // is going down. In this case, its the rs that initiates the region - // close -- not the master process so state up in zk will unlikely be - // CLOSING. - private final boolean zk; private ServerName destination; + private CloseRegionConsensus consensus; + private CloseRegionConsensus.CloseRegionDetails closeRegionDetails; - // This is executed after receiving an CLOSE RPC from the master. - public CloseRegionHandler(final Server server, - final RegionServerServices rsServices, HRegionInfo regionInfo) { - this(server, rsServices, regionInfo, false, true, -1, EventType.M_RS_CLOSE_REGION, null); - } - /** * This method used internally by the RegionServer to close out regions. * @param server * @param rsServices * @param regionInfo * @param abort If the regionserver is aborting. - * @param zk If the close should be noted out in zookeeper. + * @param consensus instance of consensus for closing region + * @param crd object carrying details about region close task. */ public CloseRegionHandler(final Server server, final RegionServerServices rsServices, - final HRegionInfo regionInfo, final boolean abort, final boolean zk, - final int versionOfClosingNode) { - this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, + final HRegionInfo regionInfo, final boolean abort, + CloseRegionConsensus consensus, CloseRegionConsensus.CloseRegionDetails crd) { + this(server, rsServices, regionInfo, abort, consensus, crd, EventType.M_RS_CLOSE_REGION, null); } public CloseRegionHandler(final Server server, final RegionServerServices rsServices, - final HRegionInfo regionInfo, final boolean abort, final boolean zk, - final int versionOfClosingNode, ServerName destination) { - this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, + final HRegionInfo regionInfo, final boolean abort, + CloseRegionConsensus consensus, CloseRegionConsensus.CloseRegionDetails crd, + ServerName destination) { + this(server, rsServices, regionInfo, abort, consensus,crd, EventType.M_RS_CLOSE_REGION, destination); } public CloseRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, - boolean abort, final boolean zk, final int versionOfClosingNode, + boolean abort, CloseRegionConsensus consensus, CloseRegionConsensus.CloseRegionDetails crd, EventType eventType) { - this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, eventType, null); + this(server, rsServices, regionInfo, abort, consensus,crd, eventType, null); } protected CloseRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, - boolean abort, final boolean zk, final int versionOfClosingNode, + boolean abort, CloseRegionConsensus consensus, CloseRegionConsensus.CloseRegionDetails crd, EventType eventType, ServerName destination) { super(server, eventType); this.server = server; this.rsServices = rsServices; this.regionInfo = regionInfo; this.abort = abort; - this.zk = zk; - this.expectedVersion = versionOfClosingNode; this.destination = destination; + this.consensus = consensus; + this.closeRegionDetails = crd; } public HRegionInfo getRegionInfo() { @@ -128,18 +113,14 @@ HRegion region = this.rsServices.getFromOnlineRegions(encodedRegionName); if (region == null) { LOG.warn("Received CLOSE for region " + name + " but currently not serving - ignoring"); - if (zk){ - LOG.error("The znode is not modified as we are not serving " + name); - } // TODO: do better than a simple warning return; } // Close the region try { - if (zk && !ZKAssign.checkClosingState(server.getZooKeeper(), regionInfo, expectedVersion)){ - // bad znode state - return; // We're node deleting the znode, but it's not ours... + if (consensus.checkClosingState(regionInfo, closeRegionDetails)) { + return; } // TODO: If we need to keep updating CLOSING stamp to prevent against @@ -162,50 +143,13 @@ } this.rsServices.removeFromOnlineRegions(region, destination); + consensus.setClosedState(region, closeRegionDetails); - if (this.zk) { - if (setClosedState(this.expectedVersion, region)) { - LOG.debug("Set closed state in zk for " + name + " on " + this.server.getServerName()); - } else { - LOG.debug("Set closed state in zk UNSUCCESSFUL for " + name + " on " + - this.server.getServerName()); - } - } - // Done! Region is closed on this RS LOG.debug("Closed " + region.getRegionNameAsString()); } finally { this.rsServices.getRegionsInTransitionInRS(). remove(this.regionInfo.getEncodedNameAsBytes()); } - } - - /** - * Transition ZK node to CLOSED - * @param expectedVersion - * @return If the state is set successfully - */ - private boolean setClosedState(final int expectedVersion, final HRegion region) { - try { - if (ZKAssign.transitionNodeClosed(server.getZooKeeper(), regionInfo, - server.getServerName(), expectedVersion) == FAILED) { - LOG.warn("Completed the CLOSE of a region but when transitioning from " + - " CLOSING to CLOSED got a version mismatch, someone else clashed " + - "so now unassigning"); - region.close(); - return false; - } - } catch (NullPointerException e) { - // I've seen NPE when table was deleted while close was running in unit tests. - LOG.warn("NPE during close -- catching and continuing...", e); - return false; - } catch (KeeperException e) { - LOG.error("Failed transitioning node from CLOSING to CLOSED", e); - return false; - } catch (IOException e) { - LOG.error("Failed to close region after failing to transition", e); - return false; - } - return true; } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java (revision 1da464c3bcc66f027540c15d8784c3989577fd35) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java (revision ) @@ -23,24 +23,21 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus; /** * Handles closing of the root region on a region server. */ @InterfaceAudience.Private public class CloseMetaHandler extends CloseRegionHandler { - // Called when master tells us shutdown a region via close rpc - public CloseMetaHandler(final Server server, - final RegionServerServices rsServices, final HRegionInfo regionInfo) { - this(server, rsServices, regionInfo, false, true, -1); - } // Called when regionserver determines its to go down; not master orchestrated public CloseMetaHandler(final Server server, final RegionServerServices rsServices, final HRegionInfo regionInfo, - final boolean abort, final boolean zk, final int versionOfClosingNode) { - super(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, + final boolean abort, CloseRegionConsensus consensus, + CloseRegionConsensus.CloseRegionDetails crd) { + super(server, rsServices, regionInfo, abort, consensus, crd, EventType.M_RS_CLOSE_META); } } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java (revision 1da464c3bcc66f027540c15d8784c3989577fd35) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java (revision ) @@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus; +import org.apache.hadoop.hbase.regionserver.consensus.zk.ZkCloseRegionConsensus; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.MockServer; import org.apache.hadoop.hbase.zookeeper.ZKAssign; @@ -110,8 +112,16 @@ rss.addToOnlineRegions(spy); // Assert the Server is NOT stopped before we call close region. assertFalse(server.isStopped()); + + CloseRegionConsensus consensus = new ZkCloseRegionConsensus(null); + ZkCloseRegionConsensus.ZkCloseRegionDetails zkCrd = + new ZkCloseRegionConsensus.ZkCloseRegionDetails(); + zkCrd.setZk(false); + zkCrd.setExpectedVersion(-1); + zkCrd.setServerName(server.getServerName()); + CloseRegionHandler handler = - new CloseRegionHandler(server, rss, hri, false, false, -1); + new CloseRegionHandler(server, rss, hri, false, consensus, zkCrd); boolean throwable = false; try { handler.process(); @@ -153,9 +163,16 @@ // The CloseRegionHandler will validate the expected version // Given it is set to invalid versionOfClosingNode+1, // CloseRegionHandler should be M_ZK_REGION_CLOSING + + CloseRegionConsensus consensus = new ZkCloseRegionConsensus(server.getZooKeeper()); + ZkCloseRegionConsensus.ZkCloseRegionDetails zkCrd = + new ZkCloseRegionConsensus.ZkCloseRegionDetails(); + zkCrd.setZk(true); + zkCrd.setExpectedVersion(versionOfClosingNode+1); + zkCrd.setServerName(server.getServerName()); + CloseRegionHandler handler = - new CloseRegionHandler(server, rss, hri, false, true, - versionOfClosingNode+1); + new CloseRegionHandler(server, rss, hri, false, consensus, zkCrd); handler.process(); // Handler should remain in M_ZK_REGION_CLOSING @@ -190,9 +207,16 @@ // The CloseRegionHandler will validate the expected version // Given it is set to correct versionOfClosingNode, // CloseRegionHandlerit should be RS_ZK_REGION_CLOSED + + CloseRegionConsensus consensus = new ZkCloseRegionConsensus(server.getZooKeeper()); + ZkCloseRegionConsensus.ZkCloseRegionDetails zkCrd = + new ZkCloseRegionConsensus.ZkCloseRegionDetails(); + zkCrd.setZk(true); + zkCrd.setExpectedVersion(versionOfClosingNode); + zkCrd.setServerName(server.getServerName()); + CloseRegionHandler handler = - new CloseRegionHandler(server, rss, hri, false, true, - versionOfClosingNode); + new CloseRegionHandler(server, rss, hri, false, consensus, zkCrd); handler.process(); // Handler should have transitioned it to RS_ZK_REGION_CLOSED RegionTransition rt = RegionTransition.parseFrom(