diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index f4625f6..11b53b2 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -538,7 +538,7 @@ public class AssignmentManager extends ZooKeeperListener { addToServers(serverInfo, regionInfo); } // Remove plan if one. - this.regionPlans.remove(regionInfo.getEncodedName()); + clearRegionPlan(regionInfo.getEncodedName()); // Update timers for all regions in transition going against this server. updateTimers(serverInfo); } @@ -558,15 +558,17 @@ public class AssignmentManager extends ZooKeeperListener { */ private void updateTimers(final HServerInfo hsi) { // This loop could be expensive - for (Map.Entry e: this.regionPlans.entrySet()) { - if (e.getValue().getDestination().equals(hsi)) { - RegionState rs = null; - synchronized (this.regionsInTransition) { - rs = this.regionsInTransition.get(e.getKey()); - } - if (rs != null) { - synchronized (rs) { - rs.update(rs.getState()); + synchronized (this.regionPlans) { + for (Map.Entry e: this.regionPlans.entrySet()) { + if (e.getValue().getDestination().equals(hsi)) { + RegionState rs = null; + synchronized (this.regionsInTransition) { + rs = this.regionsInTransition.get(e.getKey()); + } + if (rs != null) { + synchronized (rs) { + rs.update(rs.getState()); + } } } } @@ -586,7 +588,10 @@ public class AssignmentManager extends ZooKeeperListener { this.regionsInTransition.notifyAll(); } } + // Remove the region plan as well, just in case. + clearRegionPlan(regionInfo.getEncodedName()); setOffline(regionInfo); + // TODO remove the CLOSING node if it is there.
} /** @@ -1704,7 +1709,9 @@ public class AssignmentManager extends ZooKeeperListener { * @param plan Plan to execute. */ void balance(final RegionPlan plan) { - this.regionPlans.put(plan.getRegionName(), plan); + synchronized (this.regionPlans) { + this.regionPlans.put(plan.getRegionName(), plan); + } unassign(plan.getRegionInfo()); } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 2eeb19f..5ae9611 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -154,6 +154,10 @@ public class CompactSplitThread extends Thread implements CompactionRequestor { if (!st.prepare()) return; try { st.execute(this.server, this.server); + } catch (ConcurrentCloseSplitFailedException e) { + LOG.info("Split failed due to concurrent close, leaving region " + + parent + " closed and moving on."); + return; } catch (IOException ioe) { try { LOG.info("Running rollback of failed split of " + diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/ConcurrentCloseSplitFailedException.java b/src/main/java/org/apache/hadoop/hbase/regionserver/ConcurrentCloseSplitFailedException.java new file mode 100644 index 0000000..8da941a --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/ConcurrentCloseSplitFailedException.java @@ -0,0 +1,33 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +/** + * Thrown by SplitTransaction when the parent region was closed by a + * concurrent thread before the split could close it. The split is then + * abandoned rather than rolled back, because the region has probably already + * been moved, or is being moved, to a different server. + */ +public class ConcurrentCloseSplitFailedException extends IOException { + private static final long serialVersionUID = 1L; +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index dcea027..bdd43c9 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -465,6 +465,9 @@ public class HRegion implements HeapSize { // , Writable{ return close(false); } + /** Allows only one thread at a time to run {@link #close(boolean)}. */ + private final Object closeLock = new Object(); + /** * Close down this HRegion. Flush the cache unless abort parameter is true, * Shut down each HStore, don't service any more calls. @@ -481,59 +484,64 @@ public class HRegion implements HeapSize { // , Writable{ */ public List close(final boolean abort) throws IOException { - if (isClosed()) { - LOG.warn("Region " + this + " already closed"); - return null; - } - boolean wasFlushing = false; - synchronized (writestate) { - // Disable compacting and flushing by background threads for this - // region.
- writestate.writesEnabled = false; - wasFlushing = writestate.flushing; - LOG.debug("Closing " + this + ": disabling compactions & flushes"); - while (writestate.compacting || writestate.flushing) { - LOG.debug("waiting for" + - (writestate.compacting ? " compaction" : "") + - (writestate.flushing ? - (writestate.compacting ? "," : "") + " cache flush" : - "") + " to complete for region " + this); - try { - writestate.wait(); - } catch (InterruptedException iex) { - // continue - } - } - } - // If we were not just flushing, is it worth doing a preflush...one - // that will clear out of the bulk of the memstore before we put up - // the close flag? - if (!abort && !wasFlushing && worthPreFlushing()) { - LOG.info("Running close preflush of " + this.getRegionNameAsString()); - internalFlushcache(); - } - this.closing.set(true); - lock.writeLock().lock(); - try { - if (this.isClosed()) { - // SplitTransaction handles the null + // Only allow one thread to close at a time. Serialize them so dual + // threads attempting to close will run up against each other. + + synchronized (closeLock) { + if (isClosed()) { + LOG.warn("Region " + this + " already closed"); return null; } - LOG.debug("Updates disabled for region " + this); - // Don't flush the cache if we are aborting - if (!abort) { + boolean wasFlushing = false; + synchronized (writestate) { + // Disable compacting and flushing by background threads for this + // region. + writestate.writesEnabled = false; + wasFlushing = writestate.flushing; + LOG.debug("Closing " + this + ": disabling compactions & flushes"); + while (writestate.compacting || writestate.flushing) { + LOG.debug("waiting for" + + (writestate.compacting ? " compaction" : "") + + (writestate.flushing ? + (writestate.compacting ? 
"," : "") + " cache flush" : + "") + " to complete for region " + this); + try { + writestate.wait(); + } catch (InterruptedException iex) { + // continue + } + } + } + // If we were not just flushing, is it worth doing a preflush...one + // that will clear out of the bulk of the memstore before we put up + // the close flag? + if (!abort && !wasFlushing && worthPreFlushing()) { + LOG.info("Running close preflush of " + this.getRegionNameAsString()); internalFlushcache(); } + this.closing.set(true); + lock.writeLock().lock(); + try { + if (this.isClosed()) { + // SplitTransaction handles the null + return null; + } + LOG.debug("Updates disabled for region " + this); + // Don't flush the cache if we are aborting + if (!abort) { + internalFlushcache(); + } - List result = new ArrayList(); - for (Store store : stores.values()) { - result.addAll(store.close()); + List result = new ArrayList(); + for (Store store : stores.values()) { + result.addAll(store.close()); + } + this.closed.set(true); + LOG.info("Closed " + this); + return result; + } finally { + lock.writeLock().unlock(); } - this.closed.set(true); - LOG.info("Closed " + this); - return result; - } finally { - lock.writeLock().unlock(); } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java index 1bcde8c..01cc020 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java @@ -191,12 +191,20 @@ class SplitTransaction { boolean testing = server == null? 
true: server.getConfiguration().getBoolean("hbase.testing.nocluster", false); - createSplitDir(this.parent.getFilesystem(), this.splitdir); - this.journal.add(JournalEntry.CREATE_SPLIT_DIR); - List hstoreFilesToSplit = this.parent.close(false); + if (hstoreFilesToSplit == null) { + // The region was closed by a concurrent thread, so we can't continue + // with the split and must abandon it instead. Reopening or splitting + // it here could cause problems because the region has probably + // already been moved to a different server, or is in the process of + // moving to a different server. + throw new ConcurrentCloseSplitFailedException(); + } this.journal.add(JournalEntry.CLOSED_PARENT_REGION); + createSplitDir(this.parent.getFilesystem(), this.splitdir); + this.journal.add(JournalEntry.CREATE_SPLIT_DIR); + if (!testing) { services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName()); } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java b/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java index 6cba18a..f5771c0 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java @@ -115,7 +115,15 @@ public class CloseRegionHandler extends EventHandler { try { // TODO: If we need to keep updating CLOSING stamp to prevent against // a timeout if this is long-running, need to spin up a thread? - region.close(abort); + if (region.close(abort) == null) { + // This region got closed. Most likely due to a split. So instead + // of doing the setClosedState() below, let's just ignore and continue. + // The split message will clean up the master state. + + //TODO fix up master to do the right thing.
+// removeClosingState(); + return; + } } catch (IOException e) { LOG.error("Unrecoverable exception while closing region " + regionInfo.getRegionNameAsString() + ", still finishing close", e); @@ -173,4 +181,18 @@ public class CloseRegionHandler extends EventHandler { } return expectedVersion; } + + /** + * Best-effort removal of this region's CLOSING node from ZooKeeper. Any + * KeeperException, including NoNodeException, is logged and not rethrown. + */ + private void removeClosingState() { + try { + ZKAssign.deleteClosingNode(server.getZooKeeper(), + regionInfo); + } catch (KeeperException e) { + LOG.warn("Error removing closing state node during split failure " + + "of region " + regionInfo, e); + } + } } \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java index 96ced28..5644ea3 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java @@ -355,13 +355,14 @@ public class ZKAssign { * of the specified regions transition to being closed.
* * @param zkw zk reference - * @param regionName closing region to be deleted from zk + * @param region closing region to be deleted from zk * @throws KeeperException if unexpected zookeeper exception * @throws KeeperException.NoNodeException if node does not exist */ public static boolean deleteClosingNode(ZooKeeperWatcher zkw, - String regionName) + HRegionInfo region) throws KeeperException, KeeperException.NoNodeException { + String regionName = region.getEncodedName(); return deleteNode(zkw, regionName, EventType.RS_ZK_REGION_CLOSING); } @@ -467,9 +468,11 @@ public class ZKAssign { throws KeeperException, KeeperException.NodeExistsException { LOG.debug(zkw.prefix("Creating unassigned node for " + region.getEncodedName() + " in a CLOSING state")); + RegionTransitionData data = new RegionTransitionData( EventType.RS_ZK_REGION_CLOSING, region.getRegionName(), serverName); - synchronized(zkw.getNodes()) { + + synchronized (zkw.getNodes()) { String node = getNodeName(zkw, region.getEncodedName()); zkw.getNodes().add(node); return ZKUtil.createAndWatch(zkw, node, data.getBytes());