diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java index 5e4e53e..6cee2df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java @@ -62,4 +62,9 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan * Method to retrieve coordination for closing region operations. */ public abstract CloseRegionCoordination getCloseRegionCoordination(); + + /** + * Method to retrieve coordination for opening region operations. + */ + public abstract OpenRegionCoordination getOpenRegionCoordination(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/OpenRegionCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/OpenRegionCoordination.java new file mode 100644 index 0000000..0c6871d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/OpenRegionCoordination.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coordination; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +import java.io.IOException; + +/** + * Cocoordination operations for opening regions. + */ +@InterfaceAudience.Private +public interface OpenRegionCoordination { + + //--------------------- + // RS-side operations + //--------------------- + /** + * Tries to move regions to OPENED state. + * + * @param r Region we're working on. + * @param ord details about region opening task + * @return whether transition was successful or not + * @throws java.io.IOException + */ + boolean transitionToOpened(HRegion r, OpenRegionDetails ord) throws IOException; + + /** + * Transitions region from offline to opening state. + * @param regionInfo region we're working on. + * @param ord details about opening task. + * @return true if successful, false otherwise + */ + boolean transitionFromOfflineToOpening(HRegionInfo regionInfo, + OpenRegionDetails ord); + + /** + * Heartbeats to prevent timeouts. + * + * @param ord details about opening task. + * @param regionInfo region we're working on. + * @param rsServices instance of RegionServerrServices + * @param context used for logging purposes only + * @return true if successful heartbeat, false otherwise. + */ + boolean tickleOpening(OpenRegionDetails ord, HRegionInfo regionInfo, + RegionServerServices rsServices, String context); + + /** + * Tries transition region from offline to failed open. + * @param rsServices instance of RegionServerServices + * @param hri region we're working on + * @param ord details about region opening task + * @return true if successful, false otherwise + */ + boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices, + HRegionInfo hri, OpenRegionDetails ord); + + /** + * Tries transition from Opening to Failed open. + * @param hri region we're working on + * @param ord details about region opening task + * @return true if successfu. false otherwise. + */ + boolean tryTransitionFromOpeningToFailedOpen(HRegionInfo hri, OpenRegionDetails ord); + + /** + * Construct OpenRegionDetails instance from part of protobuf request. + * @return instance of OpenRegionDetails. + */ + OpenRegionDetails parseFromProtoRequest(AdminProtos.OpenRegionRequest.RegionOpenInfo + regionOpenInfo); + + /** + * Get details object with params for case when we're opening on + * regionserver side with all "default" properties. + */ + OpenRegionDetails getDetailsForNonCoordinatedOpening(); + + //------------------------- + // HMaster-side operations + //------------------------- + + /** + * Commits opening operation on HM side (steps required for "commit" + * are determined by coordination implementation). + * @return true if committed successfully, false otherwise. + */ + public boolean commitOpenOnMasterSide(AssignmentManager assignmentManager, + HRegionInfo regionInfo, + OpenRegionDetails ord); + + /** + * Interface for region opening tasks. Used to carry implementation details in + * encapsulated way through Handlers to the coordination API. + */ + static interface OpenRegionDetails { + /** + * Sets server name on which opening operation is running. + */ + void setServerName(ServerName serverName); + + /** + * @return server name on which opening op is running. + */ + ServerName getServerName(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java index dab0262..2c2fa44 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java @@ -37,6 +37,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { protected ZooKeeperWatcher watcher; protected SplitTransactionCoordination splitTransactionCoordination; protected CloseRegionCoordination closeRegionCoordination; + protected OpenRegionCoordination openRegionCoordination; @Override public void initialize(Server server) { @@ -45,6 +46,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher); closeRegionCoordination = new ZkCloseRegionCoordination(this, watcher); + openRegionCoordination = new ZkOpenRegionCoordination(this, watcher); } @Override @@ -71,4 +73,9 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { public CloseRegionCoordination getCloseRegionCoordination() { return closeRegionCoordination; } + + @Override + public OpenRegionCoordination getOpenRegionCoordination() { + return openRegionCoordination; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkOpenRegionCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkOpenRegionCoordination.java new file mode 100644 index 0000000..290ed09 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkOpenRegionCoordination.java @@ -0,0 +1,414 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coordination; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; + +/** + * ZK-based implementation of {@link OpenRegionCoordination}. + */ +@InterfaceAudience.Private +public class ZkOpenRegionCoordination implements OpenRegionCoordination { + private static final Log LOG = LogFactory.getLog(ZkOpenRegionCoordination.class); + + private CoordinatedStateManager coordination; + private final ZooKeeperWatcher watcher; + + public ZkOpenRegionCoordination(CoordinatedStateManager coordination, + ZooKeeperWatcher watcher) { + this.coordination = coordination; + this.watcher = watcher; + } + + //------------------------------- + // Region Server-side operations + //------------------------------- + + /** + * @param r Region we're working on. + * @return whether znode is successfully transitioned to OPENED state. + * @throws java.io.IOException + */ + @Override + public boolean transitionToOpened(final HRegion r, OpenRegionDetails ord) throws IOException { + ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord; + + boolean result = false; + HRegionInfo hri = r.getRegionInfo(); + final String name = hri.getRegionNameAsString(); + // Finally, Transition ZK node to OPENED + try { + if (ZKAssign.transitionNodeOpened(watcher, hri, + zkOrd.getServerName(), zkOrd.getVersion()) == -1) { + String warnMsg = "Completed the OPEN of region " + name + + " but when transitioning from " + " OPENING to OPENED "; + try { + String node = ZKAssign.getNodeName(watcher, hri.getEncodedName()); + if (ZKUtil.checkExists(watcher, node) < 0) { + // if the znode + coordination.getServer().abort(warnMsg + "the znode disappeared", null); + } else { + LOG.warn(warnMsg + "got a version mismatch, someone else clashed; " + + "so now unassigning -- closing region on server: " + zkOrd.getServerName()); + } + } catch (KeeperException ke) { + coordination.getServer().abort(warnMsg, ke); + } + } else { + LOG.debug("Transitioned " + r.getRegionInfo().getEncodedName() + + " to OPENED in zk on " + zkOrd.getServerName()); + result = true; + } + } catch (KeeperException e) { + LOG.error("Failed transitioning node " + name + + " from OPENING to OPENED -- closing region", e); + } + return result; + } + + /** + * Transition ZK node from OFFLINE to OPENING. + * @param regionInfo region info instance + * @param ord - instance of open region details, for ZK implementation + * will include version Of OfflineNode that needs to be compared + * before changing the node's state from OFFLINE + * @return True if successful transition. + */ + @Override + public boolean transitionFromOfflineToOpening(HRegionInfo regionInfo, + OpenRegionDetails ord) { + ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord; + + // encoded name is used as znode encoded name in ZK + final String encodedName = regionInfo.getEncodedName(); + + // TODO: should also handle transition from CLOSED? + try { + // Initialize the znode version. + zkOrd.setVersion(ZKAssign.transitionNode(watcher, regionInfo, + zkOrd.getServerName(), EventType.M_ZK_REGION_OFFLINE, + EventType.RS_ZK_REGION_OPENING, zkOrd.getVersionOfOfflineNode())); + } catch (KeeperException e) { + LOG.error("Error transition from OFFLINE to OPENING for region=" + + encodedName, e); + zkOrd.setVersion(-1); + return false; + } + boolean b = isGoodVersion(zkOrd); + if (!b) { + LOG.warn("Failed transition from OFFLINE to OPENING for region=" + + encodedName); + } + return b; + } + + /** + * Update our OPENING state in zookeeper. + * Do this so master doesn't timeout this region-in-transition. + * We may lose the znode ownership during the open. Currently its + * too hard interrupting ongoing region open. Just let it complete + * and check we still have the znode after region open. + * + * @param context Some context to add to logs if failure + * @return True if successful transition. + */ + @Override + public boolean tickleOpening(OpenRegionDetails ord, HRegionInfo regionInfo, + RegionServerServices rsServices, final String context) { + ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord; + if (!isRegionStillOpening(regionInfo, rsServices)) { + LOG.warn("Open region aborted since it isn't opening any more"); + return false; + } + // If previous checks failed... do not try again. + if (!isGoodVersion(zkOrd)) return false; + String encodedName = regionInfo.getEncodedName(); + try { + zkOrd.setVersion(ZKAssign.confirmNodeOpening(watcher, + regionInfo, zkOrd.getServerName(), zkOrd.getVersion())); + } catch (KeeperException e) { + coordination.getServer().abort("Exception refreshing OPENING; region=" + encodedName + + ", context=" + context, e); + zkOrd.setVersion(-1); + return false; + } + boolean b = isGoodVersion(zkOrd); + if (!b) { + LOG.warn("Failed refreshing OPENING; region=" + encodedName + + ", context=" + context); + } + return b; + } + + /** + * Try to transition to open. + * + * This is not guaranteed to succeed, we just do our best. + * + * @param rsServices + * @param hri Region we're working on. + * @param ord Details about region open task + * @return whether znode is successfully transitioned to FAILED_OPEN state. + */ + @Override + public boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices, + final HRegionInfo hri, + OpenRegionDetails ord) { + ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord; + boolean result = false; + final String name = hri.getRegionNameAsString(); + try { + LOG.info("Opening of region " + hri + " failed, transitioning" + + " from OFFLINE to FAILED_OPEN in ZK, expecting version " + + zkOrd.getVersionOfOfflineNode()); + if (ZKAssign.transitionNode( + rsServices.getZooKeeper(), hri, + rsServices.getServerName(), + EventType.M_ZK_REGION_OFFLINE, + EventType.RS_ZK_REGION_FAILED_OPEN, + zkOrd.getVersionOfOfflineNode()) == -1) { + LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " + + "It's likely that the master already timed out this open " + + "attempt, and thus another RS already has the region."); + } else { + result = true; + } + } catch (KeeperException e) { + LOG.error("Failed transitioning node " + name + " from OFFLINE to FAILED_OPEN", e); + } + return result; + } + + private boolean isGoodVersion(ZkOpenRegionDetails zkOrd) { + return zkOrd.getVersion() != -1; + } + + /** + * This is not guaranteed to succeed, we just do our best. + * @param hri Region we're working on. + * @return whether znode is successfully transitioned to FAILED_OPEN state. + */ + @Override + public boolean tryTransitionFromOpeningToFailedOpen(final HRegionInfo hri, + OpenRegionDetails ord) { + ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord; + boolean result = false; + final String name = hri.getRegionNameAsString(); + try { + LOG.info("Opening of region " + hri + " failed, transitioning" + + " from OPENING to FAILED_OPEN in ZK, expecting version " + zkOrd.getVersion()); + if (ZKAssign.transitionNode( + watcher, hri, + zkOrd.getServerName(), + EventType.RS_ZK_REGION_OPENING, + EventType.RS_ZK_REGION_FAILED_OPEN, + zkOrd.getVersion()) == -1) { + LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " + + "It's likely that the master already timed out this open " + + "attempt, and thus another RS already has the region."); + } else { + result = true; + } + } catch (KeeperException e) { + LOG.error("Failed transitioning node " + name + + " from OPENING to FAILED_OPEN", e); + } + return result; + } + + /** + * Parse ZK-related fields from request. + */ + @Override + public OpenRegionCoordination.OpenRegionDetails parseFromProtoRequest( + AdminProtos.OpenRegionRequest.RegionOpenInfo regionOpenInfo) { + ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd = + new ZkOpenRegionCoordination.ZkOpenRegionDetails(); + + int versionOfOfflineNode = -1; + if (regionOpenInfo.hasVersionOfOfflineNode()) { + versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode(); + } + zkCrd.setVersionOfOfflineNode(versionOfOfflineNode); + zkCrd.setServerName(coordination.getServer().getServerName()); + + return zkCrd; + } + + /** + * No ZK tracking will be performed for that case. + * This method should be used when we want to construct CloseRegionDetails, + * but don't want any coordination on that (when it's initiated by regionserver), + * so no znode state transitions will be performed. + */ + @Override + public OpenRegionCoordination.OpenRegionDetails getDetailsForNonCoordinatedOpening() { + ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd = + new ZkOpenRegionCoordination.ZkOpenRegionDetails(); + zkCrd.setVersionOfOfflineNode(-1); + zkCrd.setServerName(coordination.getServer().getServerName()); + + return zkCrd; + } + + //-------------------------- + // HMaster-side operations + //-------------------------- + @Override + public boolean commitOpenOnMasterSide(AssignmentManager assignmentManager, + HRegionInfo regionInfo, + OpenRegionDetails ord) { + boolean committedSuccessfully = true; + + // Code to defend against case where we get SPLIT before region open + // processing completes; temporary till we make SPLITs go via zk -- 0.92. + RegionState regionState = assignmentManager.getRegionStates() + .getRegionTransitionState(regionInfo.getEncodedName()); + boolean openedNodeDeleted = false; + if (regionState != null && regionState.isOpened()) { + openedNodeDeleted = deleteOpenedNode(regionInfo, ord); + if (!openedNodeDeleted) { + LOG.error("Znode of region " + regionInfo.getShortNameToLog() + " could not be deleted."); + } + } else { + LOG.warn("Skipping the onlining of " + regionInfo.getShortNameToLog() + + " because regions is NOT in RIT -- presuming this is because it SPLIT"); + } + if (!openedNodeDeleted) { + if (assignmentManager.getTableStateManager().isTableState(regionInfo.getTable(), + ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { + debugLog(regionInfo, "Opened region " + + regionInfo.getShortNameToLog() + " but " + + "this table is disabled, triggering close of region"); + committedSuccessfully = false; + } + } + + return committedSuccessfully; + } + + private boolean deleteOpenedNode(HRegionInfo regionInfo, OpenRegionDetails ord) { + ZkOpenRegionDetails zkOrd = (ZkOpenRegionDetails) ord; + int expectedVersion = zkOrd.getVersion(); + + debugLog(regionInfo, "Handling OPENED of " + + regionInfo.getShortNameToLog() + " from " + zkOrd.getServerName().toString() + + "; deleting unassigned node"); + try { + // delete the opened znode only if the version matches. + return ZKAssign.deleteNode(this.coordination.getServer().getZooKeeper(), + regionInfo.getEncodedName(), EventType.RS_ZK_REGION_OPENED, expectedVersion); + } catch(KeeperException.NoNodeException e){ + // Getting no node exception here means that already the region has been opened. + LOG.warn("The znode of the region " + regionInfo.getShortNameToLog() + + " would have already been deleted"); + return false; + } catch (KeeperException e) { + this.coordination.getServer().abort("Error deleting OPENED node in ZK (" + + regionInfo.getRegionNameAsString() + ")", e); + } + return false; + } + + private void debugLog(HRegionInfo region, String string) { + if (region.isMetaTable()) { + LOG.info(string); + } else { + LOG.debug(string); + } + } + + // Additional classes and helper methods + + /** + * ZK-based implementation. Has details about whether the state transition should be + * reflected in ZK, as well as expected version of znode. + */ + public static class ZkOpenRegionDetails implements OpenRegionCoordination.OpenRegionDetails { + + // We get version of our znode at start of open process and monitor it across + // the total open. We'll fail the open if someone hijacks our znode; we can + // tell this has happened if version is not as expected. + private volatile int version = -1; + + //version of the offline node that was set by the master + private volatile int versionOfOfflineNode = -1; + + /** + * Server name the handler is running on. + */ + private ServerName serverName; + + public ZkOpenRegionDetails() { + } + + public ZkOpenRegionDetails(int versionOfOfflineNode) { + this.versionOfOfflineNode = versionOfOfflineNode; + } + + public int getVersionOfOfflineNode() { + return versionOfOfflineNode; + } + + public void setVersionOfOfflineNode(int versionOfOfflineNode) { + this.versionOfOfflineNode = versionOfOfflineNode; + } + + public int getVersion() { + return version; + } + + public void setVersion(int version) { + this.version = version; + } + + @Override + public ServerName getServerName() { + return serverName; + } + + @Override + public void setServerName(ServerName serverName) { + this.serverName = serverName; + } + } + + private boolean isRegionStillOpening(HRegionInfo regionInfo, RegionServerServices rsServices) { + byte[] encodedName = regionInfo.getEncodedNameAsBytes(); + Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName); + return Boolean.TRUE.equals(action); // true means opening for RIT + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 1c4804d..529a333 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -58,6 +58,8 @@ import org.apache.hadoop.hbase.TableStateManager; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.coordination.OpenRegionCoordination; +import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -75,6 +77,7 @@ import org.apache.hadoop.hbase.master.handler.DisableTableHandler; import org.apache.hadoop.hbase.master.handler.EnableTableHandler; import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException; import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; @@ -577,8 +580,20 @@ public class AssignmentManager extends ZooKeeperListener { return false; } } + + // TODO: This code is tied to ZK anyway, so for now leaving it as is, + // will refactor when whole region assignment will be abstracted from ZK + BaseCoordinatedStateManager cp = + (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager(); + OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination(); + + ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd = + new ZkOpenRegionCoordination.ZkOpenRegionDetails(); + zkOrd.setVersion(stat.getVersion()); + zkOrd.setServerName(cp.getServer().getServerName()); + return processRegionsInTransition( - rt, hri, stat.getVersion()); + rt, hri, openRegionCoordination, zkOrd); } finally { lock.unlock(); } @@ -594,7 +609,8 @@ public class AssignmentManager extends ZooKeeperListener { */ boolean processRegionsInTransition( final RegionTransition rt, final HRegionInfo regionInfo, - final int expectedVersion) throws KeeperException { + OpenRegionCoordination coordination, + final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException { EventType et = rt.getEventType(); // Get ServerName. Could not be null. final ServerName sn = rt.getServerName(); @@ -652,6 +668,8 @@ public class AssignmentManager extends ZooKeeperListener { public void process() throws IOException { ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName()); try { + final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord) + .getVersion(); unassign(regionInfo, rsClosing, expectedVersion, null, true, null); if (regionStates.isRegionOffline(regionInfo)) { assign(regionInfo, true); @@ -699,7 +717,7 @@ public class AssignmentManager extends ZooKeeperListener { // This could be done asynchronously, we would need then to acquire the lock in the // handler. regionStates.updateRegionState(rt, State.OPEN); - new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process(); + new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process(); break; case RS_ZK_REQUEST_REGION_SPLIT: case RS_ZK_REGION_SPLITTING: @@ -748,10 +766,12 @@ public class AssignmentManager extends ZooKeeperListener { *
* This deals with skipped transitions (we got a CLOSED but didn't see CLOSING * yet). - * @param rt - * @param expectedVersion + * @param rt region transition + * @param coordination coordination for opening region + * @param ord details about opening region */ - void handleRegion(final RegionTransition rt, int expectedVersion) { + void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination, + OpenRegionCoordination.OpenRegionDetails ord) { if (rt == null) { LOG.warn("Unexpected NULL input for RegionTransition rt"); return; @@ -931,7 +951,7 @@ public class AssignmentManager extends ZooKeeperListener { if (regionState != null) { failedOpenTracker.remove(encodedName); // reset the count, if any new OpenedRegionHandler( - server, this, regionState.getRegion(), sn, expectedVersion).process(); + server, this, regionState.getRegion(), coordination, ord).process(); updateOpenedRegionHandlerTracker(regionState.getRegion()); } break; @@ -1299,7 +1319,19 @@ public class AssignmentManager extends ZooKeeperListener { if (data == null) return; RegionTransition rt = RegionTransition.parseFrom(data); - handleRegion(rt, stat.getVersion()); + + // TODO: This code is tied to ZK anyway, so for now leaving it as is, + // will refactor when whole region assignment will be abstracted from ZK + BaseCoordinatedStateManager csm = + (BaseCoordinatedStateManager) server.getCoordinatedStateManager(); + OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination(); + + ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd = + new ZkOpenRegionCoordination.ZkOpenRegionDetails(); + zkOrd.setVersion(stat.getVersion()); + zkOrd.setServerName(csm.getServer().getServerName()); + + handleRegion(rt, openRegionCoordination, zkOrd); } catch (KeeperException e) { server.abort("Unexpected ZK exception reading unassigned node data", e); } catch (DeserializationException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java index 3457131..a2dc41b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java @@ -18,21 +18,16 @@ */ package org.apache.hadoop.hbase.master.handler; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.coordination.OpenRegionCoordination; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.RegionState; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; -import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.zookeeper.KeeperException; /** * Handles OPENED region event on Master. @@ -42,9 +37,10 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf private static final Log LOG = LogFactory.getLog(OpenedRegionHandler.class); private final AssignmentManager assignmentManager; private final HRegionInfo regionInfo; - private final ServerName sn; private final OpenedPriority priority; - private final int expectedVersion; + + private OpenRegionCoordination coordination; + private OpenRegionCoordination.OpenRegionDetails ord; private enum OpenedPriority { META (1), @@ -62,12 +58,13 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf public OpenedRegionHandler(Server server, AssignmentManager assignmentManager, HRegionInfo regionInfo, - ServerName sn, int expectedVersion) { + OpenRegionCoordination coordination, + OpenRegionCoordination.OpenRegionDetails ord) { super(server, EventType.RS_ZK_REGION_OPENED); this.assignmentManager = assignmentManager; this.regionInfo = regionInfo; - this.sn = sn; - this.expectedVersion = expectedVersion; + this.coordination = coordination; + this.ord = ord; if(regionInfo.isMetaRegion()) { priority = OpenedPriority.META; } else if(regionInfo.getTable() @@ -99,56 +96,8 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf @Override public void process() { - // Code to defend against case where we get SPLIT before region open - // processing completes; temporary till we make SPLITs go via zk -- 0.92. - RegionState regionState = this.assignmentManager.getRegionStates() - .getRegionTransitionState(regionInfo.getEncodedName()); - boolean openedNodeDeleted = false; - if (regionState != null && regionState.isOpened()) { - openedNodeDeleted = deleteOpenedNode(expectedVersion); - if (!openedNodeDeleted) { - LOG.error("Znode of region " + regionInfo.getShortNameToLog() + " could not be deleted."); - } - } else { - LOG.warn("Skipping the onlining of " + regionInfo.getShortNameToLog() + - " because regions is NOT in RIT -- presuming this is because it SPLIT"); - } - if (!openedNodeDeleted) { - if (this.assignmentManager.getTableStateManager().isTableState(regionInfo.getTable(), - ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { - debugLog(regionInfo, "Opened region " - + regionInfo.getShortNameToLog() + " but " - + "this table is disabled, triggering close of region"); + if (!coordination.commitOpenOnMasterSide(assignmentManager,regionInfo, ord)) { assignmentManager.unassign(regionInfo); - } - } - } - - private boolean deleteOpenedNode(int expectedVersion) { - debugLog(regionInfo, "Handling OPENED of " + - this.regionInfo.getShortNameToLog() + " from " + this.sn.toString() + - "; deleting unassigned node"); - try { - // delete the opened znode only if the version matches. - return ZKAssign.deleteNode(server.getZooKeeper(), - regionInfo.getEncodedName(), EventType.RS_ZK_REGION_OPENED, expectedVersion); - } catch(KeeperException.NoNodeException e){ - // Getting no node exception here means that already the region has been opened. - LOG.warn("The znode of the region " + regionInfo.getShortNameToLog() + - " would have already been deleted"); - return false; - } catch (KeeperException e) { - server.abort("Error deleting OPENED node in ZK (" + - regionInfo.getRegionNameAsString() + ")", e); - } - return false; - } - - private void debugLog(HRegionInfo region, String string) { - if (region.isMetaTable()) { - LOG.info(string); - } else { - LOG.debug(string); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 7e24610..9d90d1d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -141,6 +141,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Re import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.coordination.OpenRegionCoordination; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -1188,11 +1189,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, final boolean isBulkAssign = regionCount > 1; for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) { final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion()); + OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager(). + getOpenRegionCoordination(); + OpenRegionCoordination.OpenRegionDetails ord = + coordination.parseFromProtoRequest(regionOpenInfo); - int versionOfOfflineNode = -1; - if (regionOpenInfo.hasVersionOfOfflineNode()) { - versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode(); - } HTableDescriptor htd; try { final HRegion onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName()); @@ -1237,8 +1238,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (Boolean.FALSE.equals(previous)) { // There is a close in progress. We need to mark this open as failed in ZK. - OpenRegionHandler. - tryTransitionFromOfflineToFailedOpen(regionServer, region, versionOfOfflineNode); + + coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord); throw new RegionAlreadyInTransitionException("Received OPEN for the region:" + region.getRegionNameAsString() + " , which we are already trying to CLOSE "); @@ -1266,12 +1267,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // Need to pass the expected version in the constructor. if (region.isMetaRegion()) { regionServer.service.submit(new OpenMetaHandler( - regionServer, regionServer, region, htd, versionOfOfflineNode)); + regionServer, regionServer, region, htd, coordination, ord)); } else { regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(), regionOpenInfo.getFavoredNodesList()); regionServer.service.submit(new OpenRegionHandler( - regionServer, regionServer, region, htd, versionOfOfflineNode)); + regionServer, regionServer, region, htd, coordination, ord)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java index 2278e0b..f2d5f1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.coordination.OpenRegionCoordination; /** * Handles opening of a meta region on a region server. @@ -34,13 +35,9 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; public class OpenMetaHandler extends OpenRegionHandler { public OpenMetaHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, - final HTableDescriptor htd) { - this(server, rsServices, regionInfo, htd, -1); - } - public OpenMetaHandler(final Server server, - final RegionServerServices rsServices, HRegionInfo regionInfo, - final HTableDescriptor htd, int versionOfOfflineNode) { + final HTableDescriptor htd, OpenRegionCoordination coordination, + OpenRegionCoordination.OpenRegionDetails ord) { super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META, - versionOfOfflineNode); + coordination, ord); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java index 212a557..07235f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java @@ -32,10 +32,9 @@ import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.coordination.OpenRegionCoordination; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.zookeeper.KeeperException; + /** * Handles opening of a region on a region server. *
@@ -50,34 +49,27 @@ public class OpenRegionHandler extends EventHandler { private final HRegionInfo regionInfo; private final HTableDescriptor htd; - // We get version of our znode at start of open process and monitor it across - // the total open. We'll fail the open if someone hijacks our znode; we can - // tell this has happened if version is not as expected. - private volatile int version = -1; - //version of the offline node that was set by the master - private volatile int versionOfOfflineNode = -1; + private OpenRegionCoordination coordination; + private OpenRegionCoordination.OpenRegionDetails ord; public OpenRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, - HTableDescriptor htd) { - this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, -1); - } - public OpenRegionHandler(final Server server, - final RegionServerServices rsServices, HRegionInfo regionInfo, - HTableDescriptor htd, int versionOfOfflineNode) { + HTableDescriptor htd, OpenRegionCoordination coordination, + OpenRegionCoordination.OpenRegionDetails ord) { this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, - versionOfOfflineNode); + coordination, ord); } protected OpenRegionHandler(final Server server, final RegionServerServices rsServices, final HRegionInfo regionInfo, final HTableDescriptor htd, EventType eventType, - final int versionOfOfflineNode) { + OpenRegionCoordination coordination, OpenRegionCoordination.OpenRegionDetails ord) { super(server, eventType); this.rsServices = rsServices; this.regionInfo = regionInfo; this.htd = htd; - this.versionOfOfflineNode = versionOfOfflineNode; + this.coordination = coordination; + this.ord = ord; } public HRegionInfo getRegionInfo() { @@ -112,13 +104,13 @@ public class OpenRegionHandler extends EventHandler { // Check that we're still supposed to open the region and transition. // If fails, just return. Someone stole the region from under us. - // Calling transitionZookeeperOfflineToOpening initializes this.version. + // Calling transitionFromOfflineToOpening initializes this.version. if (!isRegionStillOpening()){ LOG.error("Region " + encodedName + " opening cancelled"); return; } - if (!transitionZookeeperOfflineToOpening(encodedName, versionOfOfflineNode)) { + if (!coordination.transitionFromOfflineToOpening(regionInfo, ord)) { LOG.warn("Region was hijacked? Opening cancelled for encodedName=" + encodedName); // This is a desperate attempt: the znode is unlikely to be ours. But we can't do more. return; @@ -132,7 +124,7 @@ public class OpenRegionHandler extends EventHandler { } boolean failed = true; - if (tickleOpening("post_region_open")) { + if (coordination.tickleOpening(ord, regionInfo, rsServices, "post_region_open")) { if (updateMeta(region)) { failed = false; } @@ -142,8 +134,7 @@ public class OpenRegionHandler extends EventHandler { return; } - - if (!isRegionStillOpening() || !transitionToOpened(region)) { + if (!isRegionStillOpening() || !coordination.transitionToOpened(region, ord)) { // If we fail to transition to opened, it's because of one of two cases: // (a) we lost our ZK lease // OR (b) someone else opened the region before us @@ -173,7 +164,7 @@ public class OpenRegionHandler extends EventHandler { } finally { // Do all clean up here if (!openSuccessful) { - doCleanUpOnFailedOpen(region, transitionedToOpening); + doCleanUpOnFailedOpen(region, transitionedToOpening, ord); } final Boolean current = this.rsServices.getRegionsInTransitionInRS(). remove(this.regionInfo.getEncodedNameAsBytes()); @@ -200,7 +191,8 @@ public class OpenRegionHandler extends EventHandler { } } - private void doCleanUpOnFailedOpen(HRegion region, boolean transitionedToOpening) + private void doCleanUpOnFailedOpen(HRegion region, boolean transitionedToOpening, + OpenRegionCoordination.OpenRegionDetails ord) throws IOException { if (transitionedToOpening) { try { @@ -210,12 +202,12 @@ public class OpenRegionHandler extends EventHandler { } finally { // Even if cleanupFailed open fails we need to do this transition // See HBASE-7698 - tryTransitionFromOpeningToFailedOpen(regionInfo); + coordination.tryTransitionFromOpeningToFailedOpen(regionInfo, ord); } } else { // If still transition to OPENING is not done, we need to transition znode // to FAILED_OPEN - tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, versionOfOfflineNode); + coordination.tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, ord); } } @@ -250,7 +242,7 @@ public class OpenRegionHandler extends EventHandler { if (elapsed > 120000) { // 2 minutes, no need to tickleOpening too often // Only tickle OPENING if postOpenDeployTasks is taking some time. lastUpdate = now; - tickleOpening = tickleOpening("post_open_deploy"); + tickleOpening = coordination.tickleOpening(ord, regionInfo, rsServices, "post_open_deploy"); } synchronized (signaller) { try { @@ -314,7 +306,7 @@ public class OpenRegionHandler extends EventHandler { try { this.services.postOpenDeployTasks(this.region, this.server.getCatalogTracker()); - } catch (KeeperException e) { + } catch (IOException e) { server.abort("Exception running postOpenDeployTasks; region=" + this.region.getRegionInfo().getEncodedName(), e); } catch (Throwable e) { @@ -337,131 +329,22 @@ public class OpenRegionHandler extends EventHandler { } } - - /** - * @param r Region we're working on. - * @return whether znode is successfully transitioned to OPENED state. - * @throws IOException - */ - boolean transitionToOpened(final HRegion r) throws IOException { - boolean result = false; - HRegionInfo hri = r.getRegionInfo(); - final String name = hri.getRegionNameAsString(); - // Finally, Transition ZK node to OPENED - try { - if (ZKAssign.transitionNodeOpened(this.server.getZooKeeper(), hri, - this.server.getServerName(), this.version) == -1) { - String warnMsg = "Completed the OPEN of region " + name + - " but when transitioning from " + " OPENING to OPENED "; - try { - String node = ZKAssign.getNodeName(this.server.getZooKeeper(), hri.getEncodedName()); - if (ZKUtil.checkExists(this.server.getZooKeeper(), node) < 0) { - // if the znode - rsServices.abort(warnMsg + "the znode disappeared", null); - } else { - LOG.warn(warnMsg + "got a version mismatch, someone else clashed; " + - "so now unassigning -- closing region on server: " + this.server.getServerName()); - } - } catch (KeeperException ke) { - rsServices.abort(warnMsg, ke); - } - } else { - LOG.debug("Transitioned " + r.getRegionInfo().getEncodedName() + - " to OPENED in zk on " + this.server.getServerName()); - result = true; - } - } catch (KeeperException e) { - LOG.error("Failed transitioning node " + name + - " from OPENING to OPENED -- closing region", e); - } - return result; - } - - /** - * This is not guaranteed to succeed, we just do our best. - * @param hri Region we're working on. - * @return whether znode is successfully transitioned to FAILED_OPEN state. - */ - private boolean tryTransitionFromOpeningToFailedOpen(final HRegionInfo hri) { - boolean result = false; - final String name = hri.getRegionNameAsString(); - try { - LOG.info("Opening of region " + hri + " failed, transitioning" + - " from OPENING to FAILED_OPEN in ZK, expecting version " + this.version); - if (ZKAssign.transitionNode( - this.server.getZooKeeper(), hri, - this.server.getServerName(), - EventType.RS_ZK_REGION_OPENING, - EventType.RS_ZK_REGION_FAILED_OPEN, - this.version) == -1) { - LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " + - "It's likely that the master already timed out this open " + - "attempt, and thus another RS already has the region."); - } else { - result = true; - } - } catch (KeeperException e) { - LOG.error("Failed transitioning node " + name + - " from OPENING to FAILED_OPEN", e); - } - return result; - } - - /** - * Try to transition to open. This function is static to make it usable before creating the - * handler. - * - * This is not guaranteed to succeed, we just do our best. - * - * @param rsServices - * @param hri Region we're working on. - * @param versionOfOfflineNode version to checked. - * @return whether znode is successfully transitioned to FAILED_OPEN state. - */ - public static boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices, - final HRegionInfo hri, final int versionOfOfflineNode) { - boolean result = false; - final String name = hri.getRegionNameAsString(); - try { - LOG.info("Opening of region " + hri + " failed, transitioning" + - " from OFFLINE to FAILED_OPEN in ZK, expecting version " + versionOfOfflineNode); - if (ZKAssign.transitionNode( - rsServices.getZooKeeper(), hri, - rsServices.getServerName(), - EventType.M_ZK_REGION_OFFLINE, - EventType.RS_ZK_REGION_FAILED_OPEN, - versionOfOfflineNode) == -1) { - LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " + - "It's likely that the master already timed out this open " + - "attempt, and thus another RS already has the region."); - } else { - result = true; - } - } catch (KeeperException e) { - LOG.error("Failed transitioning node " + name + " from OFFLINE to FAILED_OPEN", e); - } - return result; - } - - /** * @return Instance of HRegion if successful open else null. */ HRegion openRegion() { HRegion region = null; try { - // Instantiate the region. This also periodically tickles our zk OPENING + // Instantiate the region. This also periodically tickles OPENING // state so master doesn't timeout this region in transition. region = HRegion.openHRegion(this.regionInfo, this.htd, - this.rsServices.getWAL(this.regionInfo), - this.server.getConfiguration(), - this.rsServices, + this.rsServices.getWAL(this.regionInfo), + this.server.getConfiguration(), + this.rsServices, new CancelableProgressable() { public boolean progress() { - // We may lose the znode ownership during the open. Currently its - // too hard interrupting ongoing region open. Just let it complete - // and check we still have the znode after region open. - return tickleOpening("open_region_progress"); + // if tickle failed, we need to cancel opening region. + return coordination.tickleOpening(ord, regionInfo, rsServices, "open_region_progress"); } }); } catch (Throwable t) { @@ -495,70 +378,4 @@ public class OpenRegionHandler extends EventHandler { Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName); return Boolean.TRUE.equals(action); // true means opening for RIT } - - /** - * Transition ZK node from OFFLINE to OPENING. - * @param encodedName Name of the znode file (Region encodedName is the znode - * name). - * @param versionOfOfflineNode - version Of OfflineNode that needs to be compared - * before changing the node's state from OFFLINE - * @return True if successful transition. - */ - boolean transitionZookeeperOfflineToOpening(final String encodedName, - int versionOfOfflineNode) { - // TODO: should also handle transition from CLOSED? - try { - // Initialize the znode version. - this.version = ZKAssign.transitionNode(server.getZooKeeper(), regionInfo, - server.getServerName(), EventType.M_ZK_REGION_OFFLINE, - EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode); - } catch (KeeperException e) { - LOG.error("Error transition from OFFLINE to OPENING for region=" + - encodedName, e); - this.version = -1; - return false; - } - boolean b = isGoodVersion(); - if (!b) { - LOG.warn("Failed transition from OFFLINE to OPENING for region=" + - encodedName); - } - return b; - } - - /** - * Update our OPENING state in zookeeper. - * Do this so master doesn't timeout this region-in-transition. - * @param context Some context to add to logs if failure - * @return True if successful transition. - */ - boolean tickleOpening(final String context) { - if (!isRegionStillOpening()) { - LOG.warn("Open region aborted since it isn't opening any more"); - return false; - } - // If previous checks failed... do not try again. - if (!isGoodVersion()) return false; - String encodedName = this.regionInfo.getEncodedName(); - try { - this.version = - ZKAssign.confirmNodeOpening(server.getZooKeeper(), - this.regionInfo, this.server.getServerName(), this.version); - } catch (KeeperException e) { - server.abort("Exception refreshing OPENING; region=" + encodedName + - ", context=" + context, e); - this.version = -1; - return false; - } - boolean b = isGoodVersion(); - if (!b) { - LOG.warn("Failed refreshing OPENING; region=" + encodedName + - ", context=" + context); - } - return b; - } - - private boolean isGoodVersion() { - return this.version != -1; - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java index c7d3f1f..f5e8952 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java @@ -54,6 +54,9 @@ import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.OpenRegionCoordination; +import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.executor.ExecutorService; @@ -255,7 +258,7 @@ public class TestAssignmentManager { } @Test(timeout = 60000) - public void testBalanceOnMasterFailoverScenarioWithClosedNode() + public void testBalanceOnMasterFailoverScenarioWithClosedNode() throws IOException, KeeperException, InterruptedException, ServiceException, DeserializationException, CoordinatedStateException { AssignmentManagerWithExtrasForTesting am = @@ -876,9 +879,19 @@ public class TestAssignmentManager { am.getRegionStates().createRegionState(REGIONINFO); am.gate.set(false); CatalogTracker ct = Mockito.mock(CatalogTracker.class); - assertFalse(am.processRegionsInTransition(rt, REGIONINFO, version)); - am.getTableStateManager().setTableState(REGIONINFO.getTable(), - Table.State.ENABLED); + + BaseCoordinatedStateManager cp = new ZkCoordinatedStateManager(); + cp.initialize(server); + cp.start(); + + OpenRegionCoordination orc = cp.getOpenRegionCoordination(); + ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd = + new ZkOpenRegionCoordination.ZkOpenRegionDetails(); + zkOrd.setServerName(server.getServerName()); + zkOrd.setVersion(version); + + assertFalse(am.processRegionsInTransition(rt, REGIONINFO, orc, zkOrd)); + am.getTableStateManager().setTableState(REGIONINFO.getTable(), Table.State.ENABLED); processServerShutdownHandler(ct, am, false); // Waiting for the assignment to get completed. while (!am.gate.get()) { @@ -1357,8 +1370,9 @@ public class TestAssignmentManager { this.serverManager, ct, balancer, null, null, master.getTableLockManager()) { @Override - void handleRegion(final RegionTransition rt, int expectedVersion) { - super.handleRegion(rt, expectedVersion); + void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination, + OpenRegionCoordination.OpenRegionDetails ord) { + super.handleRegion(rt, coordination, ord); if (rt != null && Bytes.equals(hri.getRegionName(), rt.getRegionName()) && rt.getEventType() == EventType.RS_ZK_REGION_OPENING) { zkEventProcessed.set(true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java index 7bd7893..e91b1c2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java @@ -31,6 +31,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.OpenRegionCoordination; +import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -137,8 +141,17 @@ public class TestOpenedRegionHandler { ZKUtil.getDataAndWatch(zkw, nodeName, stat); // use the version for the OpenedRegionHandler + BaseCoordinatedStateManager csm = new ZkCoordinatedStateManager(); + csm.initialize(server); + csm.start(); + + OpenRegionCoordination orc = csm.getOpenRegionCoordination(); + ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd = + new ZkOpenRegionCoordination.ZkOpenRegionDetails(); + zkOrd.setServerName(server.getServerName()); + zkOrd.setVersion(stat.getVersion()); OpenedRegionHandler handler = new OpenedRegionHandler(server, am, region - .getRegionInfo(), server.getServerName(), stat.getVersion()); + .getRegionInfo(), orc, zkOrd); // Once again overwrite the same znode so that the version changes. ZKAssign.transitionNode(zkw, region.getRegionInfo(), server .getServerName(), EventType.RS_ZK_REGION_OPENED, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java index b7cc51a..2d94a77 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java @@ -31,6 +31,9 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -341,7 +344,18 @@ public class TestRegionServerNoMaster { // Let's start the open handler HTableDescriptor htd = getRS().tableDescriptors.get(hri.getTable()); - getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, 0)); + + BaseCoordinatedStateManager csm = new ZkCoordinatedStateManager(); + csm.initialize(getRS()); + csm.start(); + + ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd = + new ZkOpenRegionCoordination.ZkOpenRegionDetails(); + zkCrd.setServerName(getRS().getServerName()); + zkCrd.setVersionOfOfflineNode(0); + + getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, + csm.getOpenRegionCoordination(), zkCrd)); // The open handler should have removed the region from RIT but kept the region closed checkRegionIsClosed(); @@ -395,7 +409,18 @@ public class TestRegionServerNoMaster { // 2) The region in RIT was changed. // The order is more or less implementation dependant. HTableDescriptor htd = getRS().tableDescriptors.get(hri.getTable()); - getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, 0)); + + BaseCoordinatedStateManager csm = new ZkCoordinatedStateManager(); + csm.initialize(getRS()); + csm.start(); + + ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd = + new ZkOpenRegionCoordination.ZkOpenRegionDetails(); + zkCrd.setServerName(getRS().getServerName()); + zkCrd.setVersionOfOfflineNode(0); + + getRS().service.submit(new OpenRegionHandler(getRS(), getRS(), hri, htd, + csm.getOpenRegionCoordination(), zkCrd)); // The open handler should have removed the region from RIT but kept the region closed checkRegionIsClosed(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java index 6793ff5..43d5875 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coordination.OpenRegionCoordination; import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.executor.EventType; @@ -138,13 +139,13 @@ public class TestCloseRegionHandler { HRegion.closeHRegion(region); } } - + /** * Test if close region can handle ZK closing node version mismatch * @throws IOException * @throws NodeExistsException * @throws KeeperException - * @throws DeserializationException + * @throws DeserializationException */ @Test public void testZKClosingNodeVersionMismatch() throws IOException, NodeExistsException, KeeperException, DeserializationException { @@ -153,38 +154,38 @@ public class TestCloseRegionHandler { HTableDescriptor htd = TEST_HTD; final HRegionInfo hri = TEST_HRI; - + + ZkCoordinatedStateManager coordinationProvider = new ZkCoordinatedStateManager(); + coordinationProvider.initialize(server); + coordinationProvider.start(); + // open a region first so that it can be closed later - OpenRegion(server, rss, htd, hri); - + OpenRegion(server, rss, htd, hri, coordinationProvider.getOpenRegionCoordination()); + // close the region // Create it CLOSING, which is what Master set before sending CLOSE RPC int versionOfClosingNode = ZKAssign.createNodeClosing(server.getZooKeeper(), hri, server.getServerName()); - + // The CloseRegionHandler will validate the expected version // Given it is set to invalid versionOfClosingNode+1, // CloseRegionHandler should be M_ZK_REGION_CLOSING - ZkCoordinatedStateManager consensusProvider = new ZkCoordinatedStateManager(); - consensusProvider.initialize(server); - consensusProvider.start(); - ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd = new ZkCloseRegionCoordination.ZkCloseRegionDetails(); zkCrd.setPublishStatusInZk(true); zkCrd.setExpectedVersion(versionOfClosingNode+1); CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false, - consensusProvider.getCloseRegionCoordination(), zkCrd); + coordinationProvider.getCloseRegionCoordination(), zkCrd); handler.process(); - + // Handler should remain in M_ZK_REGION_CLOSING RegionTransition rt = RegionTransition.parseFrom(ZKAssign.getData(server.getZooKeeper(), hri.getEncodedName())); assertTrue(rt.getEventType().equals(EventType.M_ZK_REGION_CLOSING )); } - + /** * Test if the region can be closed properly * @throws IOException @@ -196,33 +197,33 @@ public class TestCloseRegionHandler { throws IOException, NodeExistsException, KeeperException, DeserializationException { final Server server = new MockServer(HTU); final RegionServerServices rss = HTU.createMockRegionServerService(); - + HTableDescriptor htd = TEST_HTD; HRegionInfo hri = TEST_HRI; - + + ZkCoordinatedStateManager coordinationProvider = new ZkCoordinatedStateManager(); + coordinationProvider.initialize(server); + coordinationProvider.start(); + // open a region first so that it can be closed later - OpenRegion(server, rss, htd, hri); - + OpenRegion(server, rss, htd, hri, coordinationProvider.getOpenRegionCoordination()); + // close the region // Create it CLOSING, which is what Master set before sending CLOSE RPC int versionOfClosingNode = ZKAssign.createNodeClosing(server.getZooKeeper(), hri, server.getServerName()); - + // The CloseRegionHandler will validate the expected version // Given it is set to correct versionOfClosingNode, // CloseRegionHandlerit should be RS_ZK_REGION_CLOSED - ZkCoordinatedStateManager consensusProvider = new ZkCoordinatedStateManager(); - consensusProvider.initialize(server); - consensusProvider.start(); - ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd = new ZkCloseRegionCoordination.ZkCloseRegionDetails(); zkCrd.setPublishStatusInZk(true); zkCrd.setExpectedVersion(versionOfClosingNode); CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false, - consensusProvider.getCloseRegionCoordination(), zkCrd); + coordinationProvider.getCloseRegionCoordination(), zkCrd); handler.process(); // Handler should have transitioned it to RS_ZK_REGION_CLOSED RegionTransition rt = RegionTransition.parseFrom( @@ -231,11 +232,15 @@ public class TestCloseRegionHandler { } private void OpenRegion(Server server, RegionServerServices rss, - HTableDescriptor htd, HRegionInfo hri) + HTableDescriptor htd, HRegionInfo hri, OpenRegionCoordination coordination) throws IOException, NodeExistsException, KeeperException, DeserializationException { // Create it OFFLINE node, which is what Master set before sending OPEN RPC ZKAssign.createNodeOffline(server.getZooKeeper(), hri, server.getServerName()); - OpenRegionHandler openHandler = new OpenRegionHandler(server, rss, hri, htd); + + OpenRegionCoordination.OpenRegionDetails ord = + coordination.getDetailsForNonCoordinatedOpening(); + OpenRegionHandler openHandler = + new OpenRegionHandler(server, rss, hri, htd, coordination, ord); rss.getRegionsInTransitionInRS().put(hri.getEncodedNameAsBytes(), Boolean.TRUE); openHandler.process(); // This parse is not used? diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java index 7c7cf5a..869d727 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java @@ -25,6 +25,8 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; +import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -96,7 +98,16 @@ public class TestOpenRegionHandler { .getConfiguration(), htd); assertNotNull(region); try { - OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd) { + ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager(); + csm.initialize(server); + csm.start(); + + ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd = + new ZkOpenRegionCoordination.ZkOpenRegionDetails(); + zkCrd.setServerName(server.getServerName()); + + OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, + htd, csm.getOpenRegionCoordination(), zkCrd) { HRegion openRegion() { // Open region first, then remove znode as though it'd been hijacked. HRegion region = super.openRegion(); @@ -150,10 +161,22 @@ public class TestOpenRegionHandler { .getConfiguration(), htd); assertNotNull(region); try { - OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd) { - boolean transitionToOpened(final HRegion r) throws IOException { + + ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager(); + csm.initialize(server); + csm.start(); + + ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd = + new ZkOpenRegionCoordination.ZkOpenRegionDetails(); + zkCrd.setServerName(server.getServerName()); + + ZkOpenRegionCoordination openRegionCoordination = + new ZkOpenRegionCoordination(csm, server.getZooKeeper()) { + @Override + public boolean transitionToOpened(final HRegion r, OpenRegionDetails ord) + throws IOException { // remove znode simulating intermittent zookeeper connection issue - ZooKeeperWatcher zkw = this.server.getZooKeeper(); + ZooKeeperWatcher zkw = server.getZooKeeper(); String node = ZKAssign.getNodeName(zkw, hri.getEncodedName()); try { ZKUtil.deleteNodeFailSilent(zkw, node); @@ -161,9 +184,12 @@ public class TestOpenRegionHandler { throw new RuntimeException("Ugh failed delete of " + node, e); } // then try to transition to OPENED - return super.transitionToOpened(r); + return super.transitionToOpened(r, ord); } }; + + OpenRegionHandler handler = new OpenRegionHandler(server, rss, hri, htd, + openRegionCoordination, zkCrd); rss.getRegionsInTransitionInRS().put( hri.getEncodedNameAsBytes(), Boolean.TRUE); // Call process without first creating OFFLINE region in zk, see if @@ -182,7 +208,7 @@ public class TestOpenRegionHandler { // Region server is expected to abort due to OpenRegionHandler perceiving transitioning // to OPENED as failed // This was corresponding to the second handler.process() call above. - assertTrue("region server should have aborted", rss.isAborted()); + assertTrue("region server should have aborted", server.isAborted()); } @Test @@ -193,9 +219,18 @@ public class TestOpenRegionHandler { // Create it OFFLINE, which is what it expects ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName()); + ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager(); + csm.initialize(server); + csm.start(); + + ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd = + new ZkOpenRegionCoordination.ZkOpenRegionDetails(); + zkCrd.setServerName(server.getServerName()); + // Create the handler OpenRegionHandler handler = - new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) { + new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD, + csm.getOpenRegionCoordination(), zkCrd) { @Override HRegion openRegion() { // Fake failure of opening a region due to an IOE, which is caught @@ -221,8 +256,16 @@ public class TestOpenRegionHandler { ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName()); // Create the handler - OpenRegionHandler handler = - new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) { + ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager(); + csm.initialize(server); + csm.start(); + + ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd = + new ZkOpenRegionCoordination.ZkOpenRegionDetails(); + zkCrd.setServerName(server.getServerName()); + + OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD, + csm.getOpenRegionCoordination(), zkCrd) { @Override boolean updateMeta(final HRegion r) { // Fake failure of updating META @@ -246,7 +289,16 @@ public class TestOpenRegionHandler { // Create it OFFLINE, which is what it expects ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName()); // Create the handler - OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) { + ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager(); + csm.initialize(server); + csm.start(); + + ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd = + new ZkOpenRegionCoordination.ZkOpenRegionDetails(); + zkCrd.setServerName(server.getServerName()); + + OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD, + csm.getOpenRegionCoordination(), zkCrd) { @Override boolean updateMeta(HRegion r) { return false; @@ -275,13 +327,25 @@ public class TestOpenRegionHandler { // Create it OFFLINE, which is what it expects ZKAssign.createNodeOffline(server.getZooKeeper(), TEST_HRI, server.getServerName()); // Create the handler - OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD) { + ZkCoordinatedStateManager csm = new ZkCoordinatedStateManager(); + csm.initialize(server); + csm.start(); + ZkOpenRegionCoordination.ZkOpenRegionDetails zkCrd = + new ZkOpenRegionCoordination.ZkOpenRegionDetails(); + zkCrd.setServerName(server.getServerName()); + + ZkOpenRegionCoordination openRegionCoordination = + new ZkOpenRegionCoordination(csm, server.getZooKeeper()) { @Override - boolean transitionZookeeperOfflineToOpening(String encodedName, int versionOfOfflineNode) { + public boolean transitionFromOfflineToOpening(HRegionInfo regionInfo, + OpenRegionDetails ord) { return false; } }; + + OpenRegionHandler handler = new OpenRegionHandler(server, rsServices, TEST_HRI, TEST_HTD, + openRegionCoordination, zkCrd); rsServices.getRegionsInTransitionInRS().put(TEST_HRI.getEncodedNameAsBytes(), Boolean.TRUE); handler.process();