Index: src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (revision 1154815) +++ src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (working copy) @@ -117,6 +117,9 @@ M_RS_CLOSE_REGION (23), // Master asking RS to close a region M_RS_CLOSE_ROOT (24), // Master asking RS to close root M_RS_CLOSE_META (25), // Master asking RS to close meta + M_RS_RE_ALLOCATE (26), // Master asking new RS to open a region when the + // previous RS was slow or took a long time open a + // region // Messages originating from Client to Master C_M_DELETE_TABLE (40), // Client asking Master to delete a table Index: src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1154815) +++ src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -485,7 +485,7 @@ public static final String HBASE_MASTER_LOGCLEANER_PLUGINS = "hbase.master.logcleaner.plugins"; - + public static final String RE_ALLOCATE_STATE = "RE_ALLOCATE"; /* * Minimum percentage of free heap necessary for a successful cluster startup. */ Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1155311) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -36,6 +36,8 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -54,11 +56,10 @@ import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.catalog.RootLocationEditor; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.executor.EventHandler.EventType; -import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State; import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler; import org.apache.hadoop.hbase.master.handler.DisableTableHandler; @@ -66,6 +67,7 @@ import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler; import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler; import org.apache.hadoop.hbase.master.handler.SplitRegionHandler; +import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -82,6 +84,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; +import org.apache.hadoop.hbase.executor.ExecutorService; /** * Manages and performs region assignment. @@ -143,6 +146,8 @@ new TreeMap(); private final ExecutorService executorService; + + private java.util.concurrent.ExecutorService threadPoolExecutorService; /** * Constructs a new assignment manager. @@ -155,7 +160,7 @@ * @throws IOException */ public AssignmentManager(Server master, ServerManager serverManager, - CatalogTracker catalogTracker, final ExecutorService service) + CatalogTracker catalogTracker, final ExecutorService service, final java.util.concurrent.ExecutorService threadPoolExecutorService) throws KeeperException, IOException { super(master.getZooKeeper()); this.master = master; @@ -172,6 +177,7 @@ this.zkTable = new ZKTable(this.master.getZooKeeper()); this.maximumAssignmentAttempts = this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10); + this.threadPoolExecutorService = threadPoolExecutorService; } /** @@ -399,6 +405,14 @@ regionInfo, RegionState.State.OPENING, data.getStamp(), data.getOrigin())); break; + + case M_RS_RE_ALLOCATE: + //We are any way doing a forcefull reassign in this case. If the prev RS wins + // then the region will be assigned to it. If not master will assign to new RS + regionsInTransition.put(encodedRegionName, new RegionState( + regionInfo, RegionState.State.PENDING_OPEN, + data.getStamp(), data.getOrigin())); + break; case RS_ZK_REGION_OPENED: // Region is opened, insert into RIT and handle it @@ -613,7 +627,14 @@ regionState.update(RegionState.State.OPENING, data.getStamp(), data.getOrigin()); break; + + case M_RS_RE_ALLOCATE: + //Put the memory state to PENDING_OPEN + regionState.update(RegionState.State.PENDING_OPEN, data.getStamp(), + data.getOrigin()); + break; + case RS_ZK_REGION_OPENED: // Should see OPENED after OPENING but possible after PENDING_OPEN if (regionState == null || @@ -1034,6 +1055,108 @@ assign(state, setOfflineInZK, forceNewPlan); } } + + public void assign(HRegionInfo region, RegionState.State prevStateInMemory) { + String tableName = region.getTableNameAsString(); + boolean disabled = this.zkTable.isDisabledTable(tableName); + if (disabled || this.zkTable.isDisablingTable(tableName)) { + LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") + + " skipping assign of " + region.getRegionNameAsString()); + offlineDisabledRegion(region); + return; + } + if (this.serverManager.isClusterShutdown()) { + LOG.info("Cluster shutdown is set; skipping assign of " + + region.getRegionNameAsString()); + return; + } + RegionState state; + synchronized (regionsInTransition) { + state = regionsInTransition.get(region.getEncodedName()); + } + if (null != state) { + synchronized (state) { + assignToReAllocate(state, prevStateInMemory); + } + } + } + + /** + * Caller must hold lock on the passed state object. + * + * @param state + * @param setOfflineInZK + * @param forceNewPlan + */ + private void assignToReAllocate(final RegionState state, + final RegionState.State prevStateInMemory) { + for (int i = 0; i < this.maximumAssignmentAttempts; i++) { + RegionPlan plan = getRegionPlan(state, true); + if (plan == null) + return; + if (false == setReAllocateInZooKeeper(state, plan, prevStateInMemory)){ + return; + } + if (this.master.isStopped()) { + LOG.debug("Server stopped; skipping assign of " + state); + return; + } + LOG.debug("Assigning region " + state.getRegion().getRegionNameAsString() + + " to " + plan.getDestination().toString()); + // Transition RegionState to PENDING_OPEN + state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(), + plan.getDestination()); + try { + // Send OPEN RPC. This can fail if the server on other end is is not up. + RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan + .getDestination(), state.getRegion()); + processAlreadyOpenedRegion(state, plan, regionOpenState); + break; + } catch (Throwable t) { + LOG.warn("Failed assignment of " + + state.getRegion().getRegionNameAsString() + " to " + + plan.getDestination() + ", trying to assign elsewhere instead; " + + "retry=" + i, t); + // Clean out plan we failed execute and one that doesn't look like it'll + // succeed anyways; we need a new plan! + // Transition back to OFFLINE + state.update(RegionState.State.OFFLINE); + // Force a new plan and reassign. Will return null if no servers. + if (getRegionPlan(state, plan.getDestination(), true) == null) { + LOG.warn("Unable to find a viable location to assign region " + + state.getRegion().getRegionNameAsString()); + return; + } + } + } + } + + /** + * Set region as REALLOCATE up in zookeeper + * + * @param state + * @param plan + * @param prevStateInMemory + * @return True if we succeeded, false otherwise (State was incorrect or + * failed updating zk). + */ + boolean setReAllocateInZooKeeper(final RegionState state, RegionPlan plan, + RegionState.State prevStateInMemory) { + try { + if (!ZKAssign.forceNodeToReAllocate(master.getZooKeeper(), state + .getRegion(), plan.getDestination(), prevStateInMemory)) { + LOG.warn("Attempted to create/force node into " + + HConstants.RE_ALLOCATE_STATE + " state before " + + "completing assignment but failed to do so for " + state); + return false; + } + } catch (KeeperException e) { + master.abort("Unexpected ZK exception creating/setting node " + + HConstants.RE_ALLOCATE_STATE, e); + return false; + } + return true; + } /** * Bulk assign regions to destination. @@ -1238,31 +1361,7 @@ // Send OPEN RPC. This can fail if the server on other end is is not up. RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan .getDestination(), state.getRegion()); - if (regionOpenState == RegionOpeningState.ALREADY_OPENED) { - // Remove region from in-memory transition and unassigned node from ZK - // While trying to enable the table the regions of the table were - // already enabled. - String encodedRegionName = state.getRegion() - .getEncodedName(); - try { - ZKAssign.deleteOfflineNode(master.getZooKeeper(), encodedRegionName); - } catch (KeeperException.NoNodeException e) { - if(LOG.isDebugEnabled()){ - LOG.debug("The unassigned node "+encodedRegionName+" doesnot exist."); - } - } catch (KeeperException e) { - master.abort( - "Error deleting OFFLINED node in ZK for transition ZK node (" - + encodedRegionName + ")", e); - } - synchronized (this.regionsInTransition) { - this.regionsInTransition.remove(plan.getRegionInfo() - .getEncodedName()); - } - synchronized (this.regions) { - this.regions.put(plan.getRegionInfo(), plan.getDestination()); - } - } + processAlreadyOpenedRegion(state, plan, regionOpenState); break; } catch (Throwable t) { LOG.warn("Failed assignment of " + @@ -1283,6 +1382,38 @@ } } + private void processAlreadyOpenedRegion(final RegionState state, RegionPlan plan, + RegionOpeningState regionOpenState) { + if (regionOpenState == RegionOpeningState.ALREADY_OPENED) { + // Remove region from in-memory transition and unassigned node from ZK + // While trying to enable the table the regions of the table were + // already enabled. + String encodedRegionName = state.getRegion() + .getEncodedName(); + boolean isSuccess = false; + try { + isSuccess = ZKAssign.deleteOfflineNode(master.getZooKeeper(), encodedRegionName); + } catch (KeeperException.NoNodeException e) { + if(LOG.isDebugEnabled()){ + LOG.debug("The unassigned node "+encodedRegionName+" doesnot exist."); + } + } catch (KeeperException e) { + master.abort( + "Error deleting OFFLINED node in ZK for transition ZK node (" + + encodedRegionName + ")", e); + } + if (isSuccess) { + synchronized (this.regionsInTransition) { + this.regionsInTransition + .remove(plan.getRegionInfo().getEncodedName()); + } + synchronized (this.regions) { + this.regions.put(plan.getRegionInfo(), plan.getDestination()); + } + } + } + } + /** * Set region as OFFLINED up in zookeeper * @param state @@ -1431,8 +1562,7 @@ synchronized (regionsInTransition) { state = regionsInTransition.get(encodedName); if (state == null) { - - // Create the znode in CLOSING state + // Create the znode in CLOSING state try { ZKAssign.createNodeClosing( master.getZooKeeper(), region, master.getServerName()); @@ -2131,125 +2261,122 @@ long now = System.currentTimeMillis(); for (RegionState regionState : regionsInTransition.values()) { if (regionState.getStamp() + timeout <= now) { - HRegionInfo regionInfo = regionState.getRegion(); - LOG.info("Regions in transition timed out: " + regionState); - // Expired! Do a retry. - switch (regionState.getState()) { - case CLOSED: - LOG.info("Region " + regionInfo.getEncodedName() + - " has been CLOSED for too long, waiting on queued " + - "ClosedRegionHandler to run or server shutdown"); - // Update our timestamp. - regionState.updateTimestampToNow(); - break; - case OFFLINE: - LOG.info("Region has been OFFLINE for too long, " + - "reassigning " + regionInfo.getRegionNameAsString() + - " to a random server"); - assigns.put(regionState.getRegion(), Boolean.FALSE); - break; - case PENDING_OPEN: - LOG.info("Region has been PENDING_OPEN for too " + - "long, reassigning region=" + - regionInfo.getRegionNameAsString()); - assigns.put(regionState.getRegion(), Boolean.TRUE); - break; - case OPENING: - LOG.info("Region has been OPENING for too " + - "long, reassigning region=" + - regionInfo.getRegionNameAsString()); - // Should have a ZK node in OPENING state - try { - String node = ZKAssign.getNodeName(watcher, - regionInfo.getEncodedName()); - Stat stat = new Stat(); - RegionTransitionData data = ZKAssign.getDataNoWatch(watcher, - node, stat); - if (data == null) { - LOG.warn("Data is null, node " + node + " no longer exists"); - break; - } - if (data.getEventType() == EventType.RS_ZK_REGION_OPENED) { - LOG.debug("Region has transitioned to OPENED, allowing " + - "watched event handlers to process"); - break; - } else if (data.getEventType() != - EventType.RS_ZK_REGION_OPENING) { - LOG.warn("While timing out a region in state OPENING, " + - "found ZK node in unexpected state: " + - data.getEventType()); - break; - } - // Attempt to transition node into OFFLINE - try { - data = new RegionTransitionData( - EventType.M_ZK_REGION_OFFLINE, regionInfo.getRegionName(), - master.getServerName()); - if (ZKUtil.setData(watcher, node, data.getBytes(), - stat.getVersion())) { - // Node is now OFFLINE, let's trigger another assignment - ZKUtil.getDataAndWatch(watcher, node); // re-set the watch - LOG.info("Successfully transitioned region=" + - regionInfo.getRegionNameAsString() + " into OFFLINE" + - " and forcing a new assignment"); - assigns.put(regionState.getRegion(), Boolean.TRUE); - } - } catch (KeeperException.NoNodeException nne) { - // Node did not exist, can't time this out - } - } catch (KeeperException ke) { - LOG.error("Unexpected ZK exception timing out CLOSING region", - ke); - break; - } - break; - case OPEN: - LOG.error("Region has been OPEN for too long, " + - "we don't know where region was opened so can't do anything"); - synchronized(regionState) { - regionState.updateTimestampToNow(); - } - break; + actOnTimeOut(unassigns, assigns, regionState); + } + } + } + } - case PENDING_CLOSE: - LOG.info("Region has been PENDING_CLOSE for too " + - "long, running forced unassign again on region=" + - regionInfo.getRegionNameAsString()); - try { - // If the server got the RPC, it will transition the node - // to CLOSING, so only do something here if no node exists - if (!ZKUtil.watchAndCheckExists(watcher, - ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) { - // Queue running of an unassign -- do actual unassign - // outside of the regionsInTransition lock. - unassigns.add(regionInfo); - } - } catch (NoNodeException e) { - LOG.debug("Node no longer existed so not forcing another " + - "unassignment"); - } catch (KeeperException e) { - LOG.warn("Unexpected ZK exception timing out a region " + - "close", e); - } - break; - case CLOSING: - LOG.info("Region has been CLOSING for too " + - "long, this should eventually complete or the server will " + - "expire, doing nothing"); - break; + private void actOnTimeOut(List unassigns, + Map assigns, RegionState regionState) { + HRegionInfo regionInfo = regionState.getRegion(); + LOG.info("Regions in transition timed out: " + regionState); + // Expired! Do a retry. + switch (regionState.getState()) { + case CLOSED: + LOG.info("Region " + regionInfo.getEncodedName() + + " has been CLOSED for too long, waiting on queued " + + "ClosedRegionHandler to run or server shutdown"); + // Update our timestamp. + regionState.updateTimestampToNow(); + break; + case OFFLINE: + LOG.info("Region has been OFFLINE for too long, " + + "reassigning " + regionInfo.getRegionNameAsString() + + " to a random server"); + //forcefully assign to new RS + invokeTimeOutManager(regionState.getRegion(), TimeOutOperationType.ASSIGN, Boolean.TRUE, false, regionState.getState()); + break; + case PENDING_OPEN: + LOG.info("Region has been PENDING_OPEN for too " + + "long, reassigning region=" + + regionInfo.getRegionNameAsString()); + invokeTimeOutManager(regionState.getRegion(), TimeOutOperationType.ASSIGN, Boolean.TRUE, false, regionState.getState()); + break; + case OPENING: + LOG.info("Region has been OPENING for too " + + "long, reassigning region=" + + regionInfo.getRegionNameAsString()); + // Should have a ZK node in OPENING state + try { + String node = ZKAssign.getNodeName(watcher, + regionInfo.getEncodedName()); + Stat stat = new Stat(); + RegionTransitionData data = ZKAssign.getDataNoWatch(watcher, + node, stat); + if (data == null) { + LOG.warn("Data is null, node " + node + " no longer exists"); + break; } + if (data.getEventType() == EventType.RS_ZK_REGION_OPENED) { + LOG.debug("Region has transitioned to OPENED, allowing " + + "watched event handlers to process"); + break; + } + //directly move to RE_ALLOCATE state + invokeTimeOutManager(regionState.getRegion(), TimeOutOperationType.ASSIGN, Boolean.TRUE, false, regionState.getState()); + } catch (KeeperException ke) { + LOG.error("Unexpected ZK exception timing out CLOSING region", + ke); + break; } - } + break; + case OPEN: + LOG.error("Region has been OPEN for too long, " + + "we don't know where region was opened so can't do anything"); + synchronized(regionState) { + regionState.updateTimestampToNow(); + } + break; + + case PENDING_CLOSE: + LOG.info("Region has been PENDING_CLOSE for too " + + "long, running forced unassign again on region=" + + regionInfo.getRegionNameAsString()); + try { + // If the server got the RPC, it will transition the node + // to CLOSING, so only do something here if no node exists + if (!ZKUtil.watchAndCheckExists(watcher, + ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) { + // Queue running of an unassign -- do actual unassign + // outside of the regionsInTransition lock. + //unassigns.add(regionInfo); + invokeTimeOutManager(regionState.getRegion(), TimeOutOperationType.UNASSIGN, Boolean.TRUE, false, regionState.getState()); + } + } catch (NoNodeException e) { + LOG.debug("Node no longer existed so not forcing another " + + "unassignment"); + } catch (KeeperException e) { + LOG.warn("Unexpected ZK exception timing out a region " + + "close", e); + } + break; + case CLOSING: + LOG.info("Region has been CLOSING for too " + + "long, this should eventually complete or the server will " + + "expire, doing nothing"); + break; } - // Finish the work for regions in PENDING_CLOSE state - for (HRegionInfo hri: unassigns) { - unassign(hri, true); - } - for (Map.Entry e: assigns.entrySet()){ - assign(e.getKey(), false, e.getValue()); - } } } + + /** + * The type of operation that has to performed on TimeOut + * ASSIGN - need to assign a region to an RS + * UNASSIGN - need to unassign a region + */ + public static enum TimeOutOperationType{ + ASSIGN, + UNASSIGN; + } + + private void invokeTimeOutManager(HRegionInfo hri, + TimeOutOperationType operation, boolean forceNewPlan, + boolean setOfflineInZk, RegionState.State state) { + TimeOutManagerCallable timeOutManager = new TimeOutManagerCallable(this, + hri, operation, state); + threadPoolExecutorService.submit(timeOutManager); + } /** * Process shutdown server removing any assignments. Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1154815) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; @@ -181,6 +182,8 @@ private final ServerName serverName; private TableDescriptors tableDescriptors; + + private java.util.concurrent.ExecutorService threadPoolExecutorService; /** * Initializes the HMaster. The steps are as follows: @@ -351,9 +354,9 @@ this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); this.catalogTracker.start(); - + threadPoolExecutorService = Executors.newCachedThreadPool(); this.assignmentManager = new AssignmentManager(this, serverManager, - this.catalogTracker, this.executorService); + this.catalogTracker, this.executorService, threadPoolExecutorService); this.balancer = new LoadBalancer(conf); zooKeeper.registerListenerFirst(assignmentManager); @@ -1299,6 +1302,9 @@ } } this.serverManager.shutdownCluster(); + if(null != threadPoolExecutorService){ + threadPoolExecutorService.shutdown(); + } try { this.clusterStatusTracker.setClusterDown(); } catch (KeeperException e) { Index: src/main/java/org/apache/hadoop/hbase/master/TimeOutManagerCallable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/TimeOutManagerCallable.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/master/TimeOutManagerCallable.java (revision 0) @@ -0,0 +1,58 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.concurrent.Callable; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.master.AssignmentManager.RegionState; +import org.apache.hadoop.hbase.master.AssignmentManager.TimeOutOperationType; + +public class TimeOutManagerCallable implements Callable { + + private AssignmentManager assignmentManager; + + private HRegionInfo hri; + + private TimeOutOperationType operation; + + private RegionState.State state; + + + public TimeOutManagerCallable(AssignmentManager assignmentManager, + HRegionInfo hri, TimeOutOperationType operation, RegionState.State state) { + this.assignmentManager = assignmentManager; + this.hri = hri; + this.operation = operation; + this.state = state; + } + + @Override + public Object call() throws Exception { + if (TimeOutOperationType.ASSIGN.equals(operation)) { + assignmentManager.assign(hri, state); + } else { + assignmentManager.unassign(hri); + } + return null; + } + +} + Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (revision 1154815) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (working copy) @@ -86,7 +86,7 @@ // If fails, just return. Someone stole the region from under us. // Calling transitionZookeeperOfflineToOpening initalizes this.version. - if (!transitionZookeeperOfflineToOpening(encodedName)) { + if (!transitionZookeeperOfflineOrReAllocateToOpening(encodedName)) { LOG.warn("Region was hijacked? It no longer exists, encodedName=" + encodedName); return; @@ -327,7 +327,7 @@ * name). * @return True if successful transition. */ - boolean transitionZookeeperOfflineToOpening(final String encodedName) { + boolean transitionZookeeperOfflineOrReAllocateToOpening(final String encodedName) { // TODO: should also handle transition from CLOSED? try { // Initialize the znode version. Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (revision 1154815) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (working copy) @@ -23,10 +23,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.executor.RegionTransitionData; import org.apache.hadoop.hbase.executor.EventHandler.EventType; +import org.apache.hadoop.hbase.master.AssignmentManager.RegionState; +import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -646,6 +649,9 @@ int expectedVersion, final byte [] payload) throws KeeperException { String encoded = region.getEncodedName(); + + boolean handleReAllocate = false; + if(LOG.isDebugEnabled()) { LOG.debug(zkw.prefix("Attempting to transition node " + HRegionInfo.prettyPrint(encoded) + @@ -676,17 +682,30 @@ } // Verify it is in expected state - if(!existingData.getEventType().equals(beginState)) { - LOG.warn(zkw.prefix("Attempt to transition the " + - "unassigned node for " + encoded + - " from " + beginState + " to " + endState + " failed, " + - "the node existed but was in the state " + existingData.getEventType() + - " set by the server " + serverName)); - return -1; + if (!existingData.getEventType().equals(beginState)) { + if (checkIfCurrentStateIsReAllocate(existingData, serverName, zkw) + && existingData.getOrigin().equals(serverName)) { + handleReAllocate = true; + beginState = EventType.M_RS_RE_ALLOCATE; + } else { + LOG + .warn(zkw.prefix("Attempt to transition the " + + "unassigned node for " + encoded + " from " + beginState + + " to " + endState + " failed, " + + "the node existed but was in the state " + + existingData.getEventType() + " set by the server " + + serverName)); + return -1; + } } // Write new data, ensuring data has not changed since we last read it try { + if(handleReAllocate){ + LOG.debug(zkw.prefix("Node was already in "+beginState+". Attempting to transition node " + + HRegionInfo.prettyPrint(encoded) + + " from " + beginState + " to " + endState.toString())); + } RegionTransitionData data = new RegionTransitionData(endState, region.getRegionName(), serverName, payload); if(!ZKUtil.setData(zkw, node, data.getBytes(), stat.getVersion())) { @@ -697,9 +716,15 @@ "setting data we got a version mismatch")); return -1; } - if(LOG.isDebugEnabled()) { - LOG.debug(zkw.prefix("Successfully transitioned node " + encoded + - " from " + beginState + " to " + endState)); + byte[] setData = ZKUtil.getDataNoWatch(zkw, node, null); + if (null != setData) { + if (RegionTransitionData.fromBytes(setData).getEventType().equals( + endState)) { + if (LOG.isDebugEnabled()) { + LOG.debug(zkw.prefix("Successfully transitioned node " + encoded + + " from " + beginState + " to " + endState)); + } + } } return stat.getVersion() + 1; } catch (KeeperException.NoNodeException nne) { @@ -712,6 +737,15 @@ } } + private static boolean checkIfCurrentStateIsReAllocate( + RegionTransitionData existingData, ServerName serverName, + ZooKeeperWatcher zkw) { + if (existingData.getEventType().equals(EventType.M_RS_RE_ALLOCATE)) { + return true; + } + return false; + } + /** * Gets the current data in the unassigned node for the specified region name * or fully-qualified path. @@ -832,7 +866,92 @@ Thread.sleep(100); } } + + /** + * Creates or force updates an unassigned node to the RE_ALLOCATE state for + * the specified region. This state is set in the node when the timeoutmonitor + * deducts a timeout for a node. + *

+ * Attempts to create the node but if it exists will force it to transition to + * and RE_ALLOCATE state. + * + *

+ * Sets a watcher on the unassigned region node if the method is successful. + * + *

+ * This method should be used when timeout monitor has deducted that the + * region has not been opened for a long time. + * + * @param zkw + * zk reference + * @param region + * region to be created as reallocate + * @param serverName + * server event originates from + * @param prevStateInMemory + * @throws KeeperException + * if unexpected zookeeper exception + * @throws KeeperException.NodeExistsException + * if node already exists + */ + public static boolean forceNodeToReAllocate(ZooKeeperWatcher zkw, + HRegionInfo region, ServerName serverName, + RegionState.State prevStateInMemory) throws KeeperException { + LOG.debug(zkw.prefix("Creating (or updating) unassigned node for " + + region.getEncodedName() + " with " + HConstants.RE_ALLOCATE_STATE + + " state")); + String node = getNodeName(zkw, region.getEncodedName()); + zkw.sync(node); + int version = ZKUtil.checkExists(zkw, node); + if (version != -1) { + RegionTransitionData currentStateInZK = ZKAssign.getDataNoWatch(zkw, + node, null); + boolean isprevStateInOpeningTransition = prevStateInMemory + .equals(State.OFFLINE) + || prevStateInMemory.equals(State.PENDING_OPEN); + boolean iscurrentStateInOpeningTransition = currentStateInZK + .getEventType() == EventType.RS_ZK_REGION_OPENING + || currentStateInZK.getEventType() == EventType.RS_ZK_REGION_OPENED; + + if ((isprevStateInOpeningTransition && iscurrentStateInOpeningTransition) + || (prevStateInMemory.equals(State.OPENING) && currentStateInZK + .getEventType() == EventType.RS_ZK_REGION_OPENED)) { + // Already changed to opening or opened + LOG.info("Before tranisting the znode to " + + HConstants.RE_ALLOCATE_STATE + " the data in the " + + " node changed to" + currentStateInZK.getEventType()); + return false; + } + + RegionTransitionData data = new RegionTransitionData( + EventType.M_RS_RE_ALLOCATE, region.getRegionName(), serverName); + try { + if (false == ZKUtil.setData(zkw, node, data.getBytes(), version)) { + return false; + } + } catch (KeeperException e) { + LOG.debug("Before setting to " + HConstants.RE_ALLOCATE_STATE + + " the RS changed the state."); + return false; + } + + RegionTransitionData curData = ZKAssign.getData(zkw, region + .getEncodedName()); + // TODO : check with the server name also + if (null != curData) { + EventType currentDataEventType = curData.getEventType(); + if (currentDataEventType != data.getEventType()) { + // state changed, need to process + LOG.debug("Before setting to " + HConstants.RE_ALLOCATE_STATE + + " the RS changed the state to " + currentDataEventType); + return false; + } + } + } + return true; + } + /** * Verifies that the specified region is in the specified state in ZooKeeper. *