Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 9f362ac918cac99dc365edd897e1c6e130a8dd4d)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision )
@@ -75,6 +75,8 @@
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.consensus.ConsensusProviderFactory;
+import org.apache.hadoop.hbase.consensus.ConsensusProvider;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@@ -105,6 +107,7 @@
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -390,6 +393,8 @@
protected final RSRpcServices rpcServices;
+ protected ConsensusProvider consensus;
+
/**
* Starts a HRegionServer at the default location
*
@@ -480,6 +485,9 @@
catalogTracker = createCatalogTracker();
catalogTracker.start();
+
+ consensus = ConsensusProviderFactory.getConsensusProvider(conf);
+ consensus.start(this);
}
rpcServices.start();
@@ -2117,6 +2125,10 @@
return zooKeeper;
}
+ public ConsensusProvider getConsensus() {
+ return consensus;
+ }
+
@Override
public ServerName getServerName() {
return serverName;
@@ -2276,7 +2288,9 @@
*/
private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
try {
- if (!closeRegion(region.getEncodedName(), abort, false, -1, null)) {
+ CloseRegionConsensus.CloseRegionDetails details =
+ consensus.getCloseRegionConsensus().getDetailsForNonCoordinatedClosing();
+ if (!closeRegion(region.getEncodedName(), abort, details, null)) {
LOG.warn("Failed to close " + region.getRegionNameAsString() +
" - ignoring and continuing");
}
@@ -2301,17 +2315,13 @@
*
* @param encodedName Region to close
* @param abort True if we are aborting
- * @param zk True if we are to update zk about the region close; if the close
- * was orchestrated by master, then update zk. If the close is being run by
- * the regionserver because its going down, don't update zk.
- * @param versionOfClosingNode the version of znode to compare when RS transitions the znode from
- * CLOSING state.
+ * @param crd details about closing region consensus-coordinated task
* @return True if closed a region.
* @throws NotServingRegionException if the region is not online
* @throws RegionAlreadyInTransitionException if the region is already closing
*/
protected boolean closeRegion(String encodedName, final boolean abort,
- final boolean zk, final int versionOfClosingNode, final ServerName sn)
+ CloseRegionConsensus.CloseRegionDetails crd, final ServerName sn)
throws NotServingRegionException, RegionAlreadyInTransitionException {
//Check for permissions to close.
HRegion actualRegion = this.getFromOnlineRegions(encodedName);
@@ -2335,7 +2345,7 @@
// We're going to try to do a standard close then.
LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
" Doing a standard close now");
- return closeRegion(encodedName, abort, zk, versionOfClosingNode, sn);
+ return closeRegion(encodedName, abort, crd, sn);
}
// Let's get the region from the online region list again
actualRegion = this.getFromOnlineRegions(encodedName);
@@ -2369,9 +2379,11 @@
CloseRegionHandler crh;
final HRegionInfo hri = actualRegion.getRegionInfo();
if (hri.isMetaRegion()) {
- crh = new CloseMetaHandler(this, this, hri, abort, zk, versionOfClosingNode);
+ crh = new CloseMetaHandler(this, this, hri, abort,
+ consensus.getCloseRegionConsensus(), crd);
} else {
- crh = new CloseRegionHandler(this, this, hri, abort, zk, versionOfClosingNode, sn);
+ crh = new CloseRegionHandler(this, this, hri, abort,
+ consensus.getCloseRegionConsensus(), crd, sn);
}
this.service.submit(crh);
return true;
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 9f362ac918cac99dc365edd897e1c6e130a8dd4d)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (revision )
@@ -974,4 +974,7 @@
private HConstants() {
// Can't be instantiated with this ctor.
}
+
+ /** Config for pluggable consensus provider */
+ public static final String HBASE_CONSENSUS_PROVIDER_CLASS = "hbase.consensus.provider.class";
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/CloseRegionConsensus.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/CloseRegionConsensus.java (revision )
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/CloseRegionConsensus.java (revision )
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.consensus;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+/**
+ * Consensus operations for close region handlers.
+ */
+@InterfaceAudience.Private
+public interface CloseRegionConsensus {
+
+ /**
+ * Called before actual region closing to check that we can do close operation
+ * on this region.
+ * @param regionInfo region being closed
+ * @param crd details about closing operation
+ * @return true if caller shall proceed and close, false if need to abort closing.
+ */
+ boolean checkClosingState(HRegionInfo regionInfo, CloseRegionDetails crd);
+
+ /**
+ * Called after region is closed to notify all interesting parties / "register"
+ * region as finally closed.
+ * @param region region being closed
+ * @param crd details about closing operation
+ */
+ void setClosedState(HRegion region, CloseRegionDetails crd);
+
+ /**
+ * Construct CloseRegionDetails instance from CloseRegionRequest.
+ * @return instance of CloseRegionDetails
+ */
+ CloseRegionDetails parseFromProtoRequest(AdminProtos.CloseRegionRequest request);
+
+ /**
+ * Get details object with params for case when we're closing on
+ * regionserver side internally (not because of RPC call from master).
+ */
+ CloseRegionDetails getDetailsForNonCoordinatedClosing();
+
+ /**
+ * Interface for region closing tasks. Used to carry implementation details in
+ * encapsulated way through Handlers to the consensus API.
+ */
+ static interface CloseRegionDetails {
+ /**
+ * Sets server name on which closing operation is running.
+ */
+ void setServerName(ServerName serverName);
+
+ /**
+ * @return server name on which closing op is running.
+ */
+ ServerName getServerName();
+ }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/ZkOpenRegionConsensus.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/ZkOpenRegionConsensus.java (revision )
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/ZkOpenRegionConsensus.java (revision )
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.consensus;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.consensus.ConsensusProvider;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+
+public class ZkOpenRegionConsensus implements OpenRegionConsensus {
+ private static final Log LOG = LogFactory.getLog(ZkOpenRegionConsensus.class);
+
+ private ConsensusProvider consensus;
+ private final ZooKeeperWatcher watcher;
+
+ private boolean tomActivated;
+
+ public ZkOpenRegionConsensus(ConsensusProvider consensus,
+ ZooKeeperWatcher watcher) {
+ this.consensus = consensus;
+ this.watcher = watcher;
+
+ tomActivated = consensus.getServer().getConfiguration().
+ getBoolean(AssignmentManager.ASSIGNMENT_TIMEOUT_MANAGEMENT,
+ AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT);
+ }
+
+ /**
+ * @param r Region we're working on.
+ * @return whether znode is successfully transitioned to OPENED state.
+ * @throws java.io.IOException
+ */
+ @Override
+ public boolean transitionToOpened(final HRegion r, OpenRegionDetails ord) throws IOException {
+ ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
+
+ boolean result = false;
+ HRegionInfo hri = r.getRegionInfo();
+ final String name = hri.getRegionNameAsString();
+ // Finally, Transition ZK node to OPENED
+ try {
+ if (ZKAssign.transitionNodeOpened(watcher, hri,
+ zkOrd.getServerName(), zkOrd.getVersion()) == -1) {
+ String warnMsg = "Completed the OPEN of region " + name +
+ " but when transitioning from " + " OPENING to OPENED ";
+ try {
+ String node = ZKAssign.getNodeName(watcher, hri.getEncodedName());
+ if (ZKUtil.checkExists(watcher, node) < 0) {
+ // if the znode
+ consensus.getServer().abort(warnMsg + "the znode disappeared", null);
+ } else {
+ LOG.warn(warnMsg + "got a version mismatch, someone else clashed; " +
+ "so now unassigning -- closing region on server: " + zkOrd.getServerName());
+ }
+ } catch (KeeperException ke) {
+ consensus.getServer().abort(warnMsg, ke);
+ }
+ } else {
+ LOG.debug("Transitioned " + r.getRegionInfo().getEncodedName() +
+ " to OPENED in zk on " + zkOrd.getServerName());
+ result = true;
+ }
+ } catch (KeeperException e) {
+ LOG.error("Failed transitioning node " + name +
+ " from OPENING to OPENED -- closing region", e);
+ }
+ return result;
+ }
+
+ /**
+ * Transition ZK node from OFFLINE to OPENING.
+ * @param regionInfo region info instance
+ * @param ord - instance of open region details, for ZK implementation
+ * will include version Of OfflineNode that needs to be compared.
+ * before changing the node's state from OFFLINE
+ * @return True if successful transition.
+ */
+ @Override
+ public boolean transitionFromOfflineToOpening(HRegionInfo regionInfo,
+ OpenRegionDetails ord) {
+ ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
+
+ // encoded name is used as znode encoded name in ZK
+ final String encodedName = regionInfo.getEncodedName();
+
+ // TODO: should also handle transition from CLOSED?
+ try {
+ // Initialize the znode version.
+ zkOrd.setVersion(ZKAssign.transitionNode(watcher, regionInfo,
+ zkOrd.getServerName(), EventType.M_ZK_REGION_OFFLINE,
+ EventType.RS_ZK_REGION_OPENING, zkOrd.getVersionOfOfflineNode()));
+ } catch (KeeperException e) {
+ LOG.error("Error transition from OFFLINE to OPENING for region=" +
+ encodedName, e);
+ zkOrd.setVersion(-1);
+ return false;
+ }
+ boolean b = isGoodVersion(zkOrd);
+ if (!b) {
+ LOG.warn("Failed transition from OFFLINE to OPENING for region=" +
+ encodedName);
+ }
+ return b;
+ }
+
+ /**
+ * Update our OPENING state in zookeeper.
+ * Do this so master doesn't timeout this region-in-transition.
+ * @param context Some context to add to logs if failure
+ * @return True if successful transition.
+ */
+ @Override
+ public boolean tickleOpening(OpenRegionDetails ord, HRegionInfo regionInfo, RegionServerServices rsServices,
+ final String context) {
+ ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
+ if (!isRegionStillOpening(regionInfo, rsServices)) {
+ LOG.warn("Open region aborted since it isn't opening any more");
+ return false;
+ }
+ // If previous checks failed... do not try again.
+ if (!isGoodVersion(zkOrd)) return false;
+ String encodedName = regionInfo.getEncodedName();
+ try {
+ zkOrd.setVersion(ZKAssign.retransitionNodeOpening(watcher,
+ regionInfo, zkOrd.getServerName(), zkOrd.getVersion(), tomActivated));
+ } catch (KeeperException e) {
+ consensus.getServer().abort("Exception refreshing OPENING; region=" + encodedName +
+ ", context=" + context, e);
+ zkOrd.setVersion(-1);
+ return false;
+ }
+ boolean b = isGoodVersion(zkOrd);
+ if (!b) {
+ LOG.warn("Failed refreshing OPENING; region=" + encodedName +
+ ", context=" + context);
+ }
+ return b;
+ }
+
+ /**
+ * Try to transition to open.
+ *
+ * This is not guaranteed to succeed, we just do our best.
+ *
+ * @param rsServices
+ * @param hri Region we're working on.
+ * @param ord Details about region open task
+ * @return whether znode is successfully transitioned to FAILED_OPEN state.
+ */
+ @Override
+ public boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices,
+ final HRegionInfo hri, OpenRegionDetails ord) {
+ ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
+ boolean result = false;
+ final String name = hri.getRegionNameAsString();
+ try {
+ LOG.info("Opening of region " + hri + " failed, transitioning" +
+ " from OFFLINE to FAILED_OPEN in ZK, expecting version " + zkOrd.getVersionOfOfflineNode());
+ if (ZKAssign.transitionNode(
+ rsServices.getZooKeeper(), hri,
+ rsServices.getServerName(),
+ EventType.M_ZK_REGION_OFFLINE,
+ EventType.RS_ZK_REGION_FAILED_OPEN,
+ zkOrd.getVersionOfOfflineNode()) == -1) {
+ LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
+ "It's likely that the master already timed out this open " +
+ "attempt, and thus another RS already has the region.");
+ } else {
+ result = true;
+ }
+ } catch (KeeperException e) {
+ LOG.error("Failed transitioning node " + name + " from OFFLINE to FAILED_OPEN", e);
+ }
+ return result;
+ }
+
+ private boolean isGoodVersion(ZkOpenRegionDetails zkOrd) {
+ return zkOrd.getVersion() != -1;
+ }
+
+ /**
+ * This is not guaranteed to succeed, we just do our best.
+ * @param hri Region we're working on.
+ * @return whether znode is successfully transitioned to FAILED_OPEN state.
+ */
+ @Override
+ public boolean tryTransitionFromOpeningToFailedOpen(final HRegionInfo hri, OpenRegionDetails ord) {
+ ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord;
+ boolean result = false;
+ final String name = hri.getRegionNameAsString();
+ try {
+ LOG.info("Opening of region " + hri + " failed, transitioning" +
+ " from OPENING to FAILED_OPEN in ZK, expecting version " + zkOrd.getVersion());
+ if (ZKAssign.transitionNode(
+ watcher, hri,
+ zkOrd.getServerName(),
+ EventType.RS_ZK_REGION_OPENING,
+ EventType.RS_ZK_REGION_FAILED_OPEN,
+ zkOrd.getVersion()) == -1) {
+ LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
+ "It's likely that the master already timed out this open " +
+ "attempt, and thus another RS already has the region.");
+ } else {
+ result = true;
+ }
+ } catch (KeeperException e) {
+ LOG.error("Failed transitioning node " + name +
+ " from OPENING to FAILED_OPEN", e);
+ }
+ return result;
+ }
+
+ /**
+ * Parse ZK-related fields from request.
+ */
+ @Override
+ public OpenRegionConsensus.OpenRegionDetails parseFromProtoRequest(
+ AdminProtos.OpenRegionRequest.RegionOpenInfo regionOpenInfo) {
+ ZkOpenRegionConsensus.ZkOpenRegionDetails zkCrd =
+ new ZkOpenRegionConsensus.ZkOpenRegionDetails();
+
+ int versionOfOfflineNode = -1;
+ if (regionOpenInfo.hasVersionOfOfflineNode()) {
+ versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode();
+ }
+ zkCrd.setVersionOfOfflineNode(versionOfOfflineNode);
+ zkCrd.setServerName(consensus.getServer().getServerName());
+
+ return zkCrd;
+ }
+
+ /**
+ * No ZK tracking will be performed for that case.
+ * This method should be used when we want to construct CloseRegionDetails,
+ * but don't want any coordination on that (when it's initiated by regionserver),
+ * so no znode state transitions will be performed.
+ */
+ @Override
+ public OpenRegionConsensus.OpenRegionDetails getDetailsForNonCoordinatedOpening() {
+ ZkOpenRegionConsensus.ZkOpenRegionDetails zkCrd =
+ new ZkOpenRegionConsensus.ZkOpenRegionDetails();
+ zkCrd.setVersionOfOfflineNode(-1);
+ zkCrd.setServerName(consensus.getServer().getServerName());
+
+ return zkCrd;
+ }
+
+ /**
+ * ZK-based implementation. Has details about whether the state transition should be
+ * reflected in ZK, as well as expected version of znode.
+ */
+ public static class ZkOpenRegionDetails implements OpenRegionConsensus.OpenRegionDetails {
+
+ // We get version of our znode at start of open process and monitor it across
+ // the total open. We'll fail the open if someone hijacks our znode; we can
+ // tell this has happened if version is not as expected.
+ private volatile int version = -1;
+
+ //version of the offline node that was set by the master
+ private volatile int versionOfOfflineNode = -1;
+
+ /**
+ * Server name the handler is running on.
+ */
+ private ServerName serverName;
+
+ public ZkOpenRegionDetails() {
+ }
+
+ public ZkOpenRegionDetails(int versionOfOfflineNode) {
+ this.versionOfOfflineNode = versionOfOfflineNode;
+ }
+
+ public int getVersionOfOfflineNode() {
+ return versionOfOfflineNode;
+ }
+
+ public void setVersionOfOfflineNode(int versionOfOfflineNode) {
+ this.versionOfOfflineNode = versionOfOfflineNode;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ @Override
+ public ServerName getServerName() {
+ return serverName;
+ }
+
+ @Override
+ public void setServerName(ServerName serverName) {
+ this.serverName = serverName;
+ }
+ }
+
+ private boolean isRegionStillOpening(HRegionInfo regionInfo, RegionServerServices rsServices) {
+ byte[] encodedName = regionInfo.getEncodedNameAsBytes();
+ Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName);
+ return Boolean.TRUE.equals(action); // true means opening for RIT
+ }
+}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (revision 9f362ac918cac99dc365edd897e1c6e130a8dd4d)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java (revision )
@@ -25,9 +25,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.consensus.ZkConsensusProvider;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.consensus.ZkOpenRegionConsensus;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MockServer;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -96,7 +98,15 @@
.getConfiguration(), htd);
assertNotNull(region);
try {
- OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd) {
+ ZkConsensusProvider consensus = new ZkConsensusProvider();
+ consensus.start(server);
+
+ ZkOpenRegionConsensus.ZkOpenRegionDetails zkCrd =
+ new ZkOpenRegionConsensus.ZkOpenRegionDetails();
+ zkCrd.setServerName(server.getServerName());
+
+ OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd, consensus.getOpenRegionConsensus(),
+ zkCrd) {
HRegion openRegion() {
// Open region first, then remove znode as though it'd been hijacked.
HRegion region = super.openRegion();
@@ -150,10 +160,19 @@
.getConfiguration(), htd);
assertNotNull(region);
try {
- OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd) {
- boolean transitionToOpened(final HRegion r) throws IOException {
+
+ ZkConsensusProvider consensus = new ZkConsensusProvider();
+ consensus.start(server);
+
+ ZkOpenRegionConsensus.ZkOpenRegionDetails zkCrd =
+ new ZkOpenRegionConsensus.ZkOpenRegionDetails();
+ zkCrd.setServerName(server.getServerName());
+
+ ZkOpenRegionConsensus openRegionConsensus = new ZkOpenRegionConsensus(consensus, server.getZooKeeper()) {
+ @Override
+ public boolean transitionToOpened(final HRegion r, OpenRegionDetails ord) throws IOException {
// remove znode simulating intermittent zookeeper connection issue
- ZooKeeperWatcher zkw = this.server.getZooKeeper();
+ ZooKeeperWatcher zkw = server.getZooKeeper();
String node = ZKAssign.getNodeName(zkw, hri.getEncodedName());
try {
ZKUtil.deleteNodeFailSilent(zkw, node);
@@ -161,9 +180,11 @@
throw new RuntimeException("Ugh failed delete of " + node, e);
}
// then try to transition to OPENED
- return super.transitionToOpened(r);
+ return super.transitionToOpened(r, ord);
}
- };
+ };
+
+ OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd, openRegionConsensus, zkCrd);
rss.getRegionsInTransitionInRS().put(
hri.getEncodedNameAsBytes(), Boolean.TRUE);
// Call process without first creating OFFLINE region in zk, see if
@@ -193,9 +214,17 @@
// Create it OFFLINE, which is what it expects
ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName());
+ ZkConsensusProvider consensus = new ZkConsensusProvider();
+ consensus.start(server);
+
+ ZkOpenRegionConsensus.ZkOpenRegionDetails zkCrd =
+ new ZkOpenRegionConsensus.ZkOpenRegionDetails();
+ zkCrd.setServerName(server.getServerName());
+
// Create the handler
OpenRegionHandler handler =
- new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) {
+ new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD, consensus.getOpenRegionConsensus(),
+ zkCrd) {
@Override
HRegion openRegion() {
// Fake failure of opening a region due to an IOE, which is caught
@@ -221,8 +250,15 @@
ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName());
// Create the handler
+ ZkConsensusProvider consensus = new ZkConsensusProvider();
+ consensus.start(server);
+
+ ZkOpenRegionConsensus.ZkOpenRegionDetails zkCrd =
+ new ZkOpenRegionConsensus.ZkOpenRegionDetails();
+ zkCrd.setServerName(server.getServerName());
+
OpenRegionHandler handler =
- new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) {
+ new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD, consensus.getOpenRegionConsensus(), zkCrd) {
@Override
boolean updateMeta(final HRegion r) {
// Fake failure of updating META
@@ -246,7 +282,15 @@
// Create it OFFLINE, which is what it expects
ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName());
// Create the handler
- OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) {
+ ZkConsensusProvider consensus = new ZkConsensusProvider();
+ consensus.start(server);
+
+ ZkOpenRegionConsensus.ZkOpenRegionDetails zkCrd =
+ new ZkOpenRegionConsensus.ZkOpenRegionDetails();
+ zkCrd.setServerName(server.getServerName());
+
+ OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD,
+ consensus.getOpenRegionConsensus(), zkCrd) {
@Override
boolean updateMeta(HRegion r) {
return false;
@@ -275,13 +319,23 @@
// Create it OFFLINE, which is what it expects
ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName());
// Create the handler
- OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) {
+ ZkConsensusProvider consensus = new ZkConsensusProvider();
+ consensus.start(server);
+ ZkOpenRegionConsensus.ZkOpenRegionDetails zkCrd =
+ new ZkOpenRegionConsensus.ZkOpenRegionDetails();
+ zkCrd.setServerName(server.getServerName());
+
+ ZkOpenRegionConsensus openRegionConsensus = new ZkOpenRegionConsensus(consensus, server.getZooKeeper()) {
@Override
- boolean transitionZookeeperOfflineToOpening(String encodedName, int versionOfOfflineNode) {
+ public boolean transitionFromOfflineToOpening(HRegionInfo regionInfo,
+ OpenRegionDetails ord) {
return false;
}
};
+
+ OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD,
+ openRegionConsensus, zkCrd);
rsServices.getRegionsInTransitionInRS().put(TEST_HRI.getEncodedNameAsBytes(), Boolean.TRUE);
handler.process();
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java (revision 9f362ac918cac99dc365edd897e1c6e130a8dd4d)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java (revision )
@@ -139,6 +139,8 @@
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus;
+import org.apache.hadoop.hbase.regionserver.consensus.OpenRegionConsensus;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -859,11 +861,6 @@
@QosPriority(priority=HConstants.HIGH_QOS)
public CloseRegionResponse closeRegion(final RpcController controller,
final CloseRegionRequest request) throws ServiceException {
- int versionOfClosingNode = -1;
- if (request.hasVersionOfClosingNode()) {
- versionOfClosingNode = request.getVersionOfClosingNode();
- }
- boolean zk = request.getTransitionInZK();
final ServerName sn = (request.hasDestinationServer() ?
ProtobufUtil.toServerName(request.getDestinationServer()) : null);
@@ -887,10 +884,11 @@
}
requestCount.increment();
- LOG.info("Close " + encodedRegionName + ", via zk=" + (zk ? "yes" : "no")
- + ", znode version=" + versionOfClosingNode + ", on " + sn);
+ LOG.info("Close " + encodedRegionName + ", on " + sn);
+ CloseRegionConsensus.CloseRegionDetails crd =
+ regionServer.getConsensus().getCloseRegionConsensus().parseFromProtoRequest(request);
- boolean closed = regionServer.closeRegion(encodedRegionName, false, zk, versionOfClosingNode, sn);
+ boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn);
CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
return builder.build();
} catch (IOException ie) {
@@ -1167,11 +1165,10 @@
final boolean isBulkAssign = regionCount > 1;
for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
+ OpenRegionConsensus consensus = regionServer.getConsensus().getOpenRegionConsensus();
+ OpenRegionConsensus.OpenRegionDetails ord =
+ consensus.parseFromProtoRequest(regionOpenInfo);
- int versionOfOfflineNode = -1;
- if (regionOpenInfo.hasVersionOfOfflineNode()) {
- versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode();
- }
HTableDescriptor htd;
try {
final HRegion onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
@@ -1216,9 +1213,9 @@
if (Boolean.FALSE.equals(previous)) {
// There is a close in progress. We need to mark this open as failed in ZK.
- OpenRegionHandler.
- tryTransitionFromOfflineToFailedOpen(regionServer, region, versionOfOfflineNode);
-
+
+ consensus.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord);
+
throw new RegionAlreadyInTransitionException("Received OPEN for the region:"
+ region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
}
@@ -1245,12 +1242,12 @@
// Need to pass the expected version in the constructor.
if (region.isMetaRegion()) {
regionServer.service.submit(new OpenMetaHandler(
- regionServer, regionServer, region, htd, versionOfOfflineNode));
+ regionServer, regionServer, region, htd, consensus, ord));
} else {
regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
regionOpenInfo.getFavoredNodesList());
regionServer.service.submit(new OpenRegionHandler(
- regionServer, regionServer, region, htd, versionOfOfflineNode));
+ regionServer, regionServer, region, htd, consensus, ord));
}
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ConsensusProviderFactory.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ConsensusProviderFactory.java (revision )
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ConsensusProviderFactory.java (revision )
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.consensus;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Creates instance of {@link org.apache.hadoop.hbase.consensus.ConsensusProvider}
+ * based on conf.
+ */
+@InterfaceAudience.Private
+public class ConsensusProviderFactory {
+
+ /**
+ * Creates consensus provider from the given conf.
+ * @param conf Configuration
+ * @return A {@link org.apache.hadoop.hbase.consensus.ConsensusProvider}
+ */
+ public static ConsensusProvider getConsensusProvider(Configuration conf) {
+ Class extends ConsensusProvider> consensusKlass =
+ conf.getClass(HConstants.HBASE_CONSENSUS_PROVIDER_CLASS, ZkConsensusProvider.class,
+ ConsensusProvider.class);
+ return ReflectionUtils.newInstance(consensusKlass, conf);
+ }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (revision 9f362ac918cac99dc365edd897e1c6e130a8dd4d)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (revision )
@@ -30,8 +30,7 @@
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.zookeeper.KeeperException;
+import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus;
/**
* Handles closing of a region on a region server.
@@ -45,73 +44,64 @@
// have a running queue of user regions to close?
private static final Log LOG = LogFactory.getLog(CloseRegionHandler.class);
- private final int FAILED = -1;
- int expectedVersion = FAILED;
-
private final RegionServerServices rsServices;
-
private final HRegionInfo regionInfo;
// If true, the hosting server is aborting. Region close process is different
// when we are aborting.
private final boolean abort;
-
- // Update zk on closing transitions. Usually true. Its false if cluster
- // is going down. In this case, its the rs that initiates the region
- // close -- not the master process so state up in zk will unlikely be
- // CLOSING.
- private final boolean zk;
private ServerName destination;
+ private CloseRegionConsensus closeRegionConsensus;
+ private CloseRegionConsensus.CloseRegionDetails closeRegionDetails;
- // This is executed after receiving an CLOSE RPC from the master.
- public CloseRegionHandler(final Server server,
- final RegionServerServices rsServices, HRegionInfo regionInfo) {
- this(server, rsServices, regionInfo, false, true, -1, EventType.M_RS_CLOSE_REGION, null);
- }
-
/**
* This method used internally by the RegionServer to close out regions.
* @param server
* @param rsServices
* @param regionInfo
* @param abort If the regionserver is aborting.
- * @param zk If the close should be noted out in zookeeper.
+ * @param closeRegionConsensus consensus for closing regions
+ * @param crd object carrying details about region close task.
*/
public CloseRegionHandler(final Server server,
final RegionServerServices rsServices,
- final HRegionInfo regionInfo, final boolean abort, final boolean zk,
- final int versionOfClosingNode) {
- this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode,
+ final HRegionInfo regionInfo, final boolean abort,
+ CloseRegionConsensus closeRegionConsensus,
+ CloseRegionConsensus.CloseRegionDetails crd) {
+ this(server, rsServices, regionInfo, abort, closeRegionConsensus, crd,
EventType.M_RS_CLOSE_REGION, null);
}
public CloseRegionHandler(final Server server,
final RegionServerServices rsServices,
- final HRegionInfo regionInfo, final boolean abort, final boolean zk,
- final int versionOfClosingNode, ServerName destination) {
- this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode,
+ final HRegionInfo regionInfo, final boolean abort,
+ CloseRegionConsensus closeRegionConsensus,
+ CloseRegionConsensus.CloseRegionDetails crd,
+ ServerName destination) {
+ this(server, rsServices, regionInfo, abort, closeRegionConsensus, crd,
EventType.M_RS_CLOSE_REGION, destination);
}
public CloseRegionHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
- boolean abort, final boolean zk, final int versionOfClosingNode,
- EventType eventType) {
- this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, eventType, null);
+ boolean abort, CloseRegionConsensus closeRegionConsensus,
+ CloseRegionConsensus.CloseRegionDetails crd, EventType eventType) {
+ this(server, rsServices, regionInfo, abort, closeRegionConsensus, crd, eventType, null);
}
protected CloseRegionHandler(final Server server,
final RegionServerServices rsServices, HRegionInfo regionInfo,
- boolean abort, final boolean zk, final int versionOfClosingNode,
+ boolean abort, CloseRegionConsensus closeRegionConsensus,
+ CloseRegionConsensus.CloseRegionDetails crd,
EventType eventType, ServerName destination) {
super(server, eventType);
this.server = server;
this.rsServices = rsServices;
this.regionInfo = regionInfo;
this.abort = abort;
- this.zk = zk;
- this.expectedVersion = versionOfClosingNode;
this.destination = destination;
+ this.closeRegionConsensus = closeRegionConsensus;
+ this.closeRegionDetails = crd;
}
public HRegionInfo getRegionInfo() {
@@ -128,18 +118,14 @@
HRegion region = this.rsServices.getFromOnlineRegions(encodedRegionName);
if (region == null) {
LOG.warn("Received CLOSE for region " + name + " but currently not serving - ignoring");
- if (zk){
- LOG.error("The znode is not modified as we are not serving " + name);
- }
// TODO: do better than a simple warning
return;
}
// Close the region
try {
- if (zk && !ZKAssign.checkClosingState(server.getZooKeeper(), regionInfo, expectedVersion)){
- // bad znode state
- return; // We're node deleting the znode, but it's not ours...
+ if (closeRegionConsensus.checkClosingState(regionInfo, closeRegionDetails)) {
+ return;
}
// TODO: If we need to keep updating CLOSING stamp to prevent against
@@ -152,10 +138,6 @@
regionInfo.getRegionNameAsString());
return;
}
- } catch (KeeperException ke) {
- server.abort("Unrecoverable exception while checking state with zk " +
- regionInfo.getRegionNameAsString() + ", still finishing close", ke);
- throw new RuntimeException(ke);
} catch (IOException ioe) {
// An IOException here indicates that we couldn't successfully flush the
// memstore before closing. So, we need to abort the server and allow
@@ -166,50 +148,13 @@
}
this.rsServices.removeFromOnlineRegions(region, destination);
+ closeRegionConsensus.setClosedState(region, closeRegionDetails);
- if (this.zk) {
- if (setClosedState(this.expectedVersion, region)) {
- LOG.debug("Set closed state in zk for " + name + " on " + this.server.getServerName());
- } else {
- LOG.debug("Set closed state in zk UNSUCCESSFUL for " + name + " on " +
- this.server.getServerName());
- }
- }
-
// Done! Region is closed on this RS
LOG.debug("Closed " + region.getRegionNameAsString());
} finally {
this.rsServices.getRegionsInTransitionInRS().
remove(this.regionInfo.getEncodedNameAsBytes());
}
- }
-
- /**
- * Transition ZK node to CLOSED
- * @param expectedVersion
- * @return If the state is set successfully
- */
- private boolean setClosedState(final int expectedVersion, final HRegion region) {
- try {
- if (ZKAssign.transitionNodeClosed(server.getZooKeeper(), regionInfo,
- server.getServerName(), expectedVersion) == FAILED) {
- LOG.warn("Completed the CLOSE of a region but when transitioning from " +
- " CLOSING to CLOSED got a version mismatch, someone else clashed " +
- "so now unassigning");
- region.close();
- return false;
- }
- } catch (NullPointerException e) {
- // I've seen NPE when table was deleted while close was running in unit tests.
- LOG.warn("NPE during close -- catching and continuing...", e);
- return false;
- } catch (KeeperException e) {
- LOG.error("Failed transitioning node from CLOSING to CLOSED", e);
- return false;
- } catch (IOException e) {
- LOG.error("Failed to close region after failing to transition", e);
- return false;
- }
- return true;
}
}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java (revision 9f362ac918cac99dc365edd897e1c6e130a8dd4d)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java (revision )
@@ -31,11 +31,13 @@
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.consensus.ZkConsensusProvider;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.regionserver.consensus.ZkOpenRegionConsensus;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
@@ -341,8 +343,18 @@
// Let's start the open handler
HTableDescriptor htd = getRS().tableDescriptors.get(hri.getTable());
- getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, 0));
+ ZkConsensusProvider consensus = new ZkConsensusProvider();
+ consensus.start(getRS());
+
+ ZkOpenRegionConsensus.ZkOpenRegionDetails zkCrd =
+ new ZkOpenRegionConsensus.ZkOpenRegionDetails();
+ zkCrd.setServerName(getRS().getServerName());
+ zkCrd.setVersionOfOfflineNode(0);
+
+ getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd,
+ consensus.getOpenRegionConsensus(), zkCrd));
+
// The open handler should have removed the region from RIT but kept the region closed
checkRegionIsClosed();
@@ -395,7 +407,17 @@
// 2) The region in RIT was changed.
// The order is more or less implementation dependant.
HTableDescriptor htd = getRS().tableDescriptors.get(hri.getTable());
- getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, 0));
+
+ ZkConsensusProvider consensus = new ZkConsensusProvider();
+ consensus.start(getRS());
+
+ ZkOpenRegionConsensus.ZkOpenRegionDetails zkCrd =
+ new ZkOpenRegionConsensus.ZkOpenRegionDetails();
+ zkCrd.setServerName(getRS().getServerName());
+ zkCrd.setVersionOfOfflineNode(0);
+
+ getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd,
+ consensus.getOpenRegionConsensus(), zkCrd));
// The open handler should have removed the region from RIT but kept the region closed
checkRegionIsClosed();
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ConsensusProvider.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ConsensusProvider.java (revision )
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ConsensusProvider.java (revision )
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.consensus;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus;
+import org.apache.hadoop.hbase.regionserver.consensus.OpenRegionConsensus;
+
+/**
+ * Interface for consensus providers used by HBase
+ * for coordination.
+ *
+ * Property hbase.consensus.provider.class in hbase-site.xml controls
+ * which provider to use.
+ */
+@InterfaceAudience.Private
+public interface ConsensusProvider {
+
+ /**
+ * Starts consensus within specified server instance.
+ * @param server master or region server to run consensus within.
+ */
+ void start(Server server);
+
+ void stop();
+
+ /**
+ * @return instance of Server consensus runs within
+ */
+ Server getServer();
+
+ /**
+ * @return instance of consensus operations for closing regions.
+ */
+ CloseRegionConsensus getCloseRegionConsensus();
+
+ /**
+ * @return instance of consensus operations for opening regions.
+ */
+ OpenRegionConsensus getOpenRegionConsensus();
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java (revision 9f362ac918cac99dc365edd897e1c6e130a8dd4d)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java (revision )
@@ -23,24 +23,21 @@
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.consensus.CloseRegionConsensus;
/**
* Handles closing of the root region on a region server.
*/
@InterfaceAudience.Private
public class CloseMetaHandler extends CloseRegionHandler {
- // Called when master tells us shutdown a region via close rpc
- public CloseMetaHandler(final Server server,
- final RegionServerServices rsServices, final HRegionInfo regionInfo) {
- this(server, rsServices, regionInfo, false, true, -1);
- }
// Called when regionserver determines its to go down; not master orchestrated
public CloseMetaHandler(final Server server,
final RegionServerServices rsServices,
final HRegionInfo regionInfo,
- final boolean abort, final boolean zk, final int versionOfClosingNode) {
- super(server, rsServices, regionInfo, abort, zk, versionOfClosingNode,
- EventType.M_RS_CLOSE_META);
+ final boolean abort, CloseRegionConsensus closeRegionConsensus,
+ CloseRegionConsensus.CloseRegionDetails crd) {
+ super(server, rsServices, regionInfo, abort, closeRegionConsensus,
+ crd, EventType.M_RS_CLOSE_META);
}
}
Index: hbase-common/src/main/resources/hbase-default.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-common/src/main/resources/hbase-default.xml (revision 9f362ac918cac99dc365edd897e1c6e130a8dd4d)
+++ hbase-common/src/main/resources/hbase-default.xml (revision )
@@ -1160,4 +1160,9 @@
procedure. After implementing your own MasterProcedureManager, just put it in HBase's classpath
and add the fully qualified class name here.
+
@@ -48,44 +47,34 @@ protected final RegionServerServices rsServices; + private int assignmentTimeout; + private final HRegionInfo regionInfo; private final HTableDescriptor htd; - private boolean tomActivated; - private int assignmentTimeout; + private OpenRegionConsensus consensus; + private OpenRegionConsensus.OpenRegionDetails ord; - // We get version of our znode at start of open process and monitor it across - // the total open. We'll fail the open if someone hijacks our znode; we can - // tell this has happened if version is not as expected. - private volatile int version = -1; - //version of the offline node that was set by the master - private volatile int versionOfOfflineNode = -1; - public OpenRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, - HTableDescriptor htd) { - this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, -1); - } - public OpenRegionHandler(final Server server, - final RegionServerServices rsServices, HRegionInfo regionInfo, - HTableDescriptor htd, int versionOfOfflineNode) { + HTableDescriptor htd, OpenRegionConsensus consensus, + OpenRegionConsensus.OpenRegionDetails ord) { this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, - versionOfOfflineNode); + consensus, ord); } protected OpenRegionHandler(final Server server, final RegionServerServices rsServices, final HRegionInfo regionInfo, final HTableDescriptor htd, EventType eventType, - final int versionOfOfflineNode) { + OpenRegionConsensus consensus, OpenRegionConsensus.OpenRegionDetails ord) { super(server, eventType); this.rsServices = rsServices; this.regionInfo = regionInfo; this.htd = htd; - this.versionOfOfflineNode = versionOfOfflineNode; - tomActivated = this.server.getConfiguration(). - getBoolean(AssignmentManager.ASSIGNMENT_TIMEOUT_MANAGEMENT, - AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT); - assignmentTimeout = this.server.getConfiguration(). + this.consensus = consensus; + this.ord = ord; + + assignmentTimeout = server.getConfiguration(). getInt(AssignmentManager.ASSIGNMENT_TIMEOUT, AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT); } @@ -122,13 +111,13 @@ // Check that we're still supposed to open the region and transition. // If fails, just return. Someone stole the region from under us. - // Calling transitionZookeeperOfflineToOpening initializes this.version. + // Calling transitionFromOfflineToOpening initializes this.version. if (!isRegionStillOpening()){ LOG.error("Region " + encodedName + " opening cancelled"); return; } - if (!transitionZookeeperOfflineToOpening(encodedName, versionOfOfflineNode)) { + if (!consensus.transitionFromOfflineToOpening(regionInfo, ord)) { LOG.warn("Region was hijacked? Opening cancelled for encodedName=" + encodedName); // This is a desperate attempt: the znode is unlikely to be ours. But we can't do more. return; @@ -142,7 +131,7 @@ } boolean failed = true; - if (tickleOpening("post_region_open")) { + if (consensus.tickleOpening(ord, regionInfo, rsServices, "post_region_open")) { if (updateMeta(region)) { failed = false; } @@ -152,8 +141,7 @@ return; } - - if (!isRegionStillOpening() || !transitionToOpened(region)) { + if (!isRegionStillOpening() || !consensus.transitionToOpened(region, ord)) { // If we fail to transition to opened, it's because of one of two cases: // (a) we lost our ZK lease // OR (b) someone else opened the region before us @@ -183,7 +171,7 @@ } finally { // Do all clean up here if (!openSuccessful) { - doCleanUpOnFailedOpen(region, transitionedToOpening); + doCleanUpOnFailedOpen(region, transitionedToOpening, ord); } final Boolean current = this.rsServices.getRegionsInTransitionInRS(). remove(this.regionInfo.getEncodedNameAsBytes()); @@ -210,7 +198,8 @@ } } - private void doCleanUpOnFailedOpen(HRegion region, boolean transitionedToOpening) + private void doCleanUpOnFailedOpen(HRegion region, boolean transitionedToOpening, + OpenRegionConsensus.OpenRegionDetails ord) throws IOException { if (transitionedToOpening) { try { @@ -220,12 +209,12 @@ } finally { // Even if cleanupFailed open fails we need to do this transition // See HBASE-7698 - tryTransitionFromOpeningToFailedOpen(regionInfo); + consensus.tryTransitionFromOpeningToFailedOpen(regionInfo, ord); } } else { // If still transition to OPENING is not done, we need to transition znode // to FAILED_OPEN - tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, versionOfOfflineNode); + consensus.tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, ord); } } @@ -262,7 +251,7 @@ if (elapsed > period) { // Only tickle OPENING if postOpenDeployTasks is taking some time. lastUpdate = now; - tickleOpening = tickleOpening("post_open_deploy"); + tickleOpening = consensus.tickleOpening(ord, regionInfo, rsServices,"post_open_deploy"); } synchronized (signaller) { try { @@ -324,7 +313,7 @@ try { this.services.postOpenDeployTasks(this.region, this.server.getCatalogTracker()); - } catch (KeeperException e) { + } catch (IOException e) { server.abort("Exception running postOpenDeployTasks; region=" + this.region.getRegionInfo().getEncodedName(), e); } catch (Exception e) { @@ -347,114 +336,7 @@ } } - /** - * @param r Region we're working on. - * @return whether znode is successfully transitioned to OPENED state. - * @throws IOException - */ - boolean transitionToOpened(final HRegion r) throws IOException { - boolean result = false; - HRegionInfo hri = r.getRegionInfo(); - final String name = hri.getRegionNameAsString(); - // Finally, Transition ZK node to OPENED - try { - if (ZKAssign.transitionNodeOpened(this.server.getZooKeeper(), hri, - this.server.getServerName(), this.version) == -1) { - String warnMsg = "Completed the OPEN of region " + name + - " but when transitioning from " + " OPENING to OPENED "; - try { - String node = ZKAssign.getNodeName(this.server.getZooKeeper(), hri.getEncodedName()); - if (ZKUtil.checkExists(this.server.getZooKeeper(), node) < 0) { - // if the znode - rsServices.abort(warnMsg + "the znode disappeared", null); - } else { - LOG.warn(warnMsg + "got a version mismatch, someone else clashed; " + - "so now unassigning -- closing region on server: " + this.server.getServerName()); - } - } catch (KeeperException ke) { - rsServices.abort(warnMsg, ke); - } - } else { - LOG.debug("Transitioned " + r.getRegionInfo().getEncodedName() + - " to OPENED in zk on " + this.server.getServerName()); - result = true; - } - } catch (KeeperException e) { - LOG.error("Failed transitioning node " + name + - " from OPENING to OPENED -- closing region", e); - } - return result; - } - - /** - * This is not guaranteed to succeed, we just do our best. - * @param hri Region we're working on. - * @return whether znode is successfully transitioned to FAILED_OPEN state. - */ - private boolean tryTransitionFromOpeningToFailedOpen(final HRegionInfo hri) { - boolean result = false; - final String name = hri.getRegionNameAsString(); - try { - LOG.info("Opening of region " + hri + " failed, transitioning" + - " from OPENING to FAILED_OPEN in ZK, expecting version " + this.version); - if (ZKAssign.transitionNode( - this.server.getZooKeeper(), hri, - this.server.getServerName(), - EventType.RS_ZK_REGION_OPENING, - EventType.RS_ZK_REGION_FAILED_OPEN, - this.version) == -1) { - LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " + - "It's likely that the master already timed out this open " + - "attempt, and thus another RS already has the region."); - } else { - result = true; - } - } catch (KeeperException e) { - LOG.error("Failed transitioning node " + name + - " from OPENING to FAILED_OPEN", e); - } - return result; - } - - /** - * Try to transition to open. This function is static to make it usable before creating the - * handler. - * - * This is not guaranteed to succeed, we just do our best. - * - * @param rsServices - * @param hri Region we're working on. - * @param versionOfOfflineNode version to checked. - * @return whether znode is successfully transitioned to FAILED_OPEN state. - */ - public static boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices, - final HRegionInfo hri, final int versionOfOfflineNode) { - boolean result = false; - final String name = hri.getRegionNameAsString(); - try { - LOG.info("Opening of region " + hri + " failed, transitioning" + - " from OFFLINE to FAILED_OPEN in ZK, expecting version " + versionOfOfflineNode); - if (ZKAssign.transitionNode( - rsServices.getZooKeeper(), hri, - rsServices.getServerName(), - EventType.M_ZK_REGION_OFFLINE, - EventType.RS_ZK_REGION_FAILED_OPEN, - versionOfOfflineNode) == -1) { - LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " + - "It's likely that the master already timed out this open " + - "attempt, and thus another RS already has the region."); - } else { - result = true; - } - } catch (KeeperException e) { - LOG.error("Failed transitioning node " + name + " from OFFLINE to FAILED_OPEN", e); - } - return result; - } - - - /** * @return Instance of HRegion if successful open else null. */ HRegion openRegion() { @@ -463,15 +345,15 @@ // Instantiate the region. This also periodically tickles our zk OPENING // state so master doesn't timeout this region in transition. region = HRegion.openHRegion(this.regionInfo, this.htd, - this.rsServices.getWAL(this.regionInfo), - this.server.getConfiguration(), - this.rsServices, + this.rsServices.getWAL(this.regionInfo), + this.server.getConfiguration(), + this.rsServices, new CancelableProgressable() { public boolean progress() { // We may lose the znode ownership during the open. Currently its // too hard interrupting ongoing region open. Just let it complete // and check we still have the znode after region open. - return tickleOpening("open_region_progress"); + return consensus.tickleOpening(ord, regionInfo, rsServices, "open_region_progress"); } }); } catch (Throwable t) { @@ -504,71 +386,5 @@ byte[] encodedName = regionInfo.getEncodedNameAsBytes(); Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName); return Boolean.TRUE.equals(action); // true means opening for RIT - } - - /** - * Transition ZK node from OFFLINE to OPENING. - * @param encodedName Name of the znode file (Region encodedName is the znode - * name). - * @param versionOfOfflineNode - version Of OfflineNode that needs to be compared - * before changing the node's state from OFFLINE - * @return True if successful transition. - */ - boolean transitionZookeeperOfflineToOpening(final String encodedName, - int versionOfOfflineNode) { - // TODO: should also handle transition from CLOSED? - try { - // Initialize the znode version. - this.version = ZKAssign.transitionNode(server.getZooKeeper(), regionInfo, - server.getServerName(), EventType.M_ZK_REGION_OFFLINE, - EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode); - } catch (KeeperException e) { - LOG.error("Error transition from OFFLINE to OPENING for region=" + - encodedName, e); - this.version = -1; - return false; - } - boolean b = isGoodVersion(); - if (!b) { - LOG.warn("Failed transition from OFFLINE to OPENING for region=" + - encodedName); - } - return b; - } - - /** - * Update our OPENING state in zookeeper. - * Do this so master doesn't timeout this region-in-transition. - * @param context Some context to add to logs if failure - * @return True if successful transition. - */ - boolean tickleOpening(final String context) { - if (!isRegionStillOpening()) { - LOG.warn("Open region aborted since it isn't opening any more"); - return false; - } - // If previous checks failed... do not try again. - if (!isGoodVersion()) return false; - String encodedName = this.regionInfo.getEncodedName(); - try { - this.version = - ZKAssign.retransitionNodeOpening(server.getZooKeeper(), - this.regionInfo, this.server.getServerName(), this.version, tomActivated); - } catch (KeeperException e) { - server.abort("Exception refreshing OPENING; region=" + encodedName + - ", context=" + context, e); - this.version = -1; - return false; - } - boolean b = isGoodVersion(); - if (!b) { - LOG.warn("Failed refreshing OPENING; region=" + encodedName + - ", context=" + context); - } - return b; - } - - private boolean isGoodVersion() { - return this.version != -1; } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/OpenRegionConsensus.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/OpenRegionConsensus.java (revision ) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/OpenRegionConsensus.java (revision ) @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.consensus; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +import java.io.IOException; + +/** + * Consensus operations for opening regions. + */ +@InterfaceAudience.Private +public interface OpenRegionConsensus { + + /** + * Tries to move regions to OPENED state. + * + * @param r Region we're working on. + * @param ord details about region opening task + * @return whether transition was successful or not + * @throws java.io.IOException + */ + boolean transitionToOpened(HRegion r, OpenRegionDetails ord) throws IOException; + + /** + * Transitions region from offline to opening state. + * @param regionInfo region we're working on. + * @param ord details about opening task. + * @return true if successful, false otherwise + */ + boolean transitionFromOfflineToOpening(HRegionInfo regionInfo, + OpenRegionDetails ord); + + /** + * Heartbeats to prevent timeouts. + * + * @param ord details about opening task. + * @param regionInfo region we're working on. + * @param rsServices instance of RegionServerrServices + * @param context used for logging purposes only + * @return true if successful heartbeat, false otherwise. + */ + boolean tickleOpening(OpenRegionDetails ord, HRegionInfo regionInfo, + RegionServerServices rsServices, String context); + + /** + * Tries transition region from offline to failed open. + * @param rsServices instance of RegionServerrServices + * @param hri region we're working on + * @param ord details about region opening task + * @return true if successful, false otherwise + */ + boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices, + HRegionInfo hri, OpenRegionDetails ord); + + /** + * Tries transition from Opening to Failed open. + * @param hri region we're working on + * @param ord details about region opening task + * @return true if successfu. false otherwise. + */ + boolean tryTransitionFromOpeningToFailedOpen(HRegionInfo hri, OpenRegionDetails ord); + + /** + * Construct OpenRegionDetails instance from part of protobuf request. + * @return instance of OpenRegionDetails. + */ + OpenRegionDetails parseFromProtoRequest(AdminProtos.OpenRegionRequest.RegionOpenInfo + regionOpenInfo); + + /** + * Get details object with params for case when we're opening on + * regionserver side with all "default" properties. + */ + OpenRegionDetails getDetailsForNonCoordinatedOpening(); + + /** + * Interface for region opening tasks. Used to carry implementation details in + * encapsulated way through Handlers to the consensus API. + */ + static interface OpenRegionDetails { + /** + * Sets server name on which opening operation is running. + */ + void setServerName(ServerName serverName); + + /** + * @return server name on which opening op is running. + */ + ServerName getServerName(); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/ZkCloseRegionConsensus.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/ZkCloseRegionConsensus.java (revision ) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/ZkCloseRegionConsensus.java (revision ) @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.consensus; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.consensus.ConsensusProvider; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; + +/** + * ZK-based implementation of {@link CloseRegionConsensus}. + */ +@InterfaceAudience.Private +public class ZkCloseRegionConsensus implements CloseRegionConsensus { + private static final Log LOG = LogFactory.getLog(ZkCloseRegionConsensus.class); + + private final static int FAILED_VERSION = -1; + + private ConsensusProvider consensus; + private final ZooKeeperWatcher watcher; + + public ZkCloseRegionConsensus(ConsensusProvider consensus, ZooKeeperWatcher watcher) { + this.consensus = consensus; + this.watcher = watcher; + } + + /** + * In ZK-based version we're checking for bad znode state, e.g. if we're + * trying to delete the znode, and it's not ours (version doesn't match). + */ + @Override + public boolean checkClosingState(HRegionInfo regionInfo, CloseRegionDetails crd) { + ZkCloseRegionDetails zkCrd = (ZkCloseRegionDetails) crd; + + try { + return zkCrd.isZk() && !ZKAssign.checkClosingState(watcher, + regionInfo, ((ZkCloseRegionDetails) crd).getExpectedVersion()); + } catch (KeeperException e) { + LOG.error(e.getMessage()); + } + return false; + } + + /** + * In ZK-based version we do some znodes transitioning. + */ + @Override + public void setClosedState(HRegion region, CloseRegionDetails crd) { + ZkCloseRegionDetails zkCrd = (ZkCloseRegionDetails) crd; + String name = region.getRegionInfo().getRegionNameAsString(); + ServerName serverName = zkCrd.getServerName(); + + if (zkCrd.isZk()) { + if (setClosedState(region, zkCrd)) { + LOG.debug("Set closed state in zk for " + name + " on " + serverName); + } else { + LOG.debug("Set closed state in zk UNSUCCESSFUL for " + name + " on " + + serverName); + } + } + } + + /** + * Parse ZK-related fields from request. + */ + @Override + public CloseRegionDetails parseFromProtoRequest(AdminProtos.CloseRegionRequest request) { + ZkCloseRegionConsensus.ZkCloseRegionDetails zkCrd = + new ZkCloseRegionConsensus.ZkCloseRegionDetails(); + zkCrd.setZk(request.getTransitionInZK()); + int versionOfClosingNode = -1; + if (request.hasVersionOfClosingNode()) { + versionOfClosingNode = request.getVersionOfClosingNode(); + } + zkCrd.setExpectedVersion(versionOfClosingNode); + zkCrd.setServerName(consensus.getServer().getServerName()); + + return zkCrd; + } + + /** + * No ZK tracking will be performed for that case. + * This method should be used when we want to construct CloseRegionDetails, + * but don't want any coordination on that (when it's initiated by regionserver), + * so no znode state transitions will be performed. + */ + @Override + public CloseRegionDetails getDetailsForNonCoordinatedClosing() { + ZkCloseRegionConsensus.ZkCloseRegionDetails zkCrd = + new ZkCloseRegionConsensus.ZkCloseRegionDetails(); + zkCrd.setZk(false); + zkCrd.setExpectedVersion(FAILED_VERSION); + zkCrd.setServerName(consensus.getServer().getServerName()); + + return zkCrd; + } + + /** + * Transition ZK node to CLOSED + * @param region HRegion instance being closed + * @param zkCrd details about region closing operation. + * @return If the state is set successfully + */ + private boolean setClosedState(final HRegion region, ZkCloseRegionDetails zkCrd) { + final int expectedVersion = zkCrd.getExpectedVersion(); + + try { + if (ZKAssign.transitionNodeClosed(watcher, region.getRegionInfo(), + zkCrd.getServerName(), expectedVersion) == FAILED_VERSION) { + LOG.warn("Completed the CLOSE of a region but when transitioning from " + + " CLOSING to CLOSED got a version mismatch, someone else clashed " + + "so now unassigning"); + region.close(); + return false; + } + } catch (NullPointerException e) { + // I've seen NPE when table was deleted while close was running in unit tests. + LOG.warn("NPE during close -- catching and continuing...", e); + return false; + } catch (KeeperException e) { + LOG.error("Failed transitioning node from CLOSING to CLOSED", e); + return false; + } catch (IOException e) { + LOG.error("Failed to close region after failing to transition", e); + return false; + } + return true; + } + + /** + * ZK-based implementation. Has details about whether the state transition should be + * reflected in ZK, as well as expected version of znode. + */ + public static class ZkCloseRegionDetails implements CloseRegionConsensus.CloseRegionDetails { + + /** + * True if we are to update zk about the region close; if the close + * was orchestrated by master, then update zk. If the close is being run by + * the regionserver because its going down, don't update zk. + * */ + private boolean zk; + + /** + * The version of znode to compare when RS transitions the znode from + * CLOSING state. + */ + private int expectedVersion = FAILED_VERSION; + + /** + * Server name the handler is running on. + */ + private ServerName serverName; + + public ZkCloseRegionDetails() { + } + + public ZkCloseRegionDetails(boolean zk, int expectedVersion) { + this.zk = zk; + this.expectedVersion = expectedVersion; + } + + public boolean isZk() { + return zk; + } + + public void setZk(boolean zk) { + this.zk = zk; + } + + public int getExpectedVersion() { + return expectedVersion; + } + + public void setExpectedVersion(int expectedVersion) { + this.expectedVersion = expectedVersion; + } + + @Override + public ServerName getServerName() { + return serverName; + } + + @Override + public void setServerName(ServerName serverName) { + this.serverName = serverName; + } + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java (revision 9f362ac918cac99dc365edd897e1c6e130a8dd4d) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java (revision ) @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.consensus.OpenRegionConsensus; /** * Handles opening of a meta region on a region server. @@ -34,13 +35,9 @@ public class OpenMetaHandler extends OpenRegionHandler { public OpenMetaHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, - final HTableDescriptor htd) { - this(server, rsServices, regionInfo, htd, -1); - } - public OpenMetaHandler(final Server server, - final RegionServerServices rsServices, HRegionInfo regionInfo, - final HTableDescriptor htd, int versionOfOfflineNode) { + final HTableDescriptor htd, OpenRegionConsensus consensus, + OpenRegionConsensus.OpenRegionDetails ord) { super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META, - versionOfOfflineNode); + consensus, ord); } } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java (revision 9f362ac918cac99dc365edd897e1c6e130a8dd4d) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java (revision ) @@ -33,10 +33,13 @@ import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.consensus.ZkConsensusProvider; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.consensus.OpenRegionConsensus; +import org.apache.hadoop.hbase.regionserver.consensus.ZkCloseRegionConsensus; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.MockServer; import org.apache.hadoop.hbase.zookeeper.ZKAssign; @@ -110,8 +113,18 @@ rss.addToOnlineRegions(spy); // Assert the Server is NOT stopped before we call close region. assertFalse(server.isStopped()); - CloseRegionHandler handler = - new CloseRegionHandler(server, rss, hri, false, false, -1); + + ZkConsensusProvider consensus = new ZkConsensusProvider(); + consensus.start(server); + + ZkCloseRegionConsensus.ZkCloseRegionDetails zkCrd = + new ZkCloseRegionConsensus.ZkCloseRegionDetails(); + zkCrd.setZk(false); + zkCrd.setExpectedVersion(-1); + zkCrd.setServerName(server.getServerName()); + + CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false, + consensus.getCloseRegionConsensus(), zkCrd); boolean throwable = false; try { handler.process(); @@ -141,9 +154,12 @@ HTableDescriptor htd = TEST_HTD; final HRegionInfo hri = TEST_HRI; - + + ZkConsensusProvider consensus = new ZkConsensusProvider(); + consensus.start(server); + // open a region first so that it can be closed later - OpenRegion(server, rss, htd, hri); + OpenRegion(server, rss, htd, hri, consensus.getOpenRegionConsensus()); // close the region // Create it CLOSING, which is what Master set before sending CLOSE RPC @@ -153,9 +169,15 @@ // The CloseRegionHandler will validate the expected version // Given it is set to invalid versionOfClosingNode+1, // CloseRegionHandler should be M_ZK_REGION_CLOSING - CloseRegionHandler handler = - new CloseRegionHandler(server, rss, hri, false, true, - versionOfClosingNode+1); + + ZkCloseRegionConsensus.ZkCloseRegionDetails zkCrd = + new ZkCloseRegionConsensus.ZkCloseRegionDetails(); + zkCrd.setZk(true); + zkCrd.setExpectedVersion(versionOfClosingNode+1); + zkCrd.setServerName(server.getServerName()); + + CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false, + consensus.getCloseRegionConsensus(), zkCrd); handler.process(); // Handler should remain in M_ZK_REGION_CLOSING @@ -178,9 +200,12 @@ HTableDescriptor htd = TEST_HTD; HRegionInfo hri = TEST_HRI; - + + ZkConsensusProvider consensus = new ZkConsensusProvider(); + consensus.start(server); + // open a region first so that it can be closed later - OpenRegion(server, rss, htd, hri); + OpenRegion(server, rss, htd, hri, consensus.getOpenRegionConsensus()); // close the region // Create it CLOSING, which is what Master set before sending CLOSE RPC @@ -190,9 +215,14 @@ // The CloseRegionHandler will validate the expected version // Given it is set to correct versionOfClosingNode, // CloseRegionHandlerit should be RS_ZK_REGION_CLOSED - CloseRegionHandler handler = - new CloseRegionHandler(server, rss, hri, false, true, - versionOfClosingNode); + ZkCloseRegionConsensus.ZkCloseRegionDetails zkCrd = + new ZkCloseRegionConsensus.ZkCloseRegionDetails(); + zkCrd.setZk(true); + zkCrd.setExpectedVersion(versionOfClosingNode); + zkCrd.setServerName(server.getServerName()); + + CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false, + consensus.getCloseRegionConsensus(), zkCrd); handler.process(); // Handler should have transitioned it to RS_ZK_REGION_CLOSED RegionTransition rt = RegionTransition.parseFrom( @@ -201,11 +231,13 @@ } private void OpenRegion(Server server, RegionServerServices rss, - HTableDescriptor htd, HRegionInfo hri) + HTableDescriptor htd, HRegionInfo hri, OpenRegionConsensus consensus) throws IOException, NodeExistsException, KeeperException, DeserializationException { // Create it OFFLINE node, which is what Master set before sending OPEN RPC ZKAssign.createNodeOffline(server.getZooKeeper(), hri, server.getServerName()); - OpenRegionHandler openHandler = new OpenRegionHandler(server, rss, hri, htd); + + OpenRegionConsensus.OpenRegionDetails ord = consensus.getDetailsForNonCoordinatedOpening(); + OpenRegionHandler openHandler = new OpenRegionHandler(server, rss, hri, htd, consensus, ord); rss.getRegionsInTransitionInRS().put(hri.getEncodedNameAsBytes(), Boolean.TRUE); openHandler.process(); // This parse is not used?