Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 1158263) +++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy) @@ -340,6 +340,23 @@ public RegionOpeningState openRegion(final HRegionInfo region) throws IOException; /** + * Opens the specified region. + * + * @param region + * region to open + * @param expectedVersion + * the version to compare while creating the OFFLINE node. Passed by + * the master + * @return RegionOpeningState OPENED - if region opened succesfully. + * ALREADY_OPENED - if the region was already opened. FAILED_OPENING - + * if region opening failed. + * + * @throws IOException + */ + public RegionOpeningState openRegion(HRegionInfo region, int expectedVersion) + throws IOException; + + /** * Opens the specified regions. * @param regions regions to open * @throws IOException @@ -487,4 +504,6 @@ final byte[] family, final byte[] qualifier, final CompareOp compareOp, final WritableByteArrayComparable comparator, final Delete delete) throws IOException; + + } Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1158263) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -143,6 +143,8 @@ new TreeMap(); private final ExecutorService executorService; + + private java.util.concurrent.ExecutorService threadPoolExecutorService; /** * Constructs a new assignment manager. @@ -155,7 +157,8 @@ * @throws IOException */ public AssignmentManager(Server master, ServerManager serverManager, - CatalogTracker catalogTracker, final ExecutorService service) + CatalogTracker catalogTracker, final ExecutorService service, + final java.util.concurrent.ExecutorService threadPoolExecutorService) throws KeeperException, IOException { super(master.getZooKeeper()); this.master = master; @@ -172,6 +175,7 @@ this.zkTable = new ZKTable(this.master.getZooKeeper()); this.maximumAssignmentAttempts = this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); + this.threadPoolExecutorService = threadPoolExecutorService; } /** @@ -390,14 +394,19 @@ break; case RS_ZK_REGION_OPENING: - // TODO: Could check if it was on deadServers. If it was, then we could + // TODO: Could check if it was on deadServers. If it was, then we could // do what happens in TimeoutMonitor when it sees this condition. // Just insert region into RIT // If this never updates the timeout will trigger new assignment - regionsInTransition.put(encodedRegionName, new RegionState( - regionInfo, RegionState.State.OPENING, - data.getStamp(), data.getOrigin())); + if (regionInfo.isMetaRegion() || regionInfo.isRootRegion()) { + regionsInTransition.put(encodedRegionName, new RegionState(regionInfo, + RegionState.State.OPENING, data.getStamp(), data.getOrigin())); + processOpeningState(regionInfo); + break; + } + regionsInTransition.put(encodedRegionName, new RegionState(regionInfo, + RegionState.State.OPENING, data.getStamp(), data.getOrigin())); break; case RS_ZK_REGION_OPENED: @@ -425,6 +434,38 @@ } } } + + private void processOpeningState(HRegionInfo regionInfo) { + LOG.info("Region has been OPENING for too " + "long, reassigning region=" + + regionInfo.getRegionNameAsString()); + // Should have a ZK node in OPENING state + try { + String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()); + Stat stat = new Stat(); + RegionTransitionData dataInZNode = ZKAssign.getDataNoWatch(watcher, node, + stat); + if (dataInZNode == null) { + LOG.warn("Data is null, node " + node + " no longer exists"); + return; + } + if (dataInZNode.getEventType() == EventType.RS_ZK_REGION_OPENED) { + LOG.debug("Region has transitioned to OPENED, allowing " + + "watched event handlers to process"); + return; + } else if (dataInZNode.getEventType() != EventType.RS_ZK_REGION_OPENING) { + LOG.warn("While timing out a region in state OPENING, " + + "found ZK node in unexpected state: " + + dataInZNode.getEventType()); + return; + } + // do not make the znode to OFFLINE. The timeoutManager will do it + invokeTimeOutManager(regionInfo, TimeOutOperationType.ASSIGN); + } catch (KeeperException ke) { + LOG.error("Unexpected ZK exception timing out CLOSING region", ke); + return; + } + return; + } /** * Put the region hri into an offline state up in zk. @@ -1014,24 +1055,48 @@ assign(region, setOfflineInZK, false); } + /** + * Used in normal flow to assign a region + * @param region + * @param setOfflineInZK + * @param forceNewPlan + */ public void assign(HRegionInfo region, boolean setOfflineInZK, boolean forceNewPlan) { + assign(region, setOfflineInZK, forceNewPlan, false); + } + + /** + * Used to assign a new region. If invoked from TimeOutManagerCallable + * 'invokedFromTimeOutMonitor' will be true + * @param region + * @param setOfflineInZK + * @param forceNewPlan + * @param isReAllocate + */ + public void assign(HRegionInfo region, boolean setOfflineInZK, + boolean forceNewPlan, boolean isReAllocate) { String tableName = region.getTableNameAsString(); boolean disabled = this.zkTable.isDisabledTable(tableName); if (disabled || this.zkTable.isDisablingTable(tableName)) { - LOG.info("Table " + tableName + (disabled? " disabled;": " disabling;") + - " skipping assign of " + region.getRegionNameAsString()); + LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") + + " skipping assign of " + region.getRegionNameAsString()); offlineDisabledRegion(region); return; } if (this.serverManager.isClusterShutdown()) { - LOG.info("Cluster shutdown is set; skipping assign of " + - region.getRegionNameAsString()); + LOG.info("Cluster shutdown is set; skipping assign of " + + region.getRegionNameAsString()); return; } - RegionState state = addToRegionsInTransition(region); + RegionState state = addToRegionsInTransition(region, + isReAllocate); synchronized (state) { - assign(state, setOfflineInZK, forceNewPlan); + if (!isReAllocate) { + assign(state, setOfflineInZK, forceNewPlan); + } else { + assign(state, setOfflineInZK, forceNewPlan, isReAllocate); + } } } @@ -1189,66 +1254,119 @@ * @return The current RegionState */ private RegionState addToRegionsInTransition(final HRegionInfo region) { + return addToRegionsInTransition(region, false); + } + + /** + * @param region + * @param isReAllocate + * @return The current RegionState + */ + private RegionState addToRegionsInTransition(final HRegionInfo region, + boolean isReAllocate) { synchronized (regionsInTransition) { - return forceRegionStateToOffline(region); + return forceRegionStateToOffline(region, isReAllocate); } } /** * Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}. * Caller must hold lock on this.regionsInTransition. + * * @param region + * @param invokedFromTimeOutMonitor * @return Amended RegionState. */ private RegionState forceRegionStateToOffline(final HRegionInfo region) { + return forceRegionStateToOffline(region, false); + } + + /** + * Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}. + * Caller must hold lock on this.regionsInTransition. + * + * @param region + * @param isReAllocate + * @return Amended RegionState. + */ + private RegionState forceRegionStateToOffline(final HRegionInfo region, + boolean isReAllocate) { String encodedName = region.getEncodedName(); RegionState state = this.regionsInTransition.get(encodedName); if (state == null) { state = new RegionState(region, RegionState.State.OFFLINE); this.regionsInTransition.put(encodedName, state); } else { - LOG.debug("Forcing OFFLINE; was=" + state); - state.update(RegionState.State.OFFLINE); + // If invoked from timeout monitor donot force it to OFFLINE. Based on the + // state + // we will decide if to change in-memory state to OFFLINE or not. + if (!isReAllocate) { + LOG.debug("Forcing OFFLINE; was=" + state); + state.update(RegionState.State.OFFLINE); + } } return state; } /** * Caller must hold lock on the passed state object. + * * @param state * @param setOfflineInZK * @param forceNewPlan */ private void assign(final RegionState state, final boolean setOfflineInZK, final boolean forceNewPlan) { + assign(state, setOfflineInZK, forceNewPlan, false); + } + + /** + * Caller must hold lock on the passed state object. + * @param state + * @param setOfflineInZK + * @param forceNewPlan + */ + private void assign(final RegionState state, final boolean setOfflineInZK, + final boolean forceNewPlan, boolean isReAllocate) { for (int i = 0; i < this.maximumAssignmentAttempts; i++) { - if (setOfflineInZK && !setOfflineInZooKeeper(state)) return; + int versionOfOfflineNode = -1; + if (setOfflineInZK) { + // get the version of the znode after setting it to OFFLINE. + // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE + versionOfOfflineNode = setOfflineInZooKeeper(state, + isReAllocate); + } + if (setOfflineInZK && versionOfOfflineNode == -1) + return; if (this.master.isStopped()) { LOG.debug("Server stopped; skipping assign of " + state); return; } RegionPlan plan = getRegionPlan(state, forceNewPlan); - if (plan == null) return; // Should get reassigned later when RIT times out. + if (plan == null) + return; // Should get reassigned later when RIT times out. try { - LOG.debug("Assigning region " + state.getRegion().getRegionNameAsString() + - " to " + plan.getDestination().toString()); + LOG.debug("Assigning region " + + state.getRegion().getRegionNameAsString() + " to " + + plan.getDestination().toString()); // Transition RegionState to PENDING_OPEN - state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(), - plan.getDestination()); + state.update(RegionState.State.PENDING_OPEN, + System.currentTimeMillis(), plan.getDestination()); // Send OPEN RPC. This can fail if the server on other end is is not up. RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan - .getDestination(), state.getRegion()); + .getDestination(), state.getRegion(), versionOfOfflineNode); if (regionOpenState == RegionOpeningState.ALREADY_OPENED) { // Remove region from in-memory transition and unassigned node from ZK // While trying to enable the table the regions of the table were // already enabled. - String encodedRegionName = state.getRegion() - .getEncodedName(); + String encodedRegionName = state.getRegion().getEncodedName(); try { - ZKAssign.deleteOfflineNode(master.getZooKeeper(), encodedRegionName); + ZKAssign + .deleteOfflineNode(master.getZooKeeper(), encodedRegionName); } catch (KeeperException.NoNodeException e) { - if(LOG.isDebugEnabled()){ - LOG.debug("The unassigned node "+encodedRegionName+" doesnot exist."); + if (LOG.isDebugEnabled()) { + LOG.debug("The unassigned node " + encodedRegionName + + " doesnot exist."); } } catch (KeeperException e) { master.abort( @@ -1265,18 +1383,18 @@ } break; } catch (Throwable t) { - LOG.warn("Failed assignment of " + - state.getRegion().getRegionNameAsString() + " to " + - plan.getDestination() + ", trying to assign elsewhere instead; " + - "retry=" + i, t); + LOG.warn("Failed assignment of " + + state.getRegion().getRegionNameAsString() + " to " + + plan.getDestination() + ", trying to assign elsewhere instead; " + + "retry=" + i, t); // Clean out plan we failed execute and one that doesn't look like it'll // succeed anyways; we need a new plan! // Transition back to OFFLINE state.update(RegionState.State.OFFLINE); - // Force a new plan and reassign. Will return null if no servers. + // Force a new plan and reassign. Will return null if no servers. if (getRegionPlan(state, plan.getDestination(), true) == null) { - LOG.warn("Unable to find a viable location to assign region " + - state.getRegion().getRegionNameAsString()); + LOG.warn("Unable to find a viable location to assign region " + + state.getRegion().getRegionNameAsString()); return; } } @@ -1285,29 +1403,56 @@ /** * Set region as OFFLINED up in zookeeper + * * @param state - * @return True if we succeeded, false otherwise (State was incorrect or failed - * updating zk). + * @return Returns the version if we succeeded, -1 otherwise (State was + * incorrect or failed updating zk). */ - boolean setOfflineInZooKeeper(final RegionState state) { - if (!state.isClosed() && !state.isOffline()) { + int setOfflineInZooKeeper(final RegionState state, final boolean isReAllocate) { + // If invoked from timeoutmonitor the current state in memory need not be + // OFFLINE + if (!isReAllocate && !state.isClosed() && !state.isOffline()) { this.master.abort("Unexpected state trying to OFFLINE; " + state, - new IllegalStateException()); - return false; + new IllegalStateException()); + return -1; } - state.update(RegionState.State.OFFLINE); + boolean allowCreation = false; + // If the isReAllocate is true and the current state is PENDING_OPEN + // or OPENING then update the inmemory state to PENDING_OPEN. This is + // important because + // if timeoutmonitor deducts that a region was in OPENING state for a long + // time but by the + // time timeout monitor tranits the node to OFFLINE the RS would have opened + // the node and the + // state in znode will be RS_ZK_REGION_OPENED. Inorder to invoke the + // OpenedRegionHandler + // we expect the inmemeory state to be PENDING_OPEN or OPENING. + // For all other cases we can change the inmemory state to OFFLINE. + if (isReAllocate + && (state.getState().equals(RegionState.State.PENDING_OPEN) || state + .getState().equals(RegionState.State.OPENING))) { + state.update(RegionState.State.PENDING_OPEN); + allowCreation = false; + } else { + state.update(RegionState.State.OFFLINE); + allowCreation = true; + } + int versionOfOfflineNode = 0; try { - if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(), - state.getRegion(), this.master.getServerName())) { - LOG.warn("Attempted to create/force node into OFFLINE state before " + - "completing assignment but failed to do so for " + state); - return false; + //get the version after setting the znode to OFFLINE + versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(master + .getZooKeeper(), state.getRegion(), this.master.getServerName(), + isReAllocate, allowCreation); + if (versionOfOfflineNode == -1) { + LOG.warn("Attempted to create/force node into OFFLINE state before " + + "completing assignment but failed to do so for " + state); + return -1; } } catch (KeeperException e) { master.abort("Unexpected ZK exception creating/setting node OFFLINE", e); - return false; + return -1; } - return true; + return versionOfOfflineNode; } /** @@ -2131,127 +2276,96 @@ long now = System.currentTimeMillis(); for (RegionState regionState : regionsInTransition.values()) { if (regionState.getStamp() + timeout <= now) { - HRegionInfo regionInfo = regionState.getRegion(); - LOG.info("Regions in transition timed out: " + regionState); - // Expired! Do a retry. - switch (regionState.getState()) { - case CLOSED: - LOG.info("Region " + regionInfo.getEncodedName() + - " has been CLOSED for too long, waiting on queued " + - "ClosedRegionHandler to run or server shutdown"); - // Update our timestamp. - regionState.updateTimestampToNow(); - break; - case OFFLINE: - LOG.info("Region has been OFFLINE for too long, " + - "reassigning " + regionInfo.getRegionNameAsString() + - " to a random server"); - assigns.put(regionState.getRegion(), Boolean.FALSE); - break; - case PENDING_OPEN: - LOG.info("Region has been PENDING_OPEN for too " + - "long, reassigning region=" + - regionInfo.getRegionNameAsString()); - assigns.put(regionState.getRegion(), Boolean.TRUE); - break; - case OPENING: - LOG.info("Region has been OPENING for too " + - "long, reassigning region=" + - regionInfo.getRegionNameAsString()); - // Should have a ZK node in OPENING state - try { - String node = ZKAssign.getNodeName(watcher, - regionInfo.getEncodedName()); - Stat stat = new Stat(); - RegionTransitionData data = ZKAssign.getDataNoWatch(watcher, - node, stat); - if (data == null) { - LOG.warn("Data is null, node " + node + " no longer exists"); - break; - } - if (data.getEventType() == EventType.RS_ZK_REGION_OPENED) { - LOG.debug("Region has transitioned to OPENED, allowing " + - "watched event handlers to process"); - break; - } else if (data.getEventType() != - EventType.RS_ZK_REGION_OPENING) { - LOG.warn("While timing out a region in state OPENING, " + - "found ZK node in unexpected state: " + - data.getEventType()); - break; - } - // Attempt to transition node into OFFLINE - try { - data = new RegionTransitionData( - EventType.M_ZK_REGION_OFFLINE, regionInfo.getRegionName(), - master.getServerName()); - if (ZKUtil.setData(watcher, node, data.getBytes(), - stat.getVersion())) { - // Node is now OFFLINE, let's trigger another assignment - ZKUtil.getDataAndWatch(watcher, node); // re-set the watch - LOG.info("Successfully transitioned region=" + - regionInfo.getRegionNameAsString() + " into OFFLINE" + - " and forcing a new assignment"); - assigns.put(regionState.getRegion(), Boolean.TRUE); - } - } catch (KeeperException.NoNodeException nne) { - // Node did not exist, can't time this out - } - } catch (KeeperException ke) { - LOG.error("Unexpected ZK exception timing out CLOSING region", - ke); - break; - } - break; - case OPEN: - LOG.error("Region has been OPEN for too long, " + - "we don't know where region was opened so can't do anything"); - synchronized(regionState) { - regionState.updateTimestampToNow(); - } - break; + //decide on action upon timeout + actOnTimeOut(unassigns, assigns, regionState); + } + } + } + } - case PENDING_CLOSE: - LOG.info("Region has been PENDING_CLOSE for too " + - "long, running forced unassign again on region=" + - regionInfo.getRegionNameAsString()); - try { - // If the server got the RPC, it will transition the node - // to CLOSING, so only do something here if no node exists - if (!ZKUtil.watchAndCheckExists(watcher, - ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) { - // Queue running of an unassign -- do actual unassign - // outside of the regionsInTransition lock. - unassigns.add(regionInfo); - } - } catch (NoNodeException e) { - LOG.debug("Node no longer existed so not forcing another " + - "unassignment"); - } catch (KeeperException e) { - LOG.warn("Unexpected ZK exception timing out a region " + - "close", e); - } - break; - case CLOSING: - LOG.info("Region has been CLOSING for too " + - "long, this should eventually complete or the server will " + - "expire, doing nothing"); - break; - } + private void actOnTimeOut(List unassigns, + Map assigns, RegionState regionState) { + HRegionInfo regionInfo = regionState.getRegion(); + LOG.info("Regions in transition timed out: " + regionState); + // Expired! Do a retry. + switch (regionState.getState()) { + case CLOSED: + LOG.info("Region " + regionInfo.getEncodedName() + + " has been CLOSED for too long, waiting on queued " + + "ClosedRegionHandler to run or server shutdown"); + // Update our timestamp. + regionState.updateTimestampToNow(); + break; + case OFFLINE: + LOG.info("Region has been OFFLINE for too long, " + "reassigning " + + regionInfo.getRegionNameAsString() + " to a random server"); + invokeTimeOutManager(regionState.getRegion(), + TimeOutOperationType.ASSIGN); + break; + case PENDING_OPEN: + LOG.info("Region has been PENDING_OPEN for too " + + "long, reassigning region=" + regionInfo.getRegionNameAsString()); + invokeTimeOutManager(regionState.getRegion(), + TimeOutOperationType.ASSIGN); + break; + case OPENING: + processOpeningState(regionInfo); + break; + case OPEN: + LOG.error("Region has been OPEN for too long, " + + "we don't know where region was opened so can't do anything"); + synchronized (regionState) { + regionState.updateTimestampToNow(); + } + break; + + case PENDING_CLOSE: + LOG.info("Region has been PENDING_CLOSE for too " + + "long, running forced unassign again on region=" + + regionInfo.getRegionNameAsString()); + try { + // If the server got the RPC, it will transition the node + // to CLOSING, so only do something here if no node exists + if (!ZKUtil.watchAndCheckExists(watcher, ZKAssign.getNodeName( + watcher, regionInfo.getEncodedName()))) { + // Queue running of an unassign -- do actual unassign + // outside of the regionsInTransition lock. + // unassigns.add(regionInfo); + invokeTimeOutManager(regionState.getRegion(), + TimeOutOperationType.UNASSIGN); } + } catch (NoNodeException e) { + LOG.debug("Node no longer existed so not forcing another " + + "unassignment"); + } catch (KeeperException e) { + LOG.warn("Unexpected ZK exception timing out a region " + "close", e); } + break; + case CLOSING: + LOG.info("Region has been CLOSING for too " + + "long, this should eventually complete or the server will " + + "expire, doing nothing"); + break; } - // Finish the work for regions in PENDING_CLOSE state - for (HRegionInfo hri: unassigns) { - unassign(hri, true); - } - for (Map.Entry e: assigns.entrySet()){ - assign(e.getKey(), false, e.getValue()); - } } } + /** + * The type of operation that has to performed on TimeOut ASSIGN - need to + * assign a region to an RS UNASSIGN - need to unassign a region + */ + public static enum TimeOutOperationType { + ASSIGN, UNASSIGN; + } + + private void invokeTimeOutManager(HRegionInfo hri, + TimeOutOperationType operation) { + TimeOutManagerCallable timeOutManager = new TimeOutManagerCallable(this, + hri, operation); + threadPoolExecutorService.submit(timeOutManager); + } + /** * Process shutdown server removing any assignments. * @param sn Server that went down. * @return list of regions in transition on this server Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1158263) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; @@ -181,6 +182,8 @@ private final ServerName serverName; private TableDescriptors tableDescriptors; + + private java.util.concurrent.ExecutorService threadPoolExecutorService; /** * Initializes the HMaster. The steps are as follows: @@ -352,8 +355,9 @@ this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); this.catalogTracker.start(); + threadPoolExecutorService = Executors.newCachedThreadPool(); this.assignmentManager = new AssignmentManager(this, serverManager, - this.catalogTracker, this.executorService); + this.catalogTracker, this.executorService, threadPoolExecutorService); this.balancer = new LoadBalancer(conf); zooKeeper.registerListenerFirst(assignmentManager); Index: src/main/java/org/apache/hadoop/hbase/master/ServerManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 1158263) +++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (working copy) @@ -390,18 +390,27 @@ *

* Open should not fail but can if server just crashed. *

- * @param server server to open a region - * @param region region to open + * + * @param server + * server to open a region + * @param region + * region to open + * @param expectedVersion + * version of znode sent from the master */ - public RegionOpeningState sendRegionOpen(final ServerName server, HRegionInfo region) - throws IOException { + public RegionOpeningState sendRegionOpen(final ServerName server, + HRegionInfo region, int expectedVersion) throws IOException { HRegionInterface hri = getServerConnection(server); if (hri == null) { - LOG.warn("Attempting to send OPEN RPC to server " + server.toString() + - " failed because no RPC connection found to this server"); + LOG.warn("Attempting to send OPEN RPC to server " + server.toString() + + " failed because no RPC connection found to this server"); return RegionOpeningState.FAILED_OPENING; } - return hri.openRegion(region); + if (expectedVersion == -1) { + return hri.openRegion(region); + } else { + return hri.openRegion(region, expectedVersion); + } } /** Index: src/main/java/org/apache/hadoop/hbase/master/TimeOutManagerCallable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/TimeOutManagerCallable.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/master/TimeOutManagerCallable.java (revision 0) @@ -0,0 +1,54 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.concurrent.Callable; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.master.AssignmentManager.RegionState; +import org.apache.hadoop.hbase.master.AssignmentManager.TimeOutOperationType; + +public class TimeOutManagerCallable implements Callable { + + private AssignmentManager assignmentManager; + + private HRegionInfo hri; + + private TimeOutOperationType operation; + + public TimeOutManagerCallable(AssignmentManager assignmentManager, + HRegionInfo hri, TimeOutOperationType operation) { + this.assignmentManager = assignmentManager; + this.hri = hri; + this.operation = operation; + } + + @Override + public Object call() throws Exception { + if (TimeOutOperationType.ASSIGN.equals(operation)) { + assignmentManager.assign(hri, true, true, true); + } else { + assignmentManager.unassign(hri); + } + return null; + } + +} + Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java (revision 1158263) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java (working copy) @@ -33,6 +33,13 @@ public OpenMetaHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, final HTableDescriptor htd) { - super(server,rsServices, regionInfo, htd, EventType.M_RS_OPEN_META); + super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META, -1); } + + public OpenMetaHandler(final Server server, + final RegionServerServices rsServices, HRegionInfo regionInfo, + final HTableDescriptor htd, int version) { + super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META, + version); + } } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (revision 1158263) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (working copy) @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.CancelableProgressable; @@ -46,26 +47,36 @@ private final RegionServerServices rsServices; private final HRegionInfo regionInfo; - private final HTableDescriptor htd; + private final HTableDescriptor htd; // We get version of our znode at start of open process and monitor it across // the total open. We'll fail the open if someone hijacks our znode; we can // tell this has happened if version is not as expected. private volatile int version = -1; + + private volatile int versionOfOfflineNode = -1; public OpenRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, HTableDescriptor htd) { - this (server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION); + this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, -1); } + public OpenRegionHandler(final Server server, + final RegionServerServices rsServices, HRegionInfo regionInfo, + HTableDescriptor htd, int versionOfOfflineNode) { + this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, + versionOfOfflineNode); + } + protected OpenRegionHandler(final Server server, final RegionServerServices rsServices, final HRegionInfo regionInfo, - final HTableDescriptor htd, EventType eventType) { + final HTableDescriptor htd, EventType eventType, final int versionOfOfflineNode) { super(server, eventType); this.rsServices = rsServices; this.regionInfo = regionInfo; this.htd = htd; + this.versionOfOfflineNode = versionOfOfflineNode; } public HRegionInfo getRegionInfo() { @@ -86,9 +97,10 @@ // If fails, just return. Someone stole the region from under us. // Calling transitionZookeeperOfflineToOpening initalizes this.version. - if (!transitionZookeeperOfflineToOpening(encodedName)) { - LOG.warn("Region was hijacked? It no longer exists, encodedName=" + - encodedName); + if (!transitionZookeeperOfflineToOpening(encodedName, + versionOfOfflineNode)) { + LOG.warn("Region was hijacked? It no longer exists, encodedName=" + + encodedName); return; } @@ -325,18 +337,19 @@ * Transition ZK node from OFFLINE to OPENING. * @param encodedName Name of the znode file (Region encodedName is the znode * name). + * @param versionOfOfflineNode * @return True if successful transition. */ - boolean transitionZookeeperOfflineToOpening(final String encodedName) { + boolean transitionZookeeperOfflineToOpening(final String encodedName, + int versionOfOfflineNode) { // TODO: should also handle transition from CLOSED? try { - // Initialize the znode version. - this.version = - ZKAssign.transitionNodeOpening(server.getZooKeeper(), - regionInfo, server.getServerName()); + this.version = ZKAssign.transitionNode(server.getZooKeeper(), regionInfo, + server.getServerName(), EventType.M_ZK_REGION_OFFLINE, + EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode); } catch (KeeperException e) { - LOG.error("Error transition from OFFLINE to OPENING for region=" + - encodedName, e); + LOG.error("Error transition from OFFLINE to OPENING for region=" + + encodedName, e); } boolean b = isGoodVersion(); if (!b) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java (revision 1158263) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java (working copy) @@ -33,6 +33,13 @@ public OpenRootHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo, final HTableDescriptor htd) { - super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT); + super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT, -1); } + + public OpenRootHandler(final Server server, + final RegionServerServices rsServices, HRegionInfo regionInfo, + final HTableDescriptor htd, int version) { + super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT, + version); + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1158263) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -3024,4 +3024,35 @@ new HRegionServerCommandLine(regionServerClass).doMain(args); } + + @Override + public RegionOpeningState openRegion(HRegionInfo region, int expectedVersion) + throws IOException { + checkOpen(); + if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) { + throw new RegionAlreadyInTransitionException("open", region.getEncodedName()); + } + HRegion onlineRegion = this.getFromOnlineRegions(region.getEncodedName()); + if (null != onlineRegion) { + LOG.warn("Attempted open of " + region.getEncodedName() + + " but already online on this server"); + return RegionOpeningState.ALREADY_OPENED; + } + LOG.info("Received request to open region: " + + region.getRegionNameAsString()); + this.regionsInTransitionInRS.add(region.getEncodedNameAsBytes()); + HTableDescriptor htd = this.tableDescriptors.get(region.getTableName()); + //Need to pass the expected version in the constructor. + if (region.isRootRegion()) { + this.service.submit(new OpenRootHandler(this, this, region, htd, + expectedVersion)); + } else if (region.isMetaRegion()) { + this.service.submit(new OpenMetaHandler(this, this, region, htd, + expectedVersion)); + } else { + this.service.submit(new OpenRegionHandler(this, this, region, htd, + expectedVersion)); + } + return RegionOpeningState.OPENED; + } } Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (revision 1158263) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (working copy) @@ -203,7 +203,6 @@ ZKUtil.setData(zkw, node, data.getBytes()); } - /** * Creates or force updates an unassigned node to the OFFLINE state for the * specified region. @@ -222,8 +221,34 @@ * @throws KeeperException if unexpected zookeeper exception * @throws KeeperException.NodeExistsException if node already exists */ - public static boolean createOrForceNodeOffline(ZooKeeperWatcher zkw, - HRegionInfo region, ServerName serverName) + public static int createOrForceNodeOffline(ZooKeeperWatcher zkw, + HRegionInfo region, ServerName serverName) throws KeeperException { + return createOrForceNodeOffline(zkw, region, serverName, false, true); + } + + /** + * Creates or force updates an unassigned node to the OFFLINE state for the + * specified region. + *

+ * Attempts to create the node but if it exists will force it to transition to + * and OFFLINE state. + * + *

Sets a watcher on the unassigned region node if the method is + * successful. + * + *

This method should be used when assigning a region. + *

returns the version of the znode after transiting it to OFFLINE state. + * Any exceptions while setting will return -1. + * + * @param zkw zk reference + * @param region region to be created as offline + * @param serverName server event originates from + * @throws KeeperException if unexpected zookeeper exception + * @throws KeeperException.NodeExistsException if node already exists + */ + public static int createOrForceNodeOffline(ZooKeeperWatcher zkw, + HRegionInfo region, ServerName serverName, boolean isReAllocate, + boolean allowCreation) throws KeeperException { LOG.debug(zkw.prefix("Creating (or updating) unassigned node for " + region.getEncodedName() + " with OFFLINE state")); @@ -231,24 +256,61 @@ EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName); String node = getNodeName(zkw, region.getEncodedName()); zkw.sync(node); + Stat stat = new Stat(); int version = ZKUtil.checkExists(zkw, node); if (version == -1) { - ZKUtil.createAndWatch(zkw, node, data.getBytes()); + // If timeoutmonitor deducts a node to be in OPENING state but before it + // could + // transit to OFFLINE state if RS had opened the region then the Master + // deletes the + // assigned region znode. In that case the znode will not exist. So we + // should not + // create the znode again which will lead to double assignment. + if (isReAllocate && !allowCreation) { + return -1; + } + return ZKUtil.createAndWatch(zkw, node, data.getBytes()); } else { - if (!ZKUtil.setData(zkw, node, data.getBytes(), version)) { - return false; + RegionTransitionData curDataInZNode = ZKAssign.getDataNoWatch(zkw, region + .getEncodedName(), stat); + // Do not move the node to OFFLINE if znode is in any of the following + // state. + // Because these are already executed states. + if (isReAllocate && null != curDataInZNode) { + EventType eventType = curDataInZNode.getEventType(); + if (eventType.equals(EventType.RS_ZK_REGION_CLOSING) + || eventType.equals(EventType.RS_ZK_REGION_CLOSED) + || eventType.equals(EventType.RS_ZK_REGION_OPENED)) { + return -1; + } + } + + boolean setData = false; + try { + setData = ZKUtil.setData(zkw, node, data.getBytes(), version); + // Setdata throws KeeperException which aborts the Master. So we are + // catching it here. + // If just before setting the znode to OFFLINE if the RS has made any + // change to the + // znode state then we need to return -1. + } catch (KeeperException kpe) { + LOG.info("Version mismatch while setting the node to OFFLINE state."); + return -1; + } + if (!setData) { + return -1; } else { // We successfully forced to OFFLINE, reset watch and handle if // the state changed in between our set and the watch RegionTransitionData curData = - ZKAssign.getData(zkw, region.getEncodedName()); + ZKAssign.getDataNoWatch(zkw, region.getEncodedName(), stat); if (curData.getEventType() != data.getEventType()) { // state changed, need to process - return false; + return -1; } } } - return true; + return stat.getVersion(); } /** @@ -665,15 +727,29 @@ RegionTransitionData existingData = RegionTransitionData.fromBytes(existingBytes); + // Verify it is the expected version - if(expectedVersion != -1 && stat.getVersion() != expectedVersion) { - LOG.warn(zkw.prefix("Attempt to transition the " + - "unassigned node for " + encoded + - " from " + beginState + " to " + endState + " failed, " + - "the node existed but was version " + stat.getVersion() + - " not the expected version " + expectedVersion)); - return -1; + if (expectedVersion != -1 && stat.getVersion() != expectedVersion) { + LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for " + + encoded + " from " + beginState + " to " + endState + " failed, " + + "the node existed but was version " + stat.getVersion() + + " not the expected version " + expectedVersion)); + return -1; + }// the below check ensure that double assignment doesnot happen. + // When the node is created for the first time then the expected version + // that is + // passed will be -1 and the version in znode will be 0. + // In all other cases the version in znode will be > 0. + else if (beginState.equals(EventType.M_ZK_REGION_OFFLINE) + && endState.equals(EventType.RS_ZK_REGION_OPENING) + && expectedVersion == -1 && stat.getVersion() != 0) { + LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for " + + encoded + " from " + beginState + " to " + endState + " failed, " + + "the node existed but was version " + stat.getVersion() + + " not the expected version " + expectedVersion)); + return -1; } + // Verify it is in expected state if(!existingData.getEventType().equals(beginState)) {