Index: src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (revision 1172608) +++ src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (working copy) @@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.filter.WhileMatchFilter; import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.util.Bytes; @@ -203,6 +204,7 @@ addToMap(Increment.class, code++); addToMap(KeyOnlyFilter.class, code++); + addToMap(RegionOpeningState.class, code++); } Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 1172608) +++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy) @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.ipc.RemoteException; @@ -311,10 +312,37 @@ /** * Opens the specified region. - * @param region region to open + * This overload carries no znode version; use + * {@link #openRegion(HRegionInfo, int)} when the OFFLINE znode version + * must be passed to the region server. + * + * @param region + * region to open + * @return RegionOpeningState + * OPENED - if region open request was successful. + * ALREADY_OPENED - if the region was already opened. + * FAILED_OPENING - if region opening failed. 
* @throws IOException */ - public void openRegion(final HRegionInfo region) throws IOException; + public RegionOpeningState openRegion(final HRegionInfo region) throws IOException; + + /** + * Opens the specified region. + * + * @param region + * region to open + * @param versionOfOfflineNode + * the version of znode to compare when RS transitions the znode from + * OFFLINE state. + * @return RegionOpeningState + * OPENED - if region open request was successful. + * ALREADY_OPENED - if the region was already opened. + * FAILED_OPENING - if region opening failed. + * @throws IOException + */ + public RegionOpeningState openRegion(HRegionInfo region, + int versionOfOfflineNode) throws IOException; + /** * Opens the specified regions. @@ -405,4 +433,6 @@ * @throws IOException */ public void replicateLogEntries(HLog.Entry[] entries) throws IOException; + + } Index: src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java (revision 0) @@ -0,0 +1,47 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.concurrent.Callable; + +import org.apache.hadoop.hbase.HRegionInfo; + +/** + * Callable that reassigns a region in transition by invoking + * {@code assignmentManager.assign(hri, true, true, true)}. + * Being a Callable, it can be handed to an executor so that a timed-out + * region is reassigned asynchronously. + */ +public class AssignCallable implements Callable { + private AssignmentManager assignmentManager; + + private HRegionInfo hri; + + public AssignCallable(AssignmentManager assignmentManager, HRegionInfo hri) { + this.assignmentManager = assignmentManager; + this.hri = hri; + } + + @Override + public Object call() throws Exception { + assignmentManager.assign(hri, true, true, true); + return null; + } +} Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1172608) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -36,6 +36,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -62,6 +63,7 @@ import org.apache.hadoop.hbase.master.handler.DisableTableHandler; import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler; import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler; +import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -138,6 +140,9 @@ new TreeMap(); private final ExecutorService executorService; + + // Thread pool executor 
service for timeout monitor + private java.util.concurrent.ExecutorService threadPoolExecutorService; /** * Constructs a new assignment manager. @@ -166,6 +171,7 @@ this.zkTable = new ZKTable(this.master.getZooKeeper()); this.maximumAssignmentAttempts = this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); + this.threadPoolExecutorService = Executors.newCachedThreadPool(); } /** @@ -358,8 +364,19 @@ // Just insert region into RIT // If this never updates the timeout will trigger new assignment - regionsInTransition.put(encodedRegionName, new RegionState( - regionInfo, RegionState.State.OPENING, data.getStamp())); + if (regionInfo.isMetaRegion() || regionInfo.isRootRegion()) { + regionsInTransition.put(encodedRegionName, new RegionState( + regionInfo, RegionState.State.OPENING, data.getStamp())); + // If ROOT or .META. table is waiting for timeout monitor to assign + // it may take lot of time when the assignment.timeout.period is + // the default value which may be very long. We will not be able + // to serve any request during this time. + // So we will assign the ROOT and .META. region immediately. 
+ processOpeningState(regionInfo); + break; + } + regionsInTransition.put(encodedRegionName, new RegionState(regionInfo, + RegionState.State.OPENING, data.getStamp())); break; case RS_ZK_REGION_OPENED: @@ -843,26 +860,40 @@ public void assign(HRegionInfo region, boolean setOfflineInZK) { assign(region, setOfflineInZK, false); } + public void assign(HRegionInfo region, boolean setOfflineInZK, + boolean forceNewPlan) { + assign(region, setOfflineInZK, forceNewPlan, false); + } + /** + * @param region + * @param setOfflineInZK + * @param forceNewPlan + * @param hijack - true new assignment is needed, false otherwise + */ public void assign(HRegionInfo region, boolean setOfflineInZK, - boolean forceNewPlan) { + boolean forceNewPlan, boolean hijack) { + // If hijack is true do not call disableRegionIfInRIT as + // we have not yet moved the znode to OFFLINE state. + if (!hijack && isDisabledorDisablingRegionInRIT(region)) { + return; + } + RegionState state = addToRegionsInTransition(region, hijack); + synchronized (state) { + assign(region, state, setOfflineInZK, forceNewPlan, hijack); + } + } + + private boolean isDisabledorDisablingRegionInRIT(HRegionInfo region) { String tableName = region.getTableDesc().getNameAsString(); boolean disabled = this.zkTable.isDisabledTable(tableName); if (disabled || this.zkTable.isDisablingTable(tableName)) { LOG.info("Table " + tableName + (disabled? 
" disabled;": " disabling;") + " skipping assign of " + region.getRegionNameAsString()); offlineDisabledRegion(region); - return; + return true; } - if (this.serverManager.isClusterShutdown()) { - LOG.info("Cluster shutdown is set; skipping assign of " + - region.getRegionNameAsString()); - return; - } - RegionState state = addToRegionsInTransition(region); - synchronized (state) { - assign(state, setOfflineInZK, forceNewPlan); - } + return false; } /** @@ -997,11 +1028,13 @@ /** * @param region - * @return + * @param hijack + * @return The current RegionState */ - private RegionState addToRegionsInTransition(final HRegionInfo region) { + private RegionState addToRegionsInTransition(final HRegionInfo region, + boolean hijack) { synchronized (regionsInTransition) { - return forceRegionStateToOffline(region); + return forceRegionStateToOffline(region, hijack); } } @@ -1012,14 +1045,31 @@ * @return Amended RegionState. */ private RegionState forceRegionStateToOffline(final HRegionInfo region) { + return forceRegionStateToOffline(region, false); + } + /** + * Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}. + * Caller must hold lock on this.regionsInTransition. + * @param region + * @param hijack + * @return Amended RegionState. + */ + private RegionState forceRegionStateToOffline(final HRegionInfo region, + boolean hijack) { String encodedName = region.getEncodedName(); RegionState state = this.regionsInTransition.get(encodedName); if (state == null) { state = new RegionState(region, RegionState.State.OFFLINE); this.regionsInTransition.put(encodedName, state); } else { - LOG.debug("Forcing OFFLINE; was=" + state); - state.update(RegionState.State.OFFLINE); + // If we are reassigning the node do not force in-memory state to OFFLINE. + // Based on the znode state we will decide if to change + // in-memory state to OFFLINE or not. It will + // be done before setting the znode to OFFLINE state. 
+ if (!hijack) { + LOG.debug("Forcing OFFLINE; was=" + state); + state.update(RegionState.State.OFFLINE); + } } return state; } @@ -1029,11 +1079,27 @@ * @param state * @param setOfflineInZK * @param forceNewPlan + * @param hijack */ - private void assign(final RegionState state, final boolean setOfflineInZK, - final boolean forceNewPlan) { + private void assign(final HRegionInfo region, final RegionState state, + final boolean setOfflineInZK, final boolean forceNewPlan, boolean hijack) { for (int i = 0; i < this.maximumAssignmentAttempts; i++) { - if (setOfflineInZK && !setOfflineInZooKeeper(state)) return; + int versionOfOfflineNode = -1; + if (setOfflineInZK) { + // get the version of the znode after setting it to OFFLINE. + // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE + versionOfOfflineNode = setOfflineInZooKeeper(state, hijack); + if (versionOfOfflineNode != -1) { + if (isDisabledorDisablingRegionInRIT(region)) { + return; + } + } + } + + if (setOfflineInZK && versionOfOfflineNode == -1) { + return; + } + if (this.master.isStopped()) { LOG.debug("Server stopped; skipping assign of " + state); return; @@ -1051,7 +1117,34 @@ // Transition RegionState to PENDING_OPEN state.update(RegionState.State.PENDING_OPEN); // Send OPEN RPC. This can fail if the server on other end is is not up. - serverManager.sendRegionOpen(plan.getDestination(), state.getRegion()); + // Pass the version that was obtained while setting the node to OFFLINE. + RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan + .getDestination(), state.getRegion(), versionOfOfflineNode); + if (regionOpenState == RegionOpeningState.ALREADY_OPENED) { + // Remove region from in-memory transition and unassigned node from ZK + // While trying to enable the table the regions of the table were + // already enabled. 
+ String encodedRegionName = state.getRegion() + .getEncodedName(); + try { + ZKAssign.deleteOfflineNode(master.getZooKeeper(), encodedRegionName); + } catch (KeeperException.NoNodeException e) { + if(LOG.isDebugEnabled()){ + LOG.debug("The unassigned node "+encodedRegionName+" doesnot exist."); + } + } catch (KeeperException e) { + master.abort( + "Error deleting OFFLINED node in ZK for transition ZK node (" + + encodedRegionName + ")", e); + } + synchronized (this.regionsInTransition) { + this.regionsInTransition.remove(plan.getRegionInfo() + .getEncodedName()); + } + synchronized (this.regions) { + this.regions.put(plan.getRegionInfo(), plan.getDestination()); + } + } break; } catch (Throwable t) { LOG.warn("Failed assignment of " + @@ -1084,29 +1177,53 @@ /** * Set region as OFFLINED up in zookeeper * @param state - * @return True if we succeeded, false otherwise (State was incorrect or failed - * updating zk). + * @param hijack + * - true if needs to be hijacked and reassigned, false otherwise. + * @return the version of the offline node if setting of the OFFLINE node was + * successful, -1 otherwise. */ - boolean setOfflineInZooKeeper(final RegionState state) { - if (!state.isClosed() && !state.isOffline()) { + int setOfflineInZooKeeper(final RegionState state, boolean hijack) { + // In case of reassignment the current state in memory need not be + // OFFLINE. + if (!hijack && !state.isClosed() && !state.isOffline()) { new RuntimeException("Unexpected state trying to OFFLINE; " + state); this.master.abort("Unexpected state trying to OFFLINE; " + state, new IllegalStateException()); - return false; + return -1; } - state.update(RegionState.State.OFFLINE); + boolean allowZNodeCreation = false; + // Under reassignment if the current state is PENDING_OPEN + // or OPENING then refresh the in-memory state to PENDING_OPEN. 
This is + // important because if the region was in + // RS_OPENING state for a long time the master will try to force the znode + // to OFFLINE state meanwhile the RS could have opened the corresponding + // region and the state in znode will be RS_ZK_REGION_OPENED. + // For all other cases we can change the in-memory state to OFFLINE. + if (hijack + && (state.getState().equals(RegionState.State.PENDING_OPEN) || state + .getState().equals(RegionState.State.OPENING))) { + state.update(RegionState.State.PENDING_OPEN); + allowZNodeCreation = false; + } else { + state.update(RegionState.State.OFFLINE); + allowZNodeCreation = true; + } + int versionOfOfflineNode = -1; try { - if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(), - state.getRegion(), master.getServerName())) { + // get the version after setting the znode to OFFLINE + versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(master + .getZooKeeper(), state.getRegion(), this.master.getServerName(), + hijack, allowZNodeCreation); + if (versionOfOfflineNode == -1) { LOG.warn("Attempted to create/force node into OFFLINE state before " + "completing assignment but failed to do so for " + state); - return false; + return -1; } } catch (KeeperException e) { master.abort("Unexpected ZK exception creating/setting node OFFLINE", e); - return false; + return -1; } - return true; + return versionOfOfflineNode; } /** @@ -1837,136 +1954,135 @@ protected void chore() { // If bulkAssign in progress, suspend checks if (this.bulkAssign) return; - List unassigns = new ArrayList(); - Map assigns = - new HashMap(); synchronized (regionsInTransition) { // Iterate all regions in transition checking for time outs long now = System.currentTimeMillis(); for (RegionState regionState : regionsInTransition.values()) { if (regionState.getStamp() + timeout <= now) { - HRegionInfo regionInfo = regionState.getRegion(); - LOG.info("Regions in transition timed out: " + regionState); - // Expired! Do a retry. 
- switch (regionState.getState()) { - case CLOSED: - LOG.info("Region " + regionInfo.getEncodedName() + - " has been CLOSED for too long, waiting on queued " + - "ClosedRegionHandler to run or server shutdown"); - // Update our timestamp. - synchronized(regionState) { - regionState.update(regionState.getState()); - } - break; - case OFFLINE: - LOG.info("Region has been OFFLINE for too long, " + - "reassigning " + regionInfo.getRegionNameAsString() + - " to a random server"); - assigns.put(regionState.getRegion(), Boolean.FALSE); - break; - case PENDING_OPEN: - LOG.info("Region has been PENDING_OPEN for too " + - "long, reassigning region=" + - regionInfo.getRegionNameAsString()); - assigns.put(regionState.getRegion(), Boolean.TRUE); - break; - case OPENING: - LOG.info("Region has been OPENING for too " + - "long, reassigning region=" + - regionInfo.getRegionNameAsString()); - // Should have a ZK node in OPENING state - try { - String node = ZKAssign.getNodeName(watcher, - regionInfo.getEncodedName()); - Stat stat = new Stat(); - RegionTransitionData data = ZKAssign.getDataNoWatch(watcher, - node, stat); - if (data == null) { - LOG.warn("Data is null, node " + node + " no longer exists"); - break; - } - if (data.getEventType() == EventType.RS_ZK_REGION_OPENED) { - LOG.debug("Region has transitioned to OPENED, allowing " + - "watched event handlers to process"); - break; - } else if (data.getEventType() != - EventType.RS_ZK_REGION_OPENING) { - LOG.warn("While timing out a region in state OPENING, " + - "found ZK node in unexpected state: " + - data.getEventType()); - break; - } - // Attempt to transition node into OFFLINE - try { - data = new RegionTransitionData( - EventType.M_ZK_REGION_OFFLINE, regionInfo.getRegionName(), - master.getServerName()); - if (ZKUtil.setData(watcher, node, data.getBytes(), - stat.getVersion())) { - // Node is now OFFLINE, let's trigger another assignment - ZKUtil.getDataAndWatch(watcher, node); // re-set the watch - 
LOG.info("Successfully transitioned region=" + - regionInfo.getRegionNameAsString() + " into OFFLINE" + - " and forcing a new assignment"); - assigns.put(regionState.getRegion(), Boolean.TRUE); - } - } catch (KeeperException.NoNodeException nne) { - // Node did not exist, can't time this out - } - } catch (KeeperException ke) { - LOG.error("Unexpected ZK exception timing out CLOSING region", - ke); - break; - } - break; - case OPEN: - LOG.error("Region has been OPEN for too long, " + - "we don't know where region was opened so can't do anything"); - synchronized(regionState) { - regionState.update(regionState.getState()); - } - break; - - case PENDING_CLOSE: - LOG.info("Region has been PENDING_CLOSE for too " + - "long, running forced unassign again on region=" + - regionInfo.getRegionNameAsString()); - try { - // If the server got the RPC, it will transition the node - // to CLOSING, so only do something here if no node exists - if (!ZKUtil.watchAndCheckExists(watcher, - ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) { - // Queue running of an unassign -- do actual unassign - // outside of the regionsInTransition lock. - unassigns.add(regionInfo); - } - } catch (NoNodeException e) { - LOG.debug("Node no longer existed so not forcing another " + - "unassignment"); - } catch (KeeperException e) { - LOG.warn("Unexpected ZK exception timing out a region " + - "close", e); - } - break; - case CLOSING: - LOG.info("Region has been CLOSING for too " + - "long, this should eventually complete or the server will " + - "expire, doing nothing"); - break; - } + actOnTimeOut(regionState); } } } - // Finish the work for regions in PENDING_CLOSE state - for (HRegionInfo hri: unassigns) { - unassign(hri, true); + } + + private void actOnTimeOut(RegionState regionState) { + HRegionInfo regionInfo = regionState.getRegion(); + LOG.info("Regions in transition timed out: " + regionState); + // Expired! Do a retry. 
+ switch (regionState.getState()) { + case CLOSED: + LOG.info("Region " + regionInfo.getEncodedName() + + " has been CLOSED for too long, waiting on queued " + + "ClosedRegionHandler to run or server shutdown"); + // Update our timestamp. + synchronized(regionState) { + regionState.update(regionState.getState()); + } + break; + case OFFLINE: + LOG.info("Region has been OFFLINE for too long, " + + "reassigning " + regionInfo.getRegionNameAsString() + + " to a random server"); + invokeAssign(regionInfo); + break; + case PENDING_OPEN: + LOG.info("Region has been PENDING_OPEN for too " + + "long, reassigning region=" + + regionInfo.getRegionNameAsString()); + invokeAssign(regionInfo); + break; + case OPENING: + processOpeningState(regionInfo); + break; + case OPEN: + LOG.error("Region has been OPEN for too long, " + + "we don't know where region was opened so can't do anything"); + synchronized(regionState) { + regionState.update(regionState.getState()); + } + break; + + case PENDING_CLOSE: + LOG.info("Region has been PENDING_CLOSE for too " + + "long, running forced unassign again on region=" + + regionInfo.getRegionNameAsString()); + try { + // If the server got the RPC, it will transition the node + // to CLOSING, so only do something here if no node exists + if (!ZKUtil.watchAndCheckExists(watcher, + ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) { + // Queue running of an unassign -- do actual unassign + // outside of the regionsInTransition lock. 
+ invokeUnassign(regionInfo); + } + } catch (NoNodeException e) { + LOG.debug("Node no longer existed so not forcing another " + + "unassignment"); + } catch (KeeperException e) { + LOG.warn("Unexpected ZK exception timing out a region " + + "close", e); + } + break; + case CLOSING: + LOG.info("Region has been CLOSING for too " + + "long, this should eventually complete or the server will " + + "expire, doing nothing"); + break; } - for (Map.Entry e: assigns.entrySet()){ - assign(e.getKey(), false, e.getValue()); + } + } + + private void processOpeningState(HRegionInfo regionInfo) { + LOG.info("Region has been OPENING for too " + "long, reassigning region=" + + regionInfo.getRegionNameAsString()); + // Should have a ZK node in OPENING state + try { + String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()); + Stat stat = new Stat(); + RegionTransitionData data = ZKAssign.getDataNoWatch(watcher, node, stat); + if (data == null) { + LOG.warn("Data is null, node " + node + " no longer exists"); + return; } + if (data.getEventType() == EventType.RS_ZK_REGION_OPENED) { + LOG.debug("Region has transitioned to OPENED, allowing " + + "watched event handlers to process"); + return; + } else if (data.getEventType() != EventType.RS_ZK_REGION_OPENING) { + LOG.warn("While timing out a region in state OPENING, " + + "found ZK node in unexpected state: " + data.getEventType()); + return; + } + // Attempt to transition node into OFFLINE + try { + data = new RegionTransitionData(EventType.M_ZK_REGION_OFFLINE, + regionInfo.getRegionName(), master.getServerName()); + if (ZKUtil.setData(watcher, node, data.getBytes(), stat.getVersion())) { + // Node is now OFFLINE, let's trigger another assignment + ZKUtil.getDataAndWatch(watcher, node); // re-set the watch + LOG.info("Successfully transitioned region=" + + regionInfo.getRegionNameAsString() + " into OFFLINE" + + " and forcing a new assignment"); + invokeAssign(regionInfo); + } + } catch 
(KeeperException.NoNodeException nne) { + // Node did not exist, can't time this out + } + } catch (KeeperException ke) { + LOG.error("Unexpected ZK exception timing out CLOSING region", ke); + return; } } + + + private void invokeAssign(HRegionInfo regionInfo) { + threadPoolExecutorService.submit(new AssignCallable(this, regionInfo)); + } + private void invokeUnassign(HRegionInfo regionInfo) { + threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo)); + } + /** * Process shutdown server removing any assignments. * @param hsi Server that went down. @@ -2255,4 +2371,12 @@ public boolean isServerOnline(String serverName) { return this.serverManager.isServerOnline(serverName); } + /** + * Shutdown the threadpool executor service + */ + public void shutdown() { + if (null != threadPoolExecutorService) { + this.threadPoolExecutorService.shutdown(); + } + } } Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1172608) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -968,6 +968,7 @@ @Override public void shutdown() { + this.assignmentManager.shutdown(); this.serverManager.shutdownCluster(); try { this.clusterStatusTracker.setClusterDown(); Index: src/main/java/org/apache/hadoop/hbase/master/ServerManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 1172608) +++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (working copy) @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler; import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler; import org.apache.hadoop.hbase.master.metrics.MasterMetrics; +import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import 
org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; /** @@ -549,16 +550,20 @@ *

* @param server server to open a region * @param region region to open + * @param versionOfOfflineNode that needs to be present in the offline node + * when RS tries to change the state from OFFLINE to other states. */ - public void sendRegionOpen(HServerInfo server, HRegionInfo region) + public RegionOpeningState sendRegionOpen(final HServerInfo server, + HRegionInfo region, int versionOfOfflineNode) throws IOException { HRegionInterface hri = getServerConnection(server); if (hri == null) { - LOG.warn("Attempting to send OPEN RPC to server " + server.getServerName() - + " failed because no RPC connection found to this server"); - return; + LOG.warn("Attempting to send OPEN RPC to server " + server.toString() + + " failed because no RPC connection found to this server"); + return RegionOpeningState.FAILED_OPENING; } - hri.openRegion(region); + return (versionOfOfflineNode == -1) ? hri.openRegion(region) : hri + .openRegion(region, versionOfOfflineNode); } /** Index: src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java (revision 0) @@ -0,0 +1,46 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.concurrent.Callable; + +import org.apache.hadoop.hbase.HRegionInfo; + +/** + * Callable that retries the unassign of a region in transition by invoking + * {@code assignmentManager.unassign(hri, true)}; the unassign is forced, as + * the PENDING_CLOSE timeout handling that submits this callable expects. + */ +public class UnAssignCallable implements Callable { + private AssignmentManager assignmentManager; + + private HRegionInfo hri; + + public UnAssignCallable(AssignmentManager assignmentManager, HRegionInfo hri) { + this.assignmentManager = assignmentManager; + this.hri = hri; + } + + @Override + public Object call() throws Exception { + assignmentManager.unassign(hri, true); + return null; + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java (revision 1172608) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java (working copy) @@ -31,6 +31,12 @@ public class OpenMetaHandler extends OpenRegionHandler { public OpenMetaHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo) { - super(server,rsServices, regionInfo, EventType.M_RS_OPEN_META); + super(server,rsServices, regionInfo, EventType.M_RS_OPEN_META, -1); } + public OpenMetaHandler(final Server server, + final RegionServerServices rsServices, HRegionInfo regionInfo, + int versionOfOfflineNode) { 
+ super(server, rsServices, regionInfo, EventType.M_RS_OPEN_META, + versionOfOfflineNode); + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (revision 1172608) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (working copy) @@ -49,18 +49,27 @@ // the total open. We'll fail the open if someone hijacks our znode; we can // tell this has happened if version is not as expected. private volatile int version = -1; + //version of the offline node that was set by the master + private volatile int versionOfOfflineNode = -1; public OpenRegionHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo) { - this(server, rsServices, regionInfo, EventType.M_RS_OPEN_REGION); + this(server, rsServices, regionInfo, EventType.M_RS_OPEN_REGION, -1); } + public OpenRegionHandler(final Server server, + final RegionServerServices rsServices, HRegionInfo regionInfo, + int versionOfOfflineNode) { + this(server, rsServices, regionInfo, EventType.M_RS_OPEN_REGION, + versionOfOfflineNode); + } protected OpenRegionHandler(final Server server, final RegionServerServices rsServices, final HRegionInfo regionInfo, - EventType eventType) { + EventType eventType, int versionOfOfflineNode) { super(server, eventType); this.rsServices = rsServices; this.regionInfo = regionInfo; + this.versionOfOfflineNode = versionOfOfflineNode; } public HRegionInfo getRegionInfo() { @@ -92,7 +101,8 @@ // If fails, just return. Someone stole the region from under us. // Calling transitionZookeeperOfflineToOpening initalizes this.version. - if (!transitionZookeeperOfflineToOpening(encodedName)) { + if (!transitionZookeeperOfflineToOpening(encodedName, + versionOfOfflineNode)) { LOG.warn("Region was hijacked? 
It no longer exists, encodedName=" + encodedName); return; @@ -301,15 +311,18 @@ * Transition ZK node from OFFLINE to OPENING. * @param encodedName Name of the znode file (Region encodedName is the znode * name). + * @param versionOfOfflineNode - version Of OfflineNode that needs to be compared + * before changing the node's state from OFFLINE * @return True if successful transition. */ - boolean transitionZookeeperOfflineToOpening(final String encodedName) { + boolean transitionZookeeperOfflineToOpening(final String encodedName, + int versionOfOfflineNode) { // TODO: should also handle transition from CLOSED? try { // Initialize the znode version. - this.version = - ZKAssign.transitionNodeOpening(server.getZooKeeper(), - regionInfo, server.getServerName()); + this.version = ZKAssign.transitionNode(server.getZooKeeper(), regionInfo, + server.getServerName(), EventType.M_ZK_REGION_OFFLINE, + EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode); } catch (KeeperException e) { LOG.error("Error transition from OFFLINE to OPENING for region=" + encodedName, e); Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java (revision 1172608) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java (working copy) @@ -29,8 +29,15 @@ * This is executed after receiving an OPEN RPC from the master for root. 
*/ public class OpenRootHandler extends OpenRegionHandler { + public OpenRootHandler(final Server server, final RegionServerServices rsServices, HRegionInfo regionInfo) { - super(server, rsServices, regionInfo, EventType.M_RS_OPEN_ROOT); + super(server, rsServices, regionInfo, EventType.M_RS_OPEN_ROOT, -1); } + public OpenRootHandler(final Server server, + final RegionServerServices rsServices, HRegionInfo regionInfo, + int versionOfOfflineNode) { + super(server, rsServices, regionInfo, EventType.M_RS_OPEN_ROOT, + versionOfOfflineNode); + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1172608) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.HServerLoad; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MasterAddressTracker; import org.apache.hadoop.hbase.NotServingRegionException; @@ -2112,24 +2113,40 @@ // Region open/close direct RPCs + @QosPriority(priority=HIGH_QOS) + public RegionOpeningState openRegion(HRegionInfo region) + throws IOException { + return openRegion(region, -1); + } @Override @QosPriority(priority=HIGH_QOS) - public void openRegion(HRegionInfo region) + public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode) throws IOException { if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) { throw new RegionAlreadyInTransitionException("open", region.getEncodedName()); } + HRegion onlineRegion = this.getFromOnlineRegions(region.getEncodedName()); + if (null != onlineRegion) { + LOG.warn("Attempted open of " + region.getEncodedName() + + " but already online on this server"); 
+ return RegionOpeningState.ALREADY_OPENED; + } LOG.info("Received request to open region: " + region.getRegionNameAsString()); if (this.stopped) throw new RegionServerStoppedException(); this.regionsInTransitionInRS.add(region.getEncodedNameAsBytes()); + // Need to pass the expected version in the constructor. if (region.isRootRegion()) { - this.service.submit(new OpenRootHandler(this, this, region)); - } else if(region.isMetaRegion()) { - this.service.submit(new OpenMetaHandler(this, this, region)); + this.service.submit(new OpenRootHandler(this, this, region, + versionOfOfflineNode)); + } else if (region.isMetaRegion()) { + this.service.submit(new OpenMetaHandler(this, this, region, + versionOfOfflineNode)); } else { - this.service.submit(new OpenRegionHandler(this, this, region)); + this.service.submit(new OpenRegionHandler(this, this, region, + versionOfOfflineNode)); } + return RegionOpeningState.OPENED; } @Override Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionOpeningState.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionOpeningState.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionOpeningState.java (revision 0) @@ -0,0 +1,30 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +public enum RegionOpeningState { + + OPENED, + + ALREADY_OPENED, + + FAILED_OPENING; +} + Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (revision 1172608) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (working copy) @@ -211,7 +211,6 @@ } } - /** * Creates or force updates an unassigned node to the OFFLINE state for the * specified region. @@ -227,11 +226,48 @@ * @param zkw zk reference * @param region region to be created as offline * @param serverName server event originates from + * @return the version of the znode created in OFFLINE state, -1 if + * unsuccessful. * @throws KeeperException if unexpected zookeeper exception * @throws KeeperException.NodeExistsException if node already exists */ - public static boolean createOrForceNodeOffline(ZooKeeperWatcher zkw, - HRegionInfo region, String serverName) + public static int createOrForceNodeOffline(ZooKeeperWatcher zkw, + HRegionInfo region, String serverName) throws KeeperException { + return createOrForceNodeOffline(zkw, region, serverName, false, true); + } + + /** + * Creates or force updates an unassigned node to the OFFLINE state for the + * specified region. + *

+ * Attempts to create the node but if it exists will force it to transition to + * an OFFLINE state. + *

+ * Sets a watcher on the unassigned region node if the method is successful. + * + *

+ * This method should be used when assigning a region. + * + * @param zkw + * zk reference + * @param region + * region to be created as offline + * @param serverName + * server event originates from + * @param hijack + * - true if to be hijacked and reassigned, false otherwise + * @param allowCreation + * - true if the node has to be created newly, false otherwise + * @throws KeeperException + * if unexpected zookeeper exception + * @return the version of the znode created in OFFLINE state, -1 if + * unsuccessful. + * @throws KeeperException.NodeExistsException + * if node already exists + */ + public static int createOrForceNodeOffline(ZooKeeperWatcher zkw, + HRegionInfo region, String serverName, + boolean hijack, boolean allowCreation) throws KeeperException { LOG.debug(zkw.prefix("Creating (or updating) unassigned node for " + region.getEncodedName() + " with OFFLINE state")); @@ -239,14 +275,49 @@ EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName); synchronized(zkw.getNodes()) { String node = getNodeName(zkw, region.getEncodedName()); + Stat stat = new Stat(); zkw.sync(node); zkw.getNodes().add(node); int version = ZKUtil.checkExists(zkw, node); - if(version == -1) { - ZKUtil.createAndWatch(zkw, node, data.getBytes()); + if (version == -1) { + // While trying to transit a node to OFFLINE that was previously in + // OPENING state but before it could transit to OFFLINE state if RS had + // opened the region then the Master deletes the assigned region znode. + // In that case the znode will not exist. So we should not + // create the znode again which will lead to double assignment. 
+ if (hijack && !allowCreation) { + return -1; + } + return ZKUtil.createAndWatch(zkw, node, data.getBytes()); } else { - if (!ZKUtil.setData(zkw, node, data.getBytes(), version)) { - return false; + RegionTransitionData curDataInZNode = ZKAssign.getDataNoWatch(zkw, region + .getEncodedName(), stat); + // Do not move the node to OFFLINE if znode is in any of the following + // state. + // Because these are already executed states. + if (hijack && null != curDataInZNode) { + EventType eventType = curDataInZNode.getEventType(); + if (eventType.equals(EventType.RS_ZK_REGION_CLOSING) + || eventType.equals(EventType.RS_ZK_REGION_CLOSED) + || eventType.equals(EventType.RS_ZK_REGION_OPENED)) { + return -1; + } + } + + boolean setData = false; + try { + setData = ZKUtil.setData(zkw, node, data.getBytes(), version); + // setData throws KeeperException which aborts the Master. So we are + // catching it here. + // If, just before setting the znode to OFFLINE, the RS has made any + // change to the + // znode state then we need to return -1. + } catch (KeeperException kpe) { + LOG.info("Version mismatch while setting the node to OFFLINE state."); + return -1; + } + if (!setData) { + return -1; } else { // We successfully forced to OFFLINE, reset watch and handle if // the state changed in between our set and the watch @@ -254,12 +325,12 @@ ZKAssign.getData(zkw, region.getEncodedName()); if (curData.getEventType() != data.getEventType()) { // state changed, need to process - return false; + return -1; } } } + return stat.getVersion() + 1; } - return true; } /** @@ -684,6 +755,18 @@ "the node existed but was version " + stat.getVersion() + " not the expected version " + expectedVersion)); return -1; + } else if (beginState.equals(EventType.M_ZK_REGION_OFFLINE) + && endState.equals(EventType.RS_ZK_REGION_OPENING) + && expectedVersion == -1 && stat.getVersion() != 0) { + // the below check ensures that double assignment does not happen. 
+ // When the node is created for the first time then the expected version + // that is passed will be -1 and the version in znode will be 0. + // In all other cases the version in znode will be > 0. + LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for " + + encoded + " from " + beginState + " to " + endState + " failed, " + + "the node existed but was version " + stat.getVersion() + + " not the expected version " + expectedVersion)); + return -1; } // Verify it is in expected state