diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index 8172fdc..352e5e7 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -73,7 +73,6 @@ public class LocalHBaseCluster { private final Class masterClass; private final Class regionServerClass; - ConsensusProvider consensusProvider; /** * Constructor. * @param conf @@ -140,7 +139,6 @@ public class LocalHBaseCluster { final Class regionServerClass) throws IOException { this.conf = conf; - consensusProvider = ConsensusProviderFactory.getConsensusProvider(conf); // Always have masters and regionservers come up on port '0' so we don't // clash over default ports. @@ -175,8 +173,14 @@ public class LocalHBaseCluster { // Create each regionserver with its own Configuration instance so each has // its HConnection instance rather than share (see HBASE_INSTANCES down in // the guts of HConnectionManager. + + // Also, create separate ConsensusProvider instance per Server. + // This is special case when we have to have more than 1 ConsensusProvider + // within 1 process. + ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(conf); + JVMClusterUtil.RegionServerThread rst = - JVMClusterUtil.createRegionServerThread(config, consensusProvider, + JVMClusterUtil.createRegionServerThread(config, cp, this.regionServerClass, index); this.regionThreads.add(rst); return rst; @@ -202,7 +206,12 @@ public class LocalHBaseCluster { // Create each master with its own Configuration instance so each has // its HConnection instance rather than share (see HBASE_INSTANCES down in // the guts of HConnectionManager. - JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, consensusProvider, + + // Also, create separate ConsensusProvider instance per Server. 
+ // This is special case when we have to have more than 1 ConsensusProvider + // within 1 process. + ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(conf); + JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, cp, (Class) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index); this.masterThreads.add(mt); return mt; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseConsensusProvider.java hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseConsensusProvider.java index 03f5fca..b0e67c5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseConsensusProvider.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseConsensusProvider.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.consensus; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.ConsensusProvider; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus; /** * Base class for {@link org.apache.hadoop.hbase.ConsensusProvider} implementations. 
@@ -46,4 +47,6 @@ public abstract class BaseConsensusProvider implements ConsensusProvider { public Server getServer() { return null; } + + public abstract CloseRegionConsensus getCloseRegionConsensus(); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkConsensusProvider.java hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkConsensusProvider.java index 56175ae..af78795 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkConsensusProvider.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkConsensusProvider.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.consensus; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus; +import org.apache.hadoop.hbase.regionserver.consensus.ZkCloseRegionConsensus; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; /** @@ -29,14 +31,25 @@ public class ZkConsensusProvider extends BaseConsensusProvider { private Server server; private ZooKeeperWatcher watcher; + // list of consensus-related classes, each responsible for particular area + // of coordination + private CloseRegionConsensus closeRegionConsensus; + @Override public void initialize(Server server) { this.server = server; this.watcher = server.getZooKeeper(); + + this.closeRegionConsensus = new ZkCloseRegionConsensus(this, watcher); } @Override public Server getServer() { return server; } + + @Override + public CloseRegionConsensus getCloseRegionConsensus() { + return closeRegionConsensus; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 267853b..9e042d9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.ConsensusProvider; import org.apache.hadoop.hbase.ConsensusProviderFactory; +import org.apache.hadoop.hbase.consensus.BaseConsensusProvider; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; @@ -107,6 +108,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -392,7 +394,7 @@ public class HRegionServer extends HasThread implements protected final RSRpcServices rpcServices; - protected ConsensusProvider consensusProvider; + protected BaseConsensusProvider consensusProvider; /** * Starts a HRegionServer at the default location. 
@@ -483,7 +485,7 @@ public class HRegionServer extends HasThread implements zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" + rpcServices.isa.getPort(), this, canCreateBaseZNode()); - this.consensusProvider = consensusProvider; + this.consensusProvider = (BaseConsensusProvider) consensusProvider; this.consensusProvider.initialize(this); this.consensusProvider.start(); @@ -2147,7 +2149,7 @@ public class HRegionServer extends HasThread implements } @Override - public ConsensusProvider getConsensusProvider() { + public BaseConsensusProvider getConsensusProvider() { return consensusProvider; } @@ -2310,7 +2312,9 @@ public class HRegionServer extends HasThread implements */ private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) { try { - if (!closeRegion(region.getEncodedName(), abort, false, -1, null)) { + CloseRegionConsensus.CloseRegionDetails details = + consensusProvider.getCloseRegionConsensus().getDetaultDetails(); + if (!closeRegion(region.getEncodedName(), abort, details, null)) { LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing"); } @@ -2335,17 +2339,13 @@ public class HRegionServer extends HasThread implements * * @param encodedName Region to close * @param abort True if we are aborting - * @param zk True if we are to update zk about the region close; if the close - * was orchestrated by master, then update zk. If the close is being run by - * the regionserver because its going down, don't update zk. - * @param versionOfClosingNode the version of znode to compare when RS transitions the znode from - * CLOSING state. + * @param crd details about closing region consensus-coordinated task * @return True if closed a region. 
* @throws NotServingRegionException if the region is not online * @throws RegionAlreadyInTransitionException if the region is already closing */ protected boolean closeRegion(String encodedName, final boolean abort, - final boolean zk, final int versionOfClosingNode, final ServerName sn) + CloseRegionConsensus.CloseRegionDetails crd, final ServerName sn) throws NotServingRegionException, RegionAlreadyInTransitionException { //Check for permissions to close. HRegion actualRegion = this.getFromOnlineRegions(encodedName); @@ -2369,7 +2369,7 @@ public class HRegionServer extends HasThread implements // We're going to try to do a standard close then. LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." + " Doing a standard close now"); - return closeRegion(encodedName, abort, zk, versionOfClosingNode, sn); + return closeRegion(encodedName, abort, crd, sn); } // Let's get the region from the online region list again actualRegion = this.getFromOnlineRegions(encodedName); @@ -2403,9 +2403,11 @@ public class HRegionServer extends HasThread implements CloseRegionHandler crh; final HRegionInfo hri = actualRegion.getRegionInfo(); if (hri.isMetaRegion()) { - crh = new CloseMetaHandler(this, this, hri, abort, zk, versionOfClosingNode); + crh = new CloseMetaHandler(this, this, hri, abort, + consensusProvider.getCloseRegionConsensus(), crd); } else { - crh = new CloseRegionHandler(this, this, hri, abort, zk, versionOfClosingNode, sn); + crh = new CloseRegionHandler(this, this, hri, abort, + consensusProvider.getCloseRegionConsensus(), crd, sn); } this.service.submit(crh); return true; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 2e7b022..d7afca5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -140,6 +140,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Re import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -883,11 +884,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, @QosPriority(priority=HConstants.HIGH_QOS) public CloseRegionResponse closeRegion(final RpcController controller, final CloseRegionRequest request) throws ServiceException { - int versionOfClosingNode = -1; - if (request.hasVersionOfClosingNode()) { - versionOfClosingNode = request.getVersionOfClosingNode(); - } - boolean zk = request.getTransitionInZK(); final ServerName sn = (request.hasDestinationServer() ? ProtobufUtil.toServerName(request.getDestinationServer()) : null); @@ -911,10 +907,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } requestCount.increment(); - LOG.info("Close " + encodedRegionName + ", via zk=" + (zk ? 
"yes" : "no") - + ", znode version=" + versionOfClosingNode + ", on " + sn); + LOG.info("Close " + encodedRegionName + ", on " + sn); + CloseRegionConsensus.CloseRegionDetails crd = regionServer.getConsensusProvider() + .getCloseRegionConsensus().parseFromProtoRequest(request); - boolean closed = regionServer.closeRegion(encodedRegionName, false, zk, versionOfClosingNode, sn); + boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn); CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed); return builder.build(); } catch (IOException ie) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/CloseRegionConsensus.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/CloseRegionConsensus.java new file mode 100644 index 0000000..4ab93fa --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/CloseRegionConsensus.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.consensus; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.regionserver.HRegion; + +/** + * Consensus operations for close region handlers. + */ +@InterfaceAudience.Private +public interface CloseRegionConsensus { + + /** + * Called before actual region closing to check that we can do close operation + * on this region. + * @param regionInfo region being closed + * @param crd details about closing operation + * @return true if caller must abort closing (bad state), false if it shall proceed and close. + */ + boolean checkClosingState(HRegionInfo regionInfo, CloseRegionDetails crd); + + /** + * Called after region is closed to notify all interesting parties / "register" + * region as finally closed. + * @param region region being closed + * @param sn ServerName on which task runs + * @param crd details about closing operation + */ + void setClosedState(HRegion region, ServerName sn, CloseRegionDetails crd); + + /** + * Construct CloseRegionDetails instance from CloseRegionRequest. + * @return instance of CloseRegionDetails + */ + CloseRegionDetails parseFromProtoRequest(AdminProtos.CloseRegionRequest request); + + /** + * Get details object with params for case when we're closing on + * regionserver side internally (not because of RPC call from master), + * so we don't parse details from protobuf request. + */ + CloseRegionDetails getDetaultDetails(); + + /** + * Marker interface for region closing tasks. Used to carry implementation details in + * encapsulated way through Handlers to the consensus API.
+ */ + static interface CloseRegionDetails { + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/ZkCloseRegionConsensus.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/ZkCloseRegionConsensus.java new file mode 100644 index 0000000..4de06f2 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/ZkCloseRegionConsensus.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.consensus; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ConsensusProvider; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; + +/** + * ZK-based implementation of {@link CloseRegionConsensus}. 
+ */ +@InterfaceAudience.Private +public class ZkCloseRegionConsensus implements CloseRegionConsensus { + private static final Log LOG = LogFactory.getLog(ZkCloseRegionConsensus.class); + + private final static int FAILED_VERSION = -1; + + private ConsensusProvider consensus; + private final ZooKeeperWatcher watcher; + + public ZkCloseRegionConsensus(ConsensusProvider consensus, ZooKeeperWatcher watcher) { + this.consensus = consensus; + this.watcher = watcher; + } + + /** + * In ZK-based version we're checking for bad znode state, e.g. if we're + * trying to delete the znode, and it's not ours (version doesn't match). + */ + @Override + public boolean checkClosingState(HRegionInfo regionInfo, CloseRegionDetails crd) { + ZkCloseRegionDetails zkCrd = (ZkCloseRegionDetails) crd; + + try { + return zkCrd.isPublishStatusInZk() && !ZKAssign.checkClosingState(watcher, + regionInfo, ((ZkCloseRegionDetails) crd).getExpectedVersion()); + } catch (KeeperException ke) { + consensus.getServer().abort("Unrecoverable exception while checking state with zk " + + regionInfo.getRegionNameAsString() + ", still finishing close", ke); + throw new RuntimeException(ke); + } + } + + /** + * In ZK-based version we do some znodes transitioning. + */ + @Override + public void setClosedState(HRegion region, ServerName sn, CloseRegionDetails crd) { + ZkCloseRegionDetails zkCrd = (ZkCloseRegionDetails) crd; + String name = region.getRegionInfo().getRegionNameAsString(); + + if (zkCrd.isPublishStatusInZk()) { + if (setClosedState(region,sn, zkCrd)) { + LOG.debug("Set closed state in zk for " + name + " on " + sn); + } else { + LOG.debug("Set closed state in zk UNSUCCESSFUL for " + name + " on " + sn); + } + } + } + + /** + * Parse ZK-related fields from request. 
+ */ + @Override + public CloseRegionDetails parseFromProtoRequest(AdminProtos.CloseRegionRequest request) { + ZkCloseRegionConsensus.ZkCloseRegionDetails zkCrd = + new ZkCloseRegionConsensus.ZkCloseRegionDetails(); + zkCrd.setPublishStatusInZk(request.getTransitionInZK()); + int versionOfClosingNode = -1; + if (request.hasVersionOfClosingNode()) { + versionOfClosingNode = request.getVersionOfClosingNode(); + } + zkCrd.setExpectedVersion(versionOfClosingNode); + + return zkCrd; + } + + /** + * No ZK tracking will be performed for that case. + * This method should be used when we want to construct CloseRegionDetails, + * but don't want any coordination on that (when it's initiated by regionserver), + * so no znode state transitions will be performed. + */ + @Override + public CloseRegionDetails getDetaultDetails() { + ZkCloseRegionConsensus.ZkCloseRegionDetails zkCrd = + new ZkCloseRegionConsensus.ZkCloseRegionDetails(); + zkCrd.setPublishStatusInZk(false); + zkCrd.setExpectedVersion(FAILED_VERSION); + + return zkCrd; + } + + /** + * Transition ZK node to CLOSED + * @param region HRegion instance being closed + * @param sn ServerName on which task runs + * @param zkCrd details about region closing operation. + * @return If the state is set successfully + */ + private boolean setClosedState(final HRegion region, + ServerName sn, + ZkCloseRegionDetails zkCrd) { + final int expectedVersion = zkCrd.getExpectedVersion(); + + try { + if (ZKAssign.transitionNodeClosed(watcher, region.getRegionInfo(), + sn, expectedVersion) == FAILED_VERSION) { + LOG.warn("Completed the CLOSE of a region but when transitioning from " + + " CLOSING to CLOSED got a version mismatch, someone else clashed " + + "so now unassigning"); + region.close(); + return false; + } + } catch (NullPointerException e) { + // I've seen NPE when table was deleted while close was running in unit tests. 
+ LOG.warn("NPE during close -- catching and continuing...", e); + return false; + } catch (KeeperException e) { + LOG.error("Failed transitioning node from CLOSING to CLOSED", e); + return false; + } catch (IOException e) { + LOG.error("Failed to close region after failing to transition", e); + return false; + } + return true; + } + + /** + * ZK-based implementation. Has details about whether the state transition should be + * reflected in ZK, as well as expected version of znode. + */ + public static class ZkCloseRegionDetails implements CloseRegionConsensus.CloseRegionDetails { + + /** + * True if we are to update zk about the region close; if the close + * was orchestrated by master, then update zk. If the close is being run by + * the regionserver because its going down, don't update zk. + * */ + private boolean publishStatusInZk; + + /** + * The version of znode to compare when RS transitions the znode from + * CLOSING state. + */ + private int expectedVersion = FAILED_VERSION; + + public ZkCloseRegionDetails() { + } + + public ZkCloseRegionDetails(boolean publishStatusInZk, int expectedVersion) { + this.publishStatusInZk = publishStatusInZk; + this.expectedVersion = expectedVersion; + } + + public boolean isPublishStatusInZk() { + return publishStatusInZk; + } + + public void setPublishStatusInZk(boolean publishStatusInZk) { + this.publishStatusInZk = publishStatusInZk; + } + + public int getExpectedVersion() { + return expectedVersion; + } + + public void setExpectedVersion(int expectedVersion) { + this.expectedVersion = expectedVersion; + } + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java index fe69f28..34088e0 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java +++ 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java @@ -23,24 +23,21 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus; /** * Handles closing of the root region on a region server. */ @InterfaceAudience.Private public class CloseMetaHandler extends CloseRegionHandler { - // Called when master tells us shutdown a region via close rpc - public CloseMetaHandler(final Server server, - final RegionServerServices rsServices, final HRegionInfo regionInfo) { - this(server, rsServices, regionInfo, false, true, -1); - } // Called when regionserver determines its to go down; not master orchestrated public CloseMetaHandler(final Server server, final RegionServerServices rsServices, final HRegionInfo regionInfo, - final boolean abort, final boolean zk, final int versionOfClosingNode) { - super(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, - EventType.M_RS_CLOSE_META); + final boolean abort, CloseRegionConsensus closeRegionConsensus, + CloseRegionConsensus.CloseRegionDetails crd) { + super(server, rsServices, regionInfo, abort, closeRegionConsensus, + crd, EventType.M_RS_CLOSE_META); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java index 591728e..26cda2c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java @@ -30,8 +30,7 @@ import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; import 
org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.zookeeper.KeeperException; +import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus; /** * Handles closing of a region on a region server. @@ -45,29 +44,15 @@ public class CloseRegionHandler extends EventHandler { // have a running queue of user regions to close? private static final Log LOG = LogFactory.getLog(CloseRegionHandler.class); - private final int FAILED = -1; - int expectedVersion = FAILED; - private final RegionServerServices rsServices; - private final HRegionInfo regionInfo; // If true, the hosting server is aborting. Region close process is different // when we are aborting. private final boolean abort; - - // Update zk on closing transitions. Usually true. Its false if cluster - // is going down. In this case, its the rs that initiates the region - // close -- not the master process so state up in zk will unlikely be - // CLOSING. - private final boolean zk; private ServerName destination; - - // This is executed after receiving an CLOSE RPC from the master. - public CloseRegionHandler(final Server server, - final RegionServerServices rsServices, HRegionInfo regionInfo) { - this(server, rsServices, regionInfo, false, true, -1, EventType.M_RS_CLOSE_REGION, null); - } + private CloseRegionConsensus closeRegionConsensus; + private CloseRegionConsensus.CloseRegionDetails closeRegionDetails; /** * This method used internally by the RegionServer to close out regions. @@ -75,43 +60,48 @@ public class CloseRegionHandler extends EventHandler { * @param rsServices * @param regionInfo * @param abort If the regionserver is aborting. - * @param zk If the close should be noted out in zookeeper. + * @param closeRegionConsensus consensus for closing regions + * @param crd object carrying details about region close task. 
*/ public CloseRegionHandler(final Server server, final RegionServerServices rsServices, - final HRegionInfo regionInfo, final boolean abort, final boolean zk, - final int versionOfClosingNode) { - this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, + final HRegionInfo regionInfo, final boolean abort, + CloseRegionConsensus closeRegionConsensus, + CloseRegionConsensus.CloseRegionDetails crd) { + this(server, rsServices, regionInfo, abort, closeRegionConsensus, crd, EventType.M_RS_CLOSE_REGION, null); } public CloseRegionHandler(final Server server, final RegionServerServices rsServices, - final HRegionInfo regionInfo, final boolean abort, final boolean zk, - final int versionOfClosingNode, ServerName destination) { - this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, + final HRegionInfo regionInfo, final boolean abort, + CloseRegionConsensus closeRegionConsensus, + CloseRegionConsensus.CloseRegionDetails crd, + ServerName destination) { + this(server, rsServices, regionInfo, abort, closeRegionConsensus, crd, EventType.M_RS_CLOSE_REGION, destination); } public CloseRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, - boolean abort, final boolean zk, final int versionOfClosingNode, - EventType eventType) { - this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, eventType, null); + boolean abort, CloseRegionConsensus closeRegionConsensus, + CloseRegionConsensus.CloseRegionDetails crd, EventType eventType) { + this(server, rsServices, regionInfo, abort, closeRegionConsensus, crd, eventType, null); } protected CloseRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, - boolean abort, final boolean zk, final int versionOfClosingNode, + boolean abort, CloseRegionConsensus closeRegionConsensus, + CloseRegionConsensus.CloseRegionDetails crd, EventType eventType, ServerName destination) { super(server, eventType); this.server = 
server; this.rsServices = rsServices; this.regionInfo = regionInfo; this.abort = abort; - this.zk = zk; - this.expectedVersion = versionOfClosingNode; this.destination = destination; + this.closeRegionConsensus = closeRegionConsensus; + this.closeRegionDetails = crd; } public HRegionInfo getRegionInfo() { @@ -128,18 +118,14 @@ public class CloseRegionHandler extends EventHandler { HRegion region = this.rsServices.getFromOnlineRegions(encodedRegionName); if (region == null) { LOG.warn("Received CLOSE for region " + name + " but currently not serving - ignoring"); - if (zk){ - LOG.error("The znode is not modified as we are not serving " + name); - } // TODO: do better than a simple warning return; } // Close the region try { - if (zk && !ZKAssign.checkClosingState(server.getZooKeeper(), regionInfo, expectedVersion)){ - // bad znode state - return; // We're node deleting the znode, but it's not ours... + if (closeRegionConsensus.checkClosingState(regionInfo, closeRegionDetails)) { + return; } // TODO: If we need to keep updating CLOSING stamp to prevent against @@ -152,10 +138,6 @@ public class CloseRegionHandler extends EventHandler { regionInfo.getRegionNameAsString()); return; } - } catch (KeeperException ke) { - server.abort("Unrecoverable exception while checking state with zk " + - regionInfo.getRegionNameAsString() + ", still finishing close", ke); - throw new RuntimeException(ke); } catch (IOException ioe) { // An IOException here indicates that we couldn't successfully flush the // memstore before closing. 
So, we need to abort the server and allow @@ -166,15 +148,8 @@ public class CloseRegionHandler extends EventHandler { } this.rsServices.removeFromOnlineRegions(region, destination); - - if (this.zk) { - if (setClosedState(this.expectedVersion, region)) { - LOG.debug("Set closed state in zk for " + name + " on " + this.server.getServerName()); - } else { - LOG.debug("Set closed state in zk UNSUCCESSFUL for " + name + " on " + - this.server.getServerName()); - } - } + closeRegionConsensus.setClosedState(region, this.server.getServerName(), + closeRegionDetails); // Done! Region is closed on this RS LOG.debug("Closed " + region.getRegionNameAsString()); @@ -183,33 +158,4 @@ public class CloseRegionHandler extends EventHandler { remove(this.regionInfo.getEncodedNameAsBytes()); } } - - /** - * Transition ZK node to CLOSED - * @param expectedVersion - * @return If the state is set successfully - */ - private boolean setClosedState(final int expectedVersion, final HRegion region) { - try { - if (ZKAssign.transitionNodeClosed(server.getZooKeeper(), regionInfo, - server.getServerName(), expectedVersion) == FAILED) { - LOG.warn("Completed the CLOSE of a region but when transitioning from " + - " CLOSING to CLOSED got a version mismatch, someone else clashed " + - "so now unassigning"); - region.close(); - return false; - } - } catch (NullPointerException e) { - // I've seen NPE when table was deleted while close was running in unit tests. 
- LOG.warn("NPE during close -- catching and continuing...", e); - return false; - } catch (KeeperException e) { - LOG.error("Failed transitioning node from CLOSING to CLOSED", e); - return false; - } catch (IOException e) { - LOG.error("Failed to close region after failing to transition", e); - return false; - } - return true; - } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java index a86ff09..47e47cc 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java @@ -33,10 +33,12 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.consensus.ZkConsensusProvider; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.consensus.ZkCloseRegionConsensus; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.MockServer; import org.apache.hadoop.hbase.zookeeper.ZKAssign; @@ -110,8 +112,18 @@ public class TestCloseRegionHandler { rss.addToOnlineRegions(spy); // Assert the Server is NOT stopped before we call close region. 
assertFalse(server.isStopped()); - CloseRegionHandler handler = - new CloseRegionHandler(server, rss, hri, false, false, -1); + + ZkConsensusProvider consensusProvider = new ZkConsensusProvider(); + consensusProvider.initialize(server); + consensusProvider.start(); + + ZkCloseRegionConsensus.ZkCloseRegionDetails zkCrd = + new ZkCloseRegionConsensus.ZkCloseRegionDetails(); + zkCrd.setPublishStatusInZk(false); + zkCrd.setExpectedVersion(-1); + + CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false, + consensusProvider.getCloseRegionConsensus(), zkCrd); boolean throwable = false; try { handler.process(); @@ -153,9 +165,18 @@ public class TestCloseRegionHandler { // The CloseRegionHandler will validate the expected version // Given it is set to invalid versionOfClosingNode+1, // CloseRegionHandler should be M_ZK_REGION_CLOSING - CloseRegionHandler handler = - new CloseRegionHandler(server, rss, hri, false, true, - versionOfClosingNode+1); + + ZkConsensusProvider consensusProvider = new ZkConsensusProvider(); + consensusProvider.initialize(server); + consensusProvider.start(); + + ZkCloseRegionConsensus.ZkCloseRegionDetails zkCrd = + new ZkCloseRegionConsensus.ZkCloseRegionDetails(); + zkCrd.setPublishStatusInZk(true); + zkCrd.setExpectedVersion(versionOfClosingNode+1); + + CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false, + consensusProvider.getCloseRegionConsensus(), zkCrd); handler.process(); // Handler should remain in M_ZK_REGION_CLOSING @@ -190,9 +211,18 @@ public class TestCloseRegionHandler { // The CloseRegionHandler will validate the expected version // Given it is set to correct versionOfClosingNode, // CloseRegionHandlerit should be RS_ZK_REGION_CLOSED - CloseRegionHandler handler = - new CloseRegionHandler(server, rss, hri, false, true, - versionOfClosingNode); + + ZkConsensusProvider consensusProvider = new ZkConsensusProvider(); + consensusProvider.initialize(server); + consensusProvider.start(); + + 
ZkCloseRegionConsensus.ZkCloseRegionDetails zkCrd = + new ZkCloseRegionConsensus.ZkCloseRegionDetails(); + zkCrd.setPublishStatusInZk(true); + zkCrd.setExpectedVersion(versionOfClosingNode); + + CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false, + consensusProvider.getCloseRegionConsensus(), zkCrd); handler.process(); // Handler should have transitioned it to RS_ZK_REGION_CLOSED RegionTransition rt = RegionTransition.parseFrom(