Index: src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java  (revision 1242819)
+++ src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java  (working copy)
@@ -103,7 +103,7 @@
   public enum EventType {
     // Messages originating from RS (NOTE: there is NO direct communication from
     // RS to Master). These are a result of RS updates into ZK.
-    RS_ZK_REGION_CLOSING      (1),   // RS is in process of closing a region
+    RS_ZK_REGION_CLOSED       (2),   // RS has finished closing a region
     RS_ZK_REGION_OPENING      (3),   // RS is in process of opening a region
     RS_ZK_REGION_OPENED       (4),   // RS has finished opening a region
@@ -128,6 +128,7 @@
     // Updates from master to ZK. This is done by the master and there is
     // nothing to process by either Master or RS
     M_ZK_REGION_OFFLINE       (50),  // Master adds this region as offline in ZK
+    M_ZK_REGION_CLOSING       (51),  // Master adds this region as closing in ZK

     // Master controlled events to be executed on the master
     M_SERVER_SHUTDOWN         (70),  // Master is processing shutdown of a RS
Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java  (revision 1242819)
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java  (working copy)
@@ -62,6 +62,7 @@
 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
 import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
+import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -114,6 +115,12 @@
   // All access to this Map must be synchronized.
   final NavigableMap<String, RegionPlan> regionPlans =
     new TreeMap<String, RegionPlan>();
+
+  private boolean regionsToProcess = false;
+
+  // Set holding all the regions which got processed while RIT was not
+  // populated during master failover.
+  private Set<String> regionsProcessed = new HashSet<String>(1);

   private final ZKTable zkTable;
@@ -258,9 +265,12 @@
     }
     LOG.info("Failed-over master needs to process " + nodes.size() +
       " regions in transition");
+    regionsToProcess = true;
     for (String encodedRegionName: nodes) {
       processRegionInTransition(encodedRegionName, null, deadServers);
     }
+    regionsToProcess = false;
+    regionsProcessed.clear();
   }

   /**
@@ -327,75 +337,102 @@
     LOG.info("Processing region " + regionInfo.getRegionNameAsString() +
       " in state " + data.getEventType());
     synchronized (regionsInTransition) {
-      switch (data.getEventType()) {
-      case RS_ZK_REGION_CLOSING:
-        //If zk node of the region was updated by a live server,
-        //we should skip this region and just add it into RIT.
-        if (isOnDeadServer(regionInfo, deadServers) &&
-            (null == data.getServerName() ||
-            !serverManager.isServerOnline(data.getServerName()))){
-          // If was on dead server, its closed now. Force to OFFLINE and this
-          // will get it reassigned if appropriate
-          forceOffline(regionInfo, data);
-        } else {
-          // Just insert region into RIT.
-          // If this never updates the timeout will trigger new assignment
-          regionsInTransition.put(encodedRegionName, new RegionState(
-            regionInfo, RegionState.State.CLOSING, data.getStamp()));
-        }
-        break;
+      RegionState regionState = regionsInTransition.get(encodedRegionName);
+      if (regionState == null
+          && regionsProcessed.contains(encodedRegionName) == false) {
+        switch (data.getEventType()) {
+        case M_ZK_REGION_CLOSING:
+          // If zk node of the region was updated by a live server,
+          // we should skip this region and just add it into RIT.
+          if (isOnDeadServer(regionInfo, deadServers)
+              && (null == data.getServerName() || !serverManager
+                  .isServerOnline(data.getServerName()))) {
+            // If was on dead server, its closed now. Force to OFFLINE and this
+            // will get it reassigned if appropriate
+            forceOffline(regionInfo, data);
+          } else {
+            // Just insert region into RIT.
+            // If this never updates the timeout will trigger new assignment
+            regionsInTransition.put(encodedRegionName, new RegionState(
+                regionInfo, RegionState.State.PENDING_CLOSE, data.getStamp()));
+          }
+          regionsProcessed.add(encodedRegionName);
+          break;
-      case RS_ZK_REGION_CLOSED:
-        // Region is closed, insert into RIT and handle it
-        addToRITandCallClose(regionInfo, RegionState.State.CLOSED, data);
-        break;
+        case RS_ZK_REGION_CLOSED:
+          // Region is closed, insert into RIT and handle it
+          addToRITandCallClose(regionInfo, RegionState.State.CLOSED, data);
+          regionsProcessed.add(encodedRegionName);
+          break;
-      case M_ZK_REGION_OFFLINE:
-        // Region is offline, insert into RIT and handle it like a closed
-        addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
-        break;
+        case M_ZK_REGION_OFFLINE:
+          // If zk node of the region was updated by a live server skip this
+          // region and just add it into RIT.
+          if (isOnDeadServer(regionInfo, deadServers)
+              && (null == data.getServerName() || !serverManager
+                  .isServerOnline(data.getServerName()))) {
+            // Region is offline, insert into RIT and handle it like a closed
+            addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
+          } else if (data.getServerName() != null
+              && !serverManager.isServerOnline(data.getServerName())) {
+            // to handle cases where offline node is created but sendRegionOpen
+            // RPC is not yet sent
+            addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
+          } else {
+            regionsInTransition.put(encodedRegionName, new RegionState(
+                regionInfo, RegionState.State.PENDING_OPEN, data.getStamp()));
+          }
+          regionsProcessed.add(encodedRegionName);
+          break;
-      case RS_ZK_REGION_OPENING:
-        // TODO: Could check if it was on deadServers. If it was, then we could
-        // do what happens in TimeoutMonitor when it sees this condition.
+        case RS_ZK_REGION_OPENING:
+          // TODO: Could check if it was on deadServers. If it was, then we could
+          // do what happens in TimeoutMonitor when it sees this condition.
-        // Just insert region into RIT
-        // If this never updates the timeout will trigger new assignment
-        regionsInTransition.put(encodedRegionName, new RegionState(
-          regionInfo, RegionState.State.OPENING, data.getStamp()));
-        break;
+          // Just insert region into RIT
+          // If this never updates the timeout will trigger new assignment
+          regionsInTransition.put(encodedRegionName, new RegionState(
+              regionInfo, RegionState.State.OPENING, data.getStamp()));
+          regionsProcessed.add(encodedRegionName);
+          break;
-      case RS_ZK_REGION_OPENED:
-        // Region is opened, insert into RIT and handle it
-        regionsInTransition.put(encodedRegionName, new RegionState(
-          regionInfo, RegionState.State.OPEN, data.getStamp()));
-        String sn = data.getServerName();
-        // hsi could be null if this server is no longer online. If
-        // that the case, just let this RIT timeout; it'll be assigned
-        // to new server then.
-        if (sn == null) {
-          LOG.warn("Region in transition " + regionInfo.getEncodedName() +
-            " references a server no longer up " + data.getServerName() +
-            "; letting RIT timeout so will be assigned elsewhere");
-          break;
-        }
-        if ((!serverManager.isServerOnline(sn) || (null == data.getServerName()))
-            && (isOnDeadServer(regionInfo, deadServers)
-            || regionInfo.isMetaRegion() || regionInfo.isRootRegion())) {
-          // If was on a dead server, then its not open any more; needs handling.
-          forceOffline(regionInfo, data);
-        } else {
-          HServerInfo hsi = this.serverManager.getServerInfo(sn);
-          if (hsi == null) {
-            LOG.info("Failed to find " + sn +
-              " in list of online servers; skipping registration of open of " +
-              regionInfo.getRegionNameAsString());
+        case RS_ZK_REGION_OPENED:
+          // Region is opened, insert into RIT and handle it
+          regionsInTransition.put(encodedRegionName, new RegionState(
+              regionInfo, RegionState.State.OPEN, data.getStamp()));
+          String sn = data.getServerName();
+          // hsi could be null if this server is no longer online. If
+          // that the case, just let this RIT timeout; it'll be assigned
+          // to new server then.
+          if (sn == null) {
+            LOG.warn("Region in transition " + regionInfo.getEncodedName()
+                + " references a server no longer up " + data.getServerName()
+                + "; letting RIT timeout so will be assigned elsewhere");
+            regionsProcessed.add(encodedRegionName);
+            break;
+          }
+          if ((!serverManager.isServerOnline(sn) || (null == data
+              .getServerName()))
+              && (isOnDeadServer(regionInfo, deadServers)
+                  || regionInfo.isMetaRegion() || regionInfo.isRootRegion())) {
+            // If was on a dead server, then its not open any more; needs
+            // handling.
+            forceOffline(regionInfo, data);
           } else {
-            new OpenedRegionHandler(master, this, regionInfo, hsi,
-              expectedVersion).process();
+            HServerInfo hsi = this.serverManager.getServerInfo(sn);
+            if (hsi == null) {
+              LOG.info("Failed to find "
+                  + sn
+                  + " in list of online servers; skipping registration of open of "
+                  + regionInfo.getRegionNameAsString());
+            } else {
+              new OpenedRegionHandler(master, this, regionInfo, hsi,
+                  expectedVersion).process();
+            }
           }
+          regionsProcessed.add(encodedRegionName);
+          break;
         }
-        break;
       }
     }
   }
@@ -462,6 +499,7 @@
    */
   private void handleRegion(final RegionTransitionData data, int expectedVersion) {
     synchronized(regionsInTransition) {
+      HRegionInfo hri = null;
       if (data == null || data.getServerName() == null) {
         LOG.warn("Unexpected NULL input " + data);
         return;
@@ -493,7 +531,15 @@
         // Nothing to do.
         break;

-      case RS_ZK_REGION_CLOSING:
+      case M_ZK_REGION_CLOSING:
+        hri = handleRegionWhileFailOverInProgress(regionState, encodedName,
+            data);
+        if (hri != null) {
+          regionsInTransition.put(encodedName, new RegionState(hri,
+              RegionState.State.PENDING_CLOSE, data.getStamp()));
+          regionsProcessed.add(encodedName);
+          break;
+        }
         // Should see CLOSING after we have asked it to CLOSE or additional
         // times after already being in state of CLOSING
         if (regionState == null ||
@@ -505,10 +551,21 @@
           return;
         }
         // Transition to CLOSING (or update stamp if already CLOSING)
-        regionState.update(RegionState.State.CLOSING, data.getStamp());
+        regionState.update(RegionState.State.PENDING_CLOSE, data.getStamp());
         break;

       case RS_ZK_REGION_CLOSED:
+        hri = handleRegionWhileFailOverInProgress(regionState, encodedName,
+            data);
+        if (hri != null) {
+          regionState = new RegionState(hri, RegionState.State.CLOSED,
+              data.getStamp());
+          regionsInTransition.put(encodedName, regionState);
+          this.executorService.submit(new ClosedRegionHandler(master, this,
+              regionState.getRegion()));
+          regionsProcessed.add(encodedName);
+          break;
+        }
         // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
         if (regionState == null ||
           (!regionState.isPendingClose() && !regionState.isClosing())) {
@@ -527,6 +584,15 @@
         break;

       case RS_ZK_REGION_OPENING:
+        hri = handleRegionWhileFailOverInProgress(regionState, encodedName,
+            data);
+        if (hri != null) {
+          regionState = new RegionState(hri, RegionState.State.OPENING,
+              data.getStamp());
+          regionsInTransition.put(encodedName, regionState);
+          regionsProcessed.add(encodedName);
+          break;
+        }
         // Should see OPENING after we have asked it to OPEN or additional
         // times after already being in state of OPENING
         if (regionState == null ||
@@ -543,6 +609,18 @@
         break;

       case RS_ZK_REGION_OPENED:
+        hri = handleRegionWhileFailOverInProgress(regionState, encodedName,
+            data);
+        if (hri != null) {
+          regionState = new RegionState(hri, RegionState.State.OPEN,
+              data.getStamp());
+          regionsInTransition.put(encodedName, regionState);
+          this.executorService.submit(new OpenedRegionHandler(master, this,
+              regionState.getRegion(), this.serverManager.getServerInfo(data
+                  .getServerName()), expectedVersion));
+          regionsProcessed.add(encodedName);
+          break;
+        }
         // Should see OPENED after OPENING but possible after PENDING_OPEN
         if (regionState == null ||
           (!regionState.isPendingOpen() && !regionState.isOpening())) {
@@ -563,8 +641,47 @@
         }
       }
     }
+
+  /**
+   * Checks whether the callback came while RIT was not yet populated during
+   * master failover.
+   *
+   * @param regionState
+   * @param encodedName
+   * @param data
+   * @return hri
+   */
+  private HRegionInfo handleRegionWhileFailOverInProgress(
+      RegionState regionState, String encodedName, RegionTransitionData data) {
+    if (regionState == null && regionsToProcess
+        && regionsProcessed.contains(encodedName) == false) {
+      HRegionInfo hri = getHRegionInfo(data);
+      return hri;
+    }
+    return null;
+  }

   /**
+   * Gets the HRegionInfo from the META table
+   *
+   * @param data
+   * @return HRegionInfo hri for the region
+   */
+  private HRegionInfo getHRegionInfo(RegionTransitionData data) {
+    HRegionInfo hri = null;
+    Pair<HRegionInfo, HServerAddress> p = null;
+    try {
+      p = MetaReader.getRegion(catalogTracker, data.getRegionName());
+    } catch (IOException e) {
+      master.abort("Aborting because error occurred while reading "
+          + data.getRegionName() + " from .META.", e);
+      return null;
+    }
+    hri = p.getFirst();
+    return hri;
+  }
+
+  /**
    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
    * <p>
    * This is handled in a separate code path because it breaks the normal rules.
@@ -1083,6 +1200,15 @@
           serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
           break;
         } catch (Throwable t) {
+          if (t instanceof RemoteException) {
+            t = ((RemoteException) t).unwrapRemoteException();
+            if (t instanceof RegionAlreadyInTransitionException) {
+              String errorMsg = "Failed assignment in: " + plan.getDestination()
+                  + " due to " + t.getMessage();
+              LOG.error(errorMsg, t);
+              return;
+            }
+          }
           LOG.warn("Failed assignment of " +
             state.getRegion().getRegionNameAsString() + " to " +
             plan.getDestination() + ", trying to assign elsewhere instead; " +
@@ -1257,9 +1383,22 @@
     String encodedName = region.getEncodedName();
     // Grab the state of this region and synchronize on it
     RegionState state;
+    int expectedVersion = -1;
     synchronized (regionsInTransition) {
       state = regionsInTransition.get(encodedName);
       if (state == null) {
+        try {
+          expectedVersion = ZKAssign.createNodeClosing(master.getZooKeeper(),
+              region, master.getServerName());
+          if (expectedVersion == -1) {
+            LOG.warn("Error creating node in CLOSING state, aborting close of " +
+                region.getRegionNameAsString());
+            return;
+          }
+        } catch (KeeperException e) {
+          master.abort("Unexpected ZK exception creating node CLOSING", e);
+          return;
+        }
         state = new RegionState(region, RegionState.State.PENDING_CLOSE);
         regionsInTransition.put(encodedName, state);
       } else if (force && state.isPendingClose()) {
@@ -1314,6 +1453,12 @@
           }
         }
       }
+      // RS is already processing this region, only need to update the
+      // timestamp
+      if (t instanceof RegionAlreadyInTransitionException) {
+        LOG.debug("update " + state + " the timestamp.");
+        state.update(state.getState());
+      }
     }
     LOG.info("Server " + server + " returned " + t + " for " +
       region.getEncodedName());
@@ -2129,7 +2274,7 @@
       RegionTransitionData node = ZKAssign.getDataNoWatch(this.watcher,
         parent.getEncodedName(), null);
       if (node != null) {
-        if (node.getEventType().equals(EventType.RS_ZK_REGION_CLOSING)) {
+        if (node.getEventType().equals(EventType.M_ZK_REGION_CLOSING)) {
           ZKAssign.deleteClosingNode(this.watcher, parent);
         } else {
           LOG.warn("Split report has RIT node (shouldnt have one): " +
Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java  (revision 1242819)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java  (working copy)
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.regionserver.handler;

 import java.io.IOException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -29,6 +28,7 @@
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;

 /**
@@ -57,7 +57,7 @@
   // close -- not the master process so state up in zk will unlikely be
   // CLOSING.
   private final boolean zk;
-
+  // This is executed after receiving a CLOSE RPC from the master.
   public CloseRegionHandler(final Server server,
       final RegionServerServices rsServices, HRegionInfo regionInfo) {
@@ -105,14 +105,7 @@
       LOG.warn("Received CLOSE for region " + name +
         " but currently not serving");
       return;
-    }
-
-    int expectedVersion = FAILED;
-    if (this.zk) {
-      expectedVersion = setClosingState();
-      if (expectedVersion == FAILED) return;
-    }
-
+    }
     // Close the region
     try {
       // TODO: If we need to keep updating CLOSING stamp to prevent against
@@ -135,6 +128,7 @@
     }

     this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName());
+    int expectedVersion = getVersion();
     if (this.zk) setClosedState(expectedVersion, region);
@@ -145,6 +139,20 @@
       remove(this.regionInfo.getEncodedNameAsBytes());
     }
   }
+
+  private int getVersion() {
+    int version = -1;
+    try {
+      if ((version = ZKAssign.getVersion(server.getZooKeeper(), this.regionInfo)) == -1) {
+        LOG.warn("Error getting node's version in CLOSING state," +
+          " aborting close of " + regionInfo.getRegionNameAsString());
+      }
+    } catch (KeeperException e) {
+      LOG.warn("Error getting node's version in CLOSING state, aborting close of " +
+        regionInfo.getRegionNameAsString(), e);
+    }
+    return version;
+  }

   /**
    * Transition ZK node to CLOSED
@@ -171,23 +179,4 @@
       return;
     }
   }
-
-  /**
-   * Create ZK node in CLOSING state.
-   * @return The expectedVersion. If -1, we failed setting CLOSING.
-   */
-  private int setClosingState() {
-    int expectedVersion = FAILED;
-    try {
-      if ((expectedVersion = ZKAssign.createNodeClosing(
-          server.getZooKeeper(), regionInfo, server.getServerName())) == FAILED) {
-        LOG.warn("Error creating node in CLOSING state, aborting close of " +
-          regionInfo.getRegionNameAsString());
-      }
-    } catch (KeeperException e) {
-      LOG.warn("Error creating node in CLOSING state, aborting close of " +
-        regionInfo.getRegionNameAsString(), e);
-    }
-    return expectedVersion;
-  }
 }
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java  (revision 1242819)
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java  (working copy)
@@ -110,6 +110,20 @@
     return path.substring(zkw.assignmentZNode.length()+1);
   }

+  /**
+   * Get the version of the specified znode
+   *
+   * @param zkw zk reference
+   * @param region region's info
+   * @return the version of the znode, -1 if it doesn't exist
+   * @throws KeeperException
+   */
+  public static int getVersion(ZooKeeperWatcher zkw, HRegionInfo region)
+      throws KeeperException {
+    String znode = getNodeName(zkw, region.getEncodedName());
+    return ZKUtil.checkExists(zkw, znode);
+  }
+
   // Master methods

   /**
@@ -363,7 +377,7 @@
       HRegionInfo region)
   throws KeeperException, KeeperException.NoNodeException {
     String regionName = region.getEncodedName();
-    return deleteNode(zkw, regionName, EventType.RS_ZK_REGION_CLOSING);
+    return deleteNode(zkw, regionName, EventType.M_ZK_REGION_CLOSING);
   }

   /**
@@ -507,7 +521,7 @@
         region.getEncodedName() + " in a CLOSING state"));

     RegionTransitionData data = new RegionTransitionData(
-      EventType.RS_ZK_REGION_CLOSING, region.getRegionName(), serverName);
+      EventType.M_ZK_REGION_CLOSING, region.getRegionName(), serverName);

     synchronized (zkw.getNodes()) {
       String node = getNodeName(zkw, region.getEncodedName());
@@ -546,7 +560,7 @@
       HRegionInfo region, String serverName, int expectedVersion)
   throws KeeperException {
     return transitionNode(zkw, region, serverName,
-        EventType.RS_ZK_REGION_CLOSING,
+        EventType.M_ZK_REGION_CLOSING,
         EventType.RS_ZK_REGION_CLOSED,
         expectedVersion);
   }
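
For context, a minimal sketch (not part of the patch) of how the reworked close handshake fits together after this change: the master, not the region server, creates the CLOSING znode before it issues the close RPC, and the region server only looks up the node's version and transitions it to CLOSED once the region is shut. The ZKAssign calls below are the ones touched in the diff above; the wrapper class, method names, and the master/RS plumbing around them are simplified assumptions for illustration, not HBase code.

// Illustrative sketch only -- not part of the patch. Relies on the ZKAssign
// methods shown above; everything else here is assumed scaffolding.
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

public class ClosingFlowSketch {

  /** Master side: create the CLOSING node first, then issue the close RPC. */
  static int masterStartsClose(ZooKeeperWatcher zkw, HRegionInfo region,
      String masterName) throws KeeperException {
    // With this patch the master owns creation of the CLOSING znode
    // (M_ZK_REGION_CLOSING), so a failure here aborts the unassign up front.
    int version = ZKAssign.createNodeClosing(zkw, region, masterName);
    // ... the master would now send the close RPC to the hosting RS ...
    return version;
  }

  /** RS side: after the region is closed, move the existing node to CLOSED. */
  static void regionServerFinishesClose(ZooKeeperWatcher zkw,
      HRegionInfo region, String rsName) throws KeeperException {
    // New helper from this patch: read the znode's version instead of
    // creating the node, since the master already created it.
    int expectedVersion = ZKAssign.getVersion(zkw, region);
    if (expectedVersion == -1) {
      return; // node is gone; nothing to transition
    }
    // Transition M_ZK_REGION_CLOSING -> RS_ZK_REGION_CLOSED.
    ZKAssign.transitionNodeClosed(zkw, region, rsName, expectedVersion);
  }
}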