Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (revision 1041943) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (working copy) @@ -355,13 +355,14 @@ * of the specified regions transition to being closed. * * @param zkw zk reference - * @param regionName closing region to be deleted from zk + * @param region closing region to be deleted from zk * @throws KeeperException if unexpected zookeeper exception * @throws KeeperException.NoNodeException if node does not exist */ public static boolean deleteClosingNode(ZooKeeperWatcher zkw, - String regionName) + HRegionInfo region) throws KeeperException, KeeperException.NoNodeException { + String regionName = region.getEncodedName(); return deleteNode(zkw, regionName, EventType.RS_ZK_REGION_CLOSING); } @@ -381,7 +382,7 @@ * of the specified regions transition to being closed. 
* * @param zkw zk reference - * @param region region to be deleted from zk + * @param regionName region to be deleted from zk * @param expectedState state region must be in for delete to complete * @throws KeeperException if unexpected zookeeper exception * @throws KeeperException.NoNodeException if node does not exist @@ -467,9 +468,11 @@ throws KeeperException, KeeperException.NodeExistsException { LOG.debug(zkw.prefix("Creating unassigned node for " + region.getEncodedName() + " in a CLOSING state")); + RegionTransitionData data = new RegionTransitionData( EventType.RS_ZK_REGION_CLOSING, region.getRegionName(), serverName); - synchronized(zkw.getNodes()) { + + synchronized (zkw.getNodes()) { String node = getNodeName(zkw, region.getEncodedName()); zkw.getNodes().add(node); return ZKUtil.createAndWatch(zkw, node, data.getBytes()); Index: src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (revision 1041943) +++ src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (working copy) @@ -200,6 +200,15 @@ this.journal.add(JournalEntry.CREATE_SPLIT_DIR); List hstoreFilesToSplit = this.parent.close(false); + if (hstoreFilesToSplit == null) { + // The region was closed by a concurrent thread. We can't continue + // with the split, instead we must just abandon the split. If we + // reopen or split this could cause problems because the region has + // probably already been moved to a different server, or is in the + // process of moving to a different server. 
+ throw new IOException("Failed to close region: already closed by " + + "another thread"); + } this.journal.add(JournalEntry.CLOSED_PARENT_REGION); if (!testing) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (revision 1041943) +++ src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (working copy) @@ -114,11 +114,18 @@ // Close the region try { // TODO: If we need to keep updating CLOSING stamp to prevent against - // a timeout if this is long-running, need to spin up a thread? - region.close(abort); + // a timeout if this is long-running, need to spin up a thread? + if (region.close(abort) == null) { + // This region got closed. Most likely due to a split. So instead + // of doing the setClosedState() below, let's just ignore and continue. + // The split message will clean up the master state. 
+ LOG.warn("Can't close region: was already closed during close(): " + + regionInfo.getRegionNameAsString()); + return; + } } catch (IOException e) { LOG.error("Unrecoverable exception while closing region " + - regionInfo.getRegionNameAsString() + ", still finishing close", e); + regionInfo.getRegionNameAsString() + ", still finishing close", e); } this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName()); @@ -164,12 +171,12 @@ try { if ((expectedVersion = ZKAssign.createNodeClosing( server.getZooKeeper(), regionInfo, server.getServerName())) == FAILED) { - LOG.warn("Error creating node in CLOSING state, aborting close of " - + regionInfo.getRegionNameAsString()); + LOG.warn("Error creating node in CLOSING state, aborting close of " + + regionInfo.getRegionNameAsString()); } } catch (KeeperException e) { - LOG.warn("Error creating node in CLOSING state, aborting close of " - + regionInfo.getRegionNameAsString()); + LOG.warn("Error creating node in CLOSING state, aborting close of " + + regionInfo.getRegionNameAsString()); } return expectedVersion; } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1041943) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -45,8 +45,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; -import com.google.common.collect.ClassToInstanceMap; -import com.google.common.collect.MutableClassToInstanceMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -66,15 +64,14 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.coprocessor.Exec; -import 
org.apache.hadoop.hbase.client.coprocessor.ExecResult; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.RowLock; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.CoprocessorException; +import org.apache.hadoop.hbase.client.coprocessor.Exec; +import org.apache.hadoop.hbase.client.coprocessor.ExecResult; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.IncompatibleFilterException; import org.apache.hadoop.hbase.io.HeapSize; @@ -82,7 +79,6 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.HBaseRPC; -import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -97,7 +93,9 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.StringUtils; +import com.google.common.collect.ClassToInstanceMap; import com.google.common.collect.Lists; +import com.google.common.collect.MutableClassToInstanceMap; /** * HRegion stores data for a certain region of a table. It stores all columns @@ -495,6 +493,8 @@ return close(false); } + private final Object closeLock = new Object(); + /** * Close down this HRegion. Flush the cache unless abort parameter is true, * Shut down each HStore, don't service any more calls. @@ -509,7 +509,15 @@ * * @throws IOException e */ - public List close(final boolean abort) + public List close(final boolean abort) throws IOException { + // Only allow one thread to close at a time. Serialize them so dual + // threads attempting to close will run up against each other. 
+ synchronized (closeLock) { + return doClose(abort); + } + } + + private List doClose(final boolean abort) throws IOException { if (isClosed()) { LOG.warn("Region " + this + " already closed"); Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1041943) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -711,7 +711,7 @@ if (destServerName == null || destServerName.length == 0) { LOG.info("Passed destination servername is null/empty so " + "choosing a server at random"); - this.assignmentManager.clearRegionPlan(hri.getEncodedName()); + this.assignmentManager.clearRegionPlan(hri); // Unassign will reassign it elsewhere choosing random server. this.assignmentManager.unassign(hri); } else { Index: src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1041943) +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -25,6 +25,7 @@ import java.net.ConnectException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -33,7 +34,6 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; @@ -108,11 +108,9 @@ /** Plans for region movement. Key is the encoded version of a region name*/ // TODO: When do plans get cleaned out? Ever? In server open and in server // shutdown processing -- St.Ack - // TODO: Better to just synchronize access around regionPlans? 
I think that - // would be better than a concurrent structure since we do more than - // one operation at a time -- jgray - final ConcurrentNavigableMap regionPlans = - new ConcurrentSkipListMap(); + // All access to this Map must be synchronized. + final NavigableMap regionPlans = + new TreeMap(); private final ZKTable zkTable; @@ -538,7 +536,7 @@ addToServers(serverInfo, regionInfo); } // Remove plan if one. - this.regionPlans.remove(regionInfo.getEncodedName()); + clearRegionPlan(regionInfo); // Update timers for all regions in transition going against this server. updateTimers(serverInfo); } @@ -557,19 +555,24 @@ * @param hsi */ private void updateTimers(final HServerInfo hsi) { - // This loop could be expensive - for (Map.Entry e: this.regionPlans.entrySet()) { - if (e.getValue().getDestination().equals(hsi)) { - RegionState rs = null; - synchronized (this.regionsInTransition) { - rs = this.regionsInTransition.get(e.getKey()); - } - if (rs != null) { - synchronized (rs) { - rs.update(rs.getState()); - } - } + // This loop could be expensive. + // First make a copy of current regionPlan rather than hold sync while + // looping because holding sync can cause deadlock. It's OK in this loop + // if the Map we're going against is a little stale + Map copy = new HashMap(); + synchronized(this.regionPlans) { + copy.putAll(this.regionPlans); + } + for (Map.Entry e: copy.entrySet()) { + if (!e.getValue().getDestination().equals(hsi)) continue; + RegionState rs = null; + synchronized (this.regionsInTransition) { + rs = this.regionsInTransition.get(e.getKey()); } + if (rs == null) continue; + synchronized (rs) { + rs.update(rs.getState()); + } } } @@ -586,6 +589,8 @@ this.regionsInTransition.notifyAll(); } } + // remove the region plan as well just in case. 
+ clearRegionPlan(regionInfo); setOffline(regionInfo); } @@ -681,6 +686,7 @@ final List regions) { LOG.debug("Bulk assigning " + regions.size() + " region(s) to " + destination.getServerName()); + List states = new ArrayList(regions.size()); synchronized (this.regionsInTransition) { for (HRegionInfo region: regions) { @@ -941,23 +947,28 @@ if (servers.isEmpty()) return null; RegionPlan randomPlan = new RegionPlan(state.getRegion(), null, LoadBalancer.randomAssignment(servers)); + boolean newPlan = false; + RegionPlan existingPlan = null; synchronized (this.regionPlans) { - RegionPlan existingPlan = this.regionPlans.get(encodedName); + existingPlan = this.regionPlans.get(encodedName); if (existingPlan == null || forceNewPlan || existingPlan.getDestination().equals(serverToExclude)) { - LOG.debug("No previous transition plan was found (or we are ignoring " + - "an existing plan) for " + state.getRegion().getRegionNameAsString() - + " so generated a random one; " + randomPlan + "; " + - serverManager.countOfRegionServers() + - " (online=" + serverManager.getOnlineServers().size() + - ", exclude=" + serverToExclude + ") available servers"); + newPlan = true; this.regionPlans.put(encodedName, randomPlan); + } + } + if (newPlan) { + LOG.debug("No previous transition plan was found (or we are ignoring " + + "an existing plan) for " + state.getRegion().getRegionNameAsString() + + " so generated a random one; " + randomPlan + "; " + + serverManager.countOfRegionServers() + + " (online=" + serverManager.getOnlineServers().size() + + ", exclude=" + serverToExclude + ") available servers"); return randomPlan; } LOG.debug("Using pre-existing plan for region " + - state.getRegion().getRegionNameAsString() + "; plan=" + existingPlan); + state.getRegion().getRegionNameAsString() + "; plan=" + existingPlan); return existingPlan; - } } /** @@ -1384,15 +1395,15 @@ } } } - clearRegionPlan(hri.getEncodedName()); + clearRegionPlan(hri); } /** - * @param encodedRegionName Region whose 
plan we are to clear. + * @param region Region whose plan we are to clear. */ - void clearRegionPlan(final String encodedRegionName) { + void clearRegionPlan(final HRegionInfo region) { synchronized (this.regionPlans) { - this.regionPlans.remove(encodedRegionName); + this.regionPlans.remove(region.getEncodedName()); } } @@ -1646,6 +1657,24 @@ public void handleSplitReport(final HServerInfo hsi, final HRegionInfo parent, final HRegionInfo a, final HRegionInfo b) { regionOffline(parent); + // Remove any CLOSING node, if exists, due to race between master & rs + // for close & split. Not putting into regionOffline method because it is + // called from various locations. + try { + RegionTransitionData node = ZKAssign.getDataNoWatch(this.watcher, + parent.getEncodedName(), null); + if (node != null) { + if (node.getEventType().equals(EventType.RS_ZK_REGION_CLOSING)) { + ZKAssign.deleteClosingNode(this.watcher, parent); + } else { + LOG.warn("Split report has RIT node (shouldnt have one): " + + parent + " node: " + node); + } + } + } catch (KeeperException e) { + LOG.warn("Exception while validating RIT during split report", e); + } + regionOnline(a, hsi); regionOnline(b, hsi); @@ -1706,7 +1735,9 @@ * @param plan Plan to execute. */ void balance(final RegionPlan plan) { - this.regionPlans.put(plan.getRegionName(), plan); + synchronized (this.regionPlans) { + this.regionPlans.put(plan.getRegionName(), plan); + } unassign(plan.getRegionInfo()); }