Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 1161985) +++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy) @@ -332,7 +332,7 @@ * @param region * region to open * @return RegionOpeningState - * OPENED - if region opened succesfully. + * OPENED - if region open request was successful. * ALREADY_OPENED - if the region was already opened. * FAILED_OPENING - if region opening failed. * @@ -341,6 +341,22 @@ public RegionOpeningState openRegion(final HRegionInfo region) throws IOException; /** + * Opens the specified region. + * @param region + * region to open + * @param versionOfOfflineNode + * the version of znode to compare when RS transitions the znode from + * OFFLINE state. + * @return RegionOpeningState + * OPENED - if region open request was successful. + * ALREADY_OPENED - if the region was already opened. + * FAILED_OPENING - if region opening failed. + * @throws IOException + */ + public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode) + throws IOException; + + /** * Opens the specified regions. * @param regions regions to open * @throws IOException Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1161985) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -146,6 +146,9 @@ private final ExecutorService executorService; + //Thread pool executor service for timeout monitor + private java.util.concurrent.ExecutorService threadPoolExecutorService; + /** * Constructs a new assignment manager. * @@ -157,7 +160,8 @@ * @throws IOException */ public AssignmentManager(Server master, ServerManager serverManager, - CatalogTracker catalogTracker, final ExecutorService service) + CatalogTracker catalogTracker, final ExecutorService service, + final java.util.concurrent.ExecutorService threadPoolExecutorService) throws KeeperException, IOException { super(master.getZooKeeper()); this.master = master; @@ -175,6 +179,7 @@ this.maximumAssignmentAttempts = this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); this.balancer = LoadBalancerFactory.getLoadBalancer(conf); + this.threadPoolExecutorService = threadPoolExecutorService; } /** @@ -398,9 +403,20 @@ // Just insert region into RIT // If this never updates the timeout will trigger new assignment - regionsInTransition.put(encodedRegionName, new RegionState( - regionInfo, RegionState.State.OPENING, - data.getStamp(), data.getOrigin())); + if (regionInfo.isMetaRegion() || regionInfo.isRootRegion()) { + regionsInTransition.put(encodedRegionName, new RegionState( + regionInfo, RegionState.State.OPENING, data.getStamp(), data + .getOrigin())); + // If ROOT or .META. table is waiting for timeout monitor to assign + // it may take lot of time when the assignment.timeout.period is + // the default value which may be very long. We will not be able + // to serve any request during this time. + // So we will assign the ROOT and .META. region immediately. + processOpeningState(regionInfo); + break; + } + regionsInTransition.put(encodedRegionName, new RegionState(regionInfo, + RegionState.State.OPENING, data.getStamp(), data.getOrigin())); break; case RS_ZK_REGION_OPENED: @@ -1019,12 +1035,21 @@ public void assign(HRegionInfo region, boolean setOfflineInZK, boolean forceNewPlan) { - String tableName = region.getTableNameAsString(); - boolean disabled = this.zkTable.isDisabledTable(tableName); - if (disabled || this.zkTable.isDisablingTable(tableName)) { - LOG.info("Table " + tableName + (disabled? " disabled;": " disabling;") + - " skipping assign of " + region.getRegionNameAsString()); - offlineDisabledRegion(region); + assign(region, setOfflineInZK, forceNewPlan, false); + } + + /** + * @param region + * @param setOfflineInZK + * @param forceNewPlan + * @param reassign + * - true if timeout monitor calls assign + */ + public void assign(HRegionInfo region, boolean setOfflineInZK, + boolean forceNewPlan, boolean reassign) { + //If reassign is true do not call disableRegionIfInRIT as + // we have not yet moved the znode to OFFLINE state. + if (!reassign && disableRegionIfInRIT(region)) { return; } if (this.serverManager.isClusterShutdown()) { @@ -1032,9 +1057,10 @@ region.getRegionNameAsString()); return; } - RegionState state = addToRegionsInTransition(region); + RegionState state = addToRegionsInTransition(region, + reassign); synchronized (state) { - assign(state, setOfflineInZK, forceNewPlan); + assign(region, state, setOfflineInZK, forceNewPlan, reassign); } } @@ -1192,26 +1218,54 @@ * @return The current RegionState */ private RegionState addToRegionsInTransition(final HRegionInfo region) { + return addToRegionsInTransition(region, false); + } + /** + * @param region + * @param reassign + * @return The current RegionState + */ + private RegionState addToRegionsInTransition(final HRegionInfo region, + boolean reassign) { synchronized (regionsInTransition) { - return forceRegionStateToOffline(region); + return forceRegionStateToOffline(region, reassign); } } + /** + * Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}. + * Caller must hold lock on this.regionsInTransition. + * @param region + * @return Amended RegionState. + */ + private RegionState forceRegionStateToOffline(final HRegionInfo region) { + return forceRegionStateToOffline(region, false); + } /** * Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}. * Caller must hold lock on this.regionsInTransition. * @param region + * @param reassign * @return Amended RegionState. */ - private RegionState forceRegionStateToOffline(final HRegionInfo region) { + private RegionState forceRegionStateToOffline(final HRegionInfo region, + boolean reassign) { String encodedName = region.getEncodedName(); RegionState state = this.regionsInTransition.get(encodedName); if (state == null) { state = new RegionState(region, RegionState.State.OFFLINE); this.regionsInTransition.put(encodedName, state); } else { - LOG.debug("Forcing OFFLINE; was=" + state); - state.update(RegionState.State.OFFLINE); + // If invoked from timeout monitor do not force in-memory to OFFLINE. + // Based on the + // znode state we will decide if to change in-memory state to OFFLINE or + // not. It + // will + // be done before setting the znode to OFFLINE state. + if (!reassign) { + LOG.debug("Forcing OFFLINE; was=" + state); + state.update(RegionState.State.OFFLINE); + } } return state; } @@ -1221,11 +1275,29 @@ * @param state * @param setOfflineInZK * @param forceNewPlan + * @param reassign */ - private void assign(final RegionState state, final boolean setOfflineInZK, - final boolean forceNewPlan) { + private void assign(final HRegionInfo region, final RegionState state, + final boolean setOfflineInZK, final boolean forceNewPlan, + boolean reassign) { for (int i = 0; i < this.maximumAssignmentAttempts; i++) { - if (setOfflineInZK && !setOfflineInZooKeeper(state)) return; + int versionOfOfflineNode = -1; + if (setOfflineInZK) { + // get the version of the znode after setting it to OFFLINE. + // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE + versionOfOfflineNode = setOfflineInZooKeeper(state, + reassign); + if(versionOfOfflineNode != -1){ + if (disableRegionIfInRIT(region)) { + return; + } + } + } + + if (setOfflineInZK && versionOfOfflineNode == -1) { + return; + } + if (this.master.isStopped()) { LOG.debug("Server stopped; skipping assign of " + state); return; @@ -1239,8 +1311,9 @@ state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(), plan.getDestination()); // Send OPEN RPC. This can fail if the server on other end is is not up. + // Pass the version that was obtained while setting the node to OFFLINE. RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan - .getDestination(), state.getRegion()); + .getDestination(), state.getRegion(), versionOfOfflineNode); if (regionOpenState == RegionOpeningState.ALREADY_OPENED) { // Remove region from in-memory transition and unassigned node from ZK // While trying to enable the table the regions of the table were @@ -1286,31 +1359,73 @@ } } + private boolean disableRegionIfInRIT(final HRegionInfo region) { + String tableName = region.getTableNameAsString(); + boolean disabled = this.zkTable.isDisabledTable(tableName); + if (disabled || this.zkTable.isDisablingTable(tableName)) { + LOG.info("Table " + tableName + (disabled? " disabled;": " disabling;") + + " skipping assign of " + region.getRegionNameAsString()); + offlineDisabledRegion(region); + return true; + } + return false; + } + /** * Set region as OFFLINED up in zookeeper + * * @param state - * @return True if we succeeded, false otherwise (State was incorrect or failed - * updating zk). + * @param reassign + * - true if comes from timeout monitor, false otherwise + * @return the version of the offline node if setting of the OFFLINE node was + * successful, -1 otherwise. */ - boolean setOfflineInZooKeeper(final RegionState state) { - if (!state.isClosed() && !state.isOffline()) { + int setOfflineInZooKeeper(final RegionState state, + boolean reassign) { + // If invoked from timeoutmonitor the current state in memory need not be + // OFFLINE. + if (!reassign && !state.isClosed() && !state.isOffline()) { this.master.abort("Unexpected state trying to OFFLINE; " + state, - new IllegalStateException()); - return false; + new IllegalStateException()); + return -1; } - state.update(RegionState.State.OFFLINE); + boolean allowZNodeCreation = false; + // If reassign is true and the current state is PENDING_OPEN + // or OPENING then refresh the in-memory state to PENDING_OPEN. This is + // important because + // if timeoutmonitor deduces that a region was in RS_OPENING state for a long + // time but when the master forces + // the znode to OFFLINE state the RS could have opened + // the corresponding region and the + // state in znode will be RS_ZK_REGION_OPENED. The + // OpenedRegionHandler + // expects the in-memory state to be PENDING_OPEN or OPENING. + // For all other cases we can change the in-memory state to OFFLINE. + if (reassign && + (state.getState().equals(RegionState.State.PENDING_OPEN) || + state.getState().equals(RegionState.State.OPENING))) { + state.update(RegionState.State.PENDING_OPEN); + allowZNodeCreation = false; + } else { + state.update(RegionState.State.OFFLINE); + allowZNodeCreation = true; + } + int versionOfOfflineNode = -1; try { - if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(), - state.getRegion(), this.master.getServerName())) { - LOG.warn("Attempted to create/force node into OFFLINE state before " + - "completing assignment but failed to do so for " + state); - return false; + // get the version after setting the znode to OFFLINE + versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(master.getZooKeeper(), + state.getRegion(), this.master.getServerName(), + reassign, allowZNodeCreation); + if (versionOfOfflineNode == -1) { + LOG.warn("Attempted to create/force node into OFFLINE state before " + + "completing assignment but failed to do so for " + state); + return -1; } } catch (KeeperException e) { master.abort("Unexpected ZK exception creating/setting node OFFLINE", e); - return false; + return -1; } - return true; + return versionOfOfflineNode; } /** @@ -2134,126 +2249,127 @@ long now = System.currentTimeMillis(); for (RegionState regionState : regionsInTransition.values()) { if (regionState.getStamp() + timeout <= now) { - HRegionInfo regionInfo = regionState.getRegion(); - LOG.info("Regions in transition timed out: " + regionState); - // Expired! Do a retry. - switch (regionState.getState()) { - case CLOSED: - LOG.info("Region " + regionInfo.getEncodedName() + - " has been CLOSED for too long, waiting on queued " + - "ClosedRegionHandler to run or server shutdown"); - // Update our timestamp. - regionState.updateTimestampToNow(); - break; - case OFFLINE: - LOG.info("Region has been OFFLINE for too long, " + - "reassigning " + regionInfo.getRegionNameAsString() + - " to a random server"); - assigns.put(regionState.getRegion(), Boolean.FALSE); - break; - case PENDING_OPEN: - LOG.info("Region has been PENDING_OPEN for too " + - "long, reassigning region=" + - regionInfo.getRegionNameAsString()); - assigns.put(regionState.getRegion(), Boolean.TRUE); - break; - case OPENING: - LOG.info("Region has been OPENING for too " + - "long, reassigning region=" + - regionInfo.getRegionNameAsString()); - // Should have a ZK node in OPENING state - try { - String node = ZKAssign.getNodeName(watcher, - regionInfo.getEncodedName()); - Stat stat = new Stat(); - RegionTransitionData data = ZKAssign.getDataNoWatch(watcher, - node, stat); - if (data == null) { - LOG.warn("Data is null, node " + node + " no longer exists"); - break; - } - if (data.getEventType() == EventType.RS_ZK_REGION_OPENED) { - LOG.debug("Region has transitioned to OPENED, allowing " + - "watched event handlers to process"); - break; - } else if (data.getEventType() != - EventType.RS_ZK_REGION_OPENING) { - LOG.warn("While timing out a region in state OPENING, " + - "found ZK node in unexpected state: " + - data.getEventType()); - break; - } - // Attempt to transition node into OFFLINE - try { - data = new RegionTransitionData( - EventType.M_ZK_REGION_OFFLINE, regionInfo.getRegionName(), - master.getServerName()); - if (ZKUtil.setData(watcher, node, data.getBytes(), - stat.getVersion())) { - // Node is now OFFLINE, let's trigger another assignment - ZKUtil.getDataAndWatch(watcher, node); // re-set the watch - LOG.info("Successfully transitioned region=" + - regionInfo.getRegionNameAsString() + " into OFFLINE" + - " and forcing a new assignment"); - assigns.put(regionState.getRegion(), Boolean.TRUE); - } - } catch (KeeperException.NoNodeException nne) { - // Node did not exist, can't time this out - } - } catch (KeeperException ke) { - LOG.error("Unexpected ZK exception timing out CLOSING region", - ke); - break; - } - break; - case OPEN: - LOG.error("Region has been OPEN for too long, " + - "we don't know where region was opened so can't do anything"); - synchronized(regionState) { - regionState.updateTimestampToNow(); - } - break; + //decide on action upon timeout + actOnTimeOut(regionState); + } + } + } + } - case PENDING_CLOSE: - LOG.info("Region has been PENDING_CLOSE for too " + - "long, running forced unassign again on region=" + - regionInfo.getRegionNameAsString()); - try { - // If the server got the RPC, it will transition the node - // to CLOSING, so only do something here if no node exists - if (!ZKUtil.watchAndCheckExists(watcher, - ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) { - // Queue running of an unassign -- do actual unassign - // outside of the regionsInTransition lock. - unassigns.add(regionInfo); - } - } catch (NoNodeException e) { - LOG.debug("Node no longer existed so not forcing another " + - "unassignment"); - } catch (KeeperException e) { - LOG.warn("Unexpected ZK exception timing out a region " + - "close", e); - } - break; - case CLOSING: - LOG.info("Region has been CLOSING for too " + - "long, this should eventually complete or the server will " + - "expire, doing nothing"); - break; - } + private void actOnTimeOut(RegionState regionState) { + HRegionInfo regionInfo = regionState.getRegion(); + LOG.info("Regions in transition timed out: " + regionState); + // Expired! Do a retry. + switch (regionState.getState()) { + case CLOSED: + LOG.info("Region " + regionInfo.getEncodedName() + + " has been CLOSED for too long, waiting on queued " + + "ClosedRegionHandler to run or server shutdown"); + // Update our timestamp. + regionState.updateTimestampToNow(); + break; + case OFFLINE: + LOG.info("Region has been OFFLINE for too long, " + "reassigning " + + regionInfo.getRegionNameAsString() + " to a random server"); + invokeTimeOutManager(regionState.getRegion(), + TimeOutOperationType.ASSIGN); + break; + case PENDING_OPEN: + LOG.info("Region has been PENDING_OPEN for too " + + "long, reassigning region=" + regionInfo.getRegionNameAsString()); + invokeTimeOutManager(regionState.getRegion(), + TimeOutOperationType.ASSIGN); + break; + case OPENING: + processOpeningState(regionInfo); + break; + case OPEN: + LOG.error("Region has been OPEN for too long, " + + "we don't know where region was opened so can't do anything"); + synchronized (regionState) { + regionState.updateTimestampToNow(); + } + break; + + case PENDING_CLOSE: + LOG.info("Region has been PENDING_CLOSE for too " + + "long, running forced unassign again on region=" + + regionInfo.getRegionNameAsString()); + try { + // If the server got the RPC, it will transition the node + // to CLOSING, so only do something here if no node exists + if (!ZKUtil.watchAndCheckExists(watcher, + ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) { + // Queue running of an unassign -- do actual unassign + // outside of the regionsInTransition lock. + invokeTimeOutManager(regionState.getRegion(), + TimeOutOperationType.UNASSIGN); } + } catch (NoNodeException e) { + LOG.debug("Node no longer existed so not forcing another " + + "unassignment"); + } catch (KeeperException e) { + LOG.warn("Unexpected ZK exception timing out a region close", e); } + break; + case CLOSING: + LOG.info("Region has been CLOSING for too " + + "long, this should eventually complete or the server will " + + "expire, doing nothing"); + break; } - // Finish the work for regions in PENDING_CLOSE state - for (HRegionInfo hri: unassigns) { - unassign(hri, true); + } + } + + /** + * The type of operation that has to performed on TimeOut. + * This is not an inmemory state. Just an enum to determine whether + * the operation to be taken after timeout is to assign the region + * or unassign the region. + * ASSIGN - need to assign a region to an RS + * UNASSIGN - need to unassign a region + */ + public static enum TimeOutOperationType { + ASSIGN, UNASSIGN; + } + + private void processOpeningState(HRegionInfo regionInfo) { + LOG.info("Region has been OPENING for too " + "long, reassigning region=" + + regionInfo.getRegionNameAsString()); + // Should have a ZK node in OPENING state + try { + String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()); + Stat stat = new Stat(); + RegionTransitionData dataInZNode = ZKAssign.getDataNoWatch(watcher, node, + stat); + if (dataInZNode == null) { + LOG.warn("Data is null, node " + node + " no longer exists"); + return; } - for (Map.Entry e: assigns.entrySet()){ - assign(e.getKey(), false, e.getValue()); + if (dataInZNode.getEventType() == EventType.RS_ZK_REGION_OPENED) { + LOG.debug("Region has transitioned to OPENED, allowing " + + "watched event handlers to process"); + return; + } else if (dataInZNode.getEventType() != EventType.RS_ZK_REGION_OPENING) { + LOG.warn("While timing out a region in state OPENING, " + + "found ZK node in unexpected state: " + + dataInZNode.getEventType()); + return; } + // do not make the znode to OFFLINE. The timeoutManager will do it + invokeTimeOutManager(regionInfo, TimeOutOperationType.ASSIGN); + } catch (KeeperException ke) { + LOG.error("Unexpected ZK exception timing out CLOSING region", ke); + return; } + return; } - + private void invokeTimeOutManager(HRegionInfo hri, + TimeOutOperationType operation) { + TimeOutManagerCallable timeOutManager = new TimeOutManagerCallable(this, + hri, operation); + threadPoolExecutorService.submit(timeOutManager); + } /** * Process shutdown server removing any assignments. * @param sn Server that went down. Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1161985) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; @@ -181,6 +182,11 @@ private final ServerName serverName; private TableDescriptors tableDescriptors; + + // thread pool exceutor for timeout monitor. Passed from HMaster so that can + // be properly + // shudown. + private java.util.concurrent.ExecutorService threadPoolExecutorService; /** * Initializes the HMaster. The steps are as follows: @@ -352,8 +358,9 @@ this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); this.catalogTracker.start(); + threadPoolExecutorService = Executors.newCachedThreadPool(); this.assignmentManager = new AssignmentManager(this, serverManager, - this.catalogTracker, this.executorService); + this.catalogTracker, this.executorService, threadPoolExecutorService); this.balancer = LoadBalancerFactory.getLoadBalancer(conf); zooKeeper.registerListenerFirst(assignmentManager); Index: src/main/java/org/apache/hadoop/hbase/master/ServerManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 1161985) +++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (working copy) @@ -392,8 +392,11 @@ *

* @param server server to open a region * @param region region to open + * @param versionOfOfflineNode that needs to be present in the offline node + * when RS tries to change the state from OFFLINE to other states. */ - public RegionOpeningState sendRegionOpen(final ServerName server, HRegionInfo region) + public RegionOpeningState sendRegionOpen(final ServerName server, + HRegionInfo region, int versionOfOfflineNode) throws IOException { HRegionInterface hri = getServerConnection(server); if (hri == null) { @@ -401,7 +404,11 @@ " failed because no RPC connection found to this server"); return RegionOpeningState.FAILED_OPENING; } - return hri.openRegion(region); + if (versionOfOfflineNode == -1) { + return hri.openRegion(region); + } else { + return hri.openRegion(region, versionOfOfflineNode); + } } /** Index: src/main/java/org/apache/hadoop/hbase/master/TimeOutManagerCallable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/TimeOutManagerCallable.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/master/TimeOutManagerCallable.java (revision 0) @@ -0,0 +1,62 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.concurrent.Callable; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.master.AssignmentManager.TimeOutOperationType; + +/** + * A callable object that invokes the corresponding action that needs to be + * taken when timeout thread deducts a region was in tranisition for a long + * time. Implementing as future callable we are able to act on the timeout + * asynchronoulsy + * + */ +public class TimeOutManagerCallable implements Callable { + + private AssignmentManager assignmentManager; + + private HRegionInfo hri; + + private TimeOutOperationType operation; + + public TimeOutManagerCallable(AssignmentManager assignmentManager, + HRegionInfo hri, TimeOutOperationType operation) { + this.assignmentManager = assignmentManager; + this.hri = hri; + this.operation = operation; + } + + @Override + public Object call() throws Exception { + if (TimeOutOperationType.ASSIGN.equals(operation)) { + assignmentManager.assign(hri, true, true, true); + } else { + assignmentManager.unassign(hri); + } + return null; + } + +} + + + Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java (revision 1161985) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java (working copy) @@ -33,6 +33,12 @@ public OpenMetaHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, final HTableDescriptor htd) { - super(server,rsServices, regionInfo, htd, EventType.M_RS_OPEN_META); + this(server, rsServices, regionInfo, htd, -1); } + public OpenMetaHandler(final Server server, + final RegionServerServices rsServices, HRegionInfo regionInfo, + final HTableDescriptor htd, int versionOfOfflineNode) { + super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META, + versionOfOfflineNode); + } } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (revision 1161985) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (working copy) @@ -52,20 +52,30 @@ // the total open. We'll fail the open if someone hijacks our znode; we can // tell this has happened if version is not as expected. private volatile int version = -1; + //version of the offline node that was set by the master + private volatile int versionOfOfflineNode = -1; public OpenRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, HTableDescriptor htd) { - this (server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION); + this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, -1); } + public OpenRegionHandler(final Server server, + final RegionServerServices rsServices, HRegionInfo regionInfo, + HTableDescriptor htd, int versionOfOfflineNode) { + this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, + versionOfOfflineNode); + } protected OpenRegionHandler(final Server server, final RegionServerServices rsServices, final HRegionInfo regionInfo, - final HTableDescriptor htd, EventType eventType) { + final HTableDescriptor htd, EventType eventType, + final int versionOfOfflineNode) { super(server, eventType); this.rsServices = rsServices; this.regionInfo = regionInfo; this.htd = htd; + this.versionOfOfflineNode = versionOfOfflineNode; } public HRegionInfo getRegionInfo() { @@ -86,7 +96,8 @@ // If fails, just return. Someone stole the region from under us. // Calling transitionZookeeperOfflineToOpening initalizes this.version. - if (!transitionZookeeperOfflineToOpening(encodedName)) { + if (!transitionZookeeperOfflineToOpening(encodedName, + versionOfOfflineNode)) { LOG.warn("Region was hijacked? It no longer exists, encodedName=" + encodedName); return; @@ -325,15 +336,18 @@ * Transition ZK node from OFFLINE to OPENING. * @param encodedName Name of the znode file (Region encodedName is the znode * name). + * @param versionOfOfflineNode - version Of OfflineNode that needs to be compared + * before changing the node's state from OFFLINE * @return True if successful transition. */ - boolean transitionZookeeperOfflineToOpening(final String encodedName) { + boolean transitionZookeeperOfflineToOpening(final String encodedName, + int versionOfOfflineNode) { // TODO: should also handle transition from CLOSED? try { // Initialize the znode version. - this.version = - ZKAssign.transitionNodeOpening(server.getZooKeeper(), - regionInfo, server.getServerName()); + this.version = ZKAssign.transitionNode(server.getZooKeeper(), regionInfo, + server.getServerName(), EventType.M_ZK_REGION_OFFLINE, + EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode); } catch (KeeperException e) { LOG.error("Error transition from OFFLINE to OPENING for region=" + encodedName, e); Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java (revision 1161985) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java (working copy) @@ -33,6 +33,12 @@ public OpenRootHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, final HTableDescriptor htd) { - super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT); + super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT, -1); } + public OpenRootHandler(final Server server, + final RegionServerServices rsServices, HRegionInfo regionInfo, + final HTableDescriptor htd, int versionOfOfflineNode) { + super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT, + versionOfOfflineNode); + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1161985) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -2326,6 +2326,38 @@ } return RegionOpeningState.OPENED; } + @Override + @QosPriority(priority = HIGH_QOS) + public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode) + throws IOException { + checkOpen(); + if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) { + throw new RegionAlreadyInTransitionException("open", region + .getEncodedName()); + } + HRegion onlineRegion = this.getFromOnlineRegions(region.getEncodedName()); + if (null != onlineRegion) { + LOG.warn("Attempted open of " + region.getEncodedName() + + " but already online on this server"); + return RegionOpeningState.ALREADY_OPENED; + } + LOG.info("Received request to open region: " + + region.getRegionNameAsString()); + this.regionsInTransitionInRS.add(region.getEncodedNameAsBytes()); + HTableDescriptor htd = this.tableDescriptors.get(region.getTableName()); + // Need to pass the expected version in the constructor. + if (region.isRootRegion()) { + this.service.submit(new OpenRootHandler(this, this, region, htd, + versionOfOfflineNode)); + } else if (region.isMetaRegion()) { + this.service.submit(new OpenMetaHandler(this, this, region, htd, + versionOfOfflineNode)); + } else { + this.service.submit(new OpenRegionHandler(this, this, region, htd, + versionOfOfflineNode)); + } + return RegionOpeningState.OPENED; + } @Override @QosPriority(priority=HIGH_QOS) Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (revision 1161985) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (working copy) @@ -203,7 +203,6 @@ ZKUtil.setData(zkw, node, data.getBytes()); } - /** * Creates or force updates an unassigned node to the OFFLINE state for the * specified region. @@ -222,33 +221,103 @@ * @throws KeeperException if unexpected zookeeper exception * @throws KeeperException.NodeExistsException if node already exists */ - public static boolean createOrForceNodeOffline(ZooKeeperWatcher zkw, - HRegionInfo region, ServerName serverName) + public static int createOrForceNodeOffline(ZooKeeperWatcher zkw, + HRegionInfo region, ServerName serverName) throws KeeperException { + return createOrForceNodeOffline(zkw, region, serverName, false, true); + } + + /** + * Creates or force updates an unassigned node to the OFFLINE state for the + * specified region. + *

+ * Attempts to create the node but if it exists will force it to transition to + * and OFFLINE state. + *

+ * Sets a watcher on the unassigned region node if the method is successful. + * + *

+ * This method should be used when assigning a region. + * + * @param zkw + * zk reference + * @param region + * region to be created as offline + * @param serverName + * server event originates from + * @param reassign + * - true if invoked from timeout monitor, false otherwise + * @param allowCreation + * - true if the node has to be created newly, false otherwise + * @throws KeeperException + * if unexpected zookeeper exception + * @throws KeeperException.NodeExistsException + * if node already exists + */ + public static int createOrForceNodeOffline(ZooKeeperWatcher zkw, + HRegionInfo region, ServerName serverName, + boolean reassign, boolean allowCreation) throws KeeperException { LOG.debug(zkw.prefix("Creating (or updating) unassigned node for " + region.getEncodedName() + " with OFFLINE state")); RegionTransitionData data = new RegionTransitionData( EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName); String node = getNodeName(zkw, region.getEncodedName()); + Stat stat = new Stat(); zkw.sync(node); int version = ZKUtil.checkExists(zkw, node); if (version == -1) { - ZKUtil.createAndWatch(zkw, node, data.getBytes()); + // If timeoutmonitor deducts a node to be in OPENING state but before it + // could + // transit to OFFLINE state if RS had opened the region then the Master + // deletes the + // assigned region znode. In that case the znode will not exist. So we + // should not + // create the znode again which will lead to double assignment. + if (reassign && !allowCreation) { + return -1; + } + return ZKUtil.createAndWatch(zkw, node, data.getBytes()); } else { - if (!ZKUtil.setData(zkw, node, data.getBytes(), version)) { - return false; + RegionTransitionData curDataInZNode = ZKAssign.getDataNoWatch(zkw, region + .getEncodedName(), stat); + // Do not move the node to OFFLINE if znode is in any of the following + // state. + // Because these are already executed states. + if (reassign && null != curDataInZNode) { + EventType eventType = curDataInZNode.getEventType(); + if (eventType.equals(EventType.RS_ZK_REGION_CLOSING) + || eventType.equals(EventType.RS_ZK_REGION_CLOSED) + || eventType.equals(EventType.RS_ZK_REGION_OPENED)) { + return -1; + } + } + + boolean setData = false; + try { + setData = ZKUtil.setData(zkw, node, data.getBytes(), version); + // Setdata throws KeeperException which aborts the Master. So we are + // catching it here. + // If just before setting the znode to OFFLINE if the RS has made any + // change to the + // znode state then we need to return -1. + } catch (KeeperException kpe) { + LOG.info("Version mismatch while setting the node to OFFLINE state."); + return -1; + } + if (!setData) { + return -1; } else { // We successfully forced to OFFLINE, reset watch and handle if // the state changed in between our set and the watch RegionTransitionData curData = - ZKAssign.getData(zkw, region.getEncodedName()); + ZKAssign.getData(zkw, region.getEncodedName()); if (curData.getEventType() != data.getEventType()) { // state changed, need to process - return false; + return -1; } } } - return true; + return stat.getVersion() + 1; } /** @@ -673,6 +742,19 @@ "the node existed but was version " + stat.getVersion() + " not the expected version " + expectedVersion)); return -1; + }// the below check ensure that double assignment doesnot happen. + // When the node is created for the first time then the expected version + // that is + // passed will be -1 and the version in znode will be 0. + // In all other cases the version in znode will be > 0. + else if (beginState.equals(EventType.M_ZK_REGION_OFFLINE) + && endState.equals(EventType.RS_ZK_REGION_OPENING) + && expectedVersion == -1 && stat.getVersion() != 0) { + LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for " + + encoded + " from " + beginState + " to " + endState + " failed, " + + "the node existed but was version " + stat.getVersion() + + " not the expected version " + expectedVersion)); + return -1; } // Verify it is in expected state