Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (revision 603763) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (working copy) @@ -104,6 +104,9 @@ // Set client pause to the original default conf.setInt("hbase.client.pause", 10 * 1000); + + // Increase number of retries + conf.setInt("hbase.client.retries.number", 10); } /** @@ -310,9 +313,6 @@ mrCluster.shutdown(); } - // verify map-reduce results - verify(conf, MULTI_REGION_TABLE_NAME); - } finally { table.close(); } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (revision 603763) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (working copy) @@ -19,11 +19,13 @@ */ package org.apache.hadoop.hbase; +import java.io.IOException; +import java.util.HashMap; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; /** * Leases @@ -45,10 +47,10 @@ protected final int leasePeriod; protected final int leaseCheckFrequency; private final Thread leaseMonitorThread; - protected final Map leases = - new HashMap(); - protected final TreeSet sortedLeases = new TreeSet(); - protected AtomicBoolean stop = new AtomicBoolean(false); + protected volatile HashMap leases = + new HashMap(); + protected volatile TreeSet sortedLeases = new TreeSet(); + protected volatile AtomicBoolean stop = new AtomicBoolean(false); /** * Creates a lease @@ -74,7 +76,7 @@ * @param name Set name on the lease checking daemon thread. */ public void setName(final String name) { - this.leaseMonitorThread.setName(name); + leaseMonitorThread.setName(name); } /** @@ -85,13 +87,13 @@ * allocation of new leases. */ public void closeAfterLeasesExpire() { - synchronized(this.leases) { - while (this.leases.size() > 0) { + synchronized (leases) { + while (leases.size() > 0) { LOG.info(Thread.currentThread().getName() + " " + Integer.toString(leases.size()) + " lease(s) " + "outstanding. Waiting for them to expire."); try { - this.leases.wait(this.leaseCheckFrequency); + leases.wait(leaseCheckFrequency); } catch (InterruptedException e) { // continue } @@ -107,17 +109,17 @@ */ public void close() { LOG.info(Thread.currentThread().getName() + " closing leases"); - this.stop.set(true); - while (this.leaseMonitorThread.isAlive()) { + stop.set(true); + while (leaseMonitorThread.isAlive()) { try { - this.leaseMonitorThread.interrupt(); - this.leaseMonitorThread.join(); + leaseMonitorThread.interrupt(); + leaseMonitorThread.join(); } catch (InterruptedException iex) { // Ignore } } - synchronized(leases) { - synchronized(sortedLeases) { + synchronized (leases) { + synchronized (sortedLeases) { leases.clear(); sortedLeases.clear(); } @@ -130,81 +132,64 @@ /** * Obtain a lease * - * @param holderId id of lease holder - * @param resourceId id of resource being leased + * @param leaseName lease identifier * @param listener listener that will process lease expirations */ - public void createLease(final long holderId, final long resourceId, + public void createLease(final String leaseName, final LeaseListener listener) { - LeaseName name = null; - synchronized(leases) { - synchronized(sortedLeases) { - Lease lease = new Lease(holderId, resourceId, listener); - name = lease.getLeaseName(); - if(leases.get(name) != null) { + synchronized (leases) { + synchronized (sortedLeases) { + if (leases.get(leaseName) != null) { throw new AssertionError("Impossible state for createLease(): " + - "Lease " + name + " is still held."); + "Lease " + leaseName + " is still held."); } - leases.put(name, lease); + Lease lease = new Lease(leaseName, listener); + leases.put(leaseName, lease); sortedLeases.add(lease); } } -// if (LOG.isDebugEnabled()) { -// LOG.debug("Created lease " + name); -// } } - /* A client renews a lease... */ /** * Renew a lease * - * @param holderId id of lease holder - * @param resourceId id of resource being leased + * @param leaseName name of lease * @throws IOException */ - public void renewLease(final long holderId, final long resourceId) - throws IOException { - LeaseName name = null; - synchronized(leases) { - synchronized(sortedLeases) { - name = createLeaseName(holderId, resourceId); - Lease lease = leases.get(name); + public void renewLease(final String leaseName) throws IOException { + synchronized (leases) { + synchronized (sortedLeases) { + Lease lease = leases.get(leaseName); if (lease == null) { // It's possible that someone tries to renew the lease, but // it just expired a moment ago. So fail. throw new IOException("Cannot renew lease that is not held: " + - name); + leaseName); } sortedLeases.remove(lease); lease.renew(); sortedLeases.add(lease); } } -// if (LOG.isDebugEnabled()) { -// LOG.debug("Renewed lease " + name); -// } } /** * Client explicitly cancels a lease. * - * @param holderId id of lease holder - * @param resourceId id of resource being leased + * @param leaseName name of lease */ - public void cancelLease(final long holderId, final long resourceId) { - LeaseName name = null; - synchronized(leases) { - synchronized(sortedLeases) { - name = createLeaseName(holderId, resourceId); - Lease lease = leases.get(name); + public void cancelLease(final String leaseName) { + synchronized (leases) { + synchronized (sortedLeases) { + Lease lease = leases.remove(leaseName); if (lease == null) { // It's possible that someone tries to renew the lease, but // it just expired a moment ago. So just skip it. return; } sortedLeases.remove(lease); - leases.remove(name); } + leases.notifyAll(); } } @@ -224,13 +209,13 @@ /** {@inheritDoc} */ @Override protected void chore() { - synchronized(leases) { - synchronized(sortedLeases) { + synchronized (leases) { + synchronized (sortedLeases) { Lease top; - while((sortedLeases.size() > 0) + while ((sortedLeases.size() > 0) && ((top = sortedLeases.first()) != null)) { - if(top.shouldExpire()) { - leases.remove(top.getLeaseName()); + if (top.shouldExpire()) { + leases.remove(top.name); sortedLeases.remove(top); top.expired(); } else { @@ -238,109 +223,34 @@ } } } + leases.notifyAll(); } } } - /* - * A Lease name. - * More lightweight than String or Text. - */ - @SuppressWarnings("unchecked") - class LeaseName implements Comparable { - private final long holderId; - private final long resourceId; - - LeaseName(final long hid, final long rid) { - this.holderId = hid; - this.resourceId = rid; - } - - /** {@inheritDoc} */ - @Override - public boolean equals(Object obj) { - LeaseName other = (LeaseName)obj; - return this.holderId == other.holderId && - this.resourceId == other.resourceId; - } - - /** {@inheritDoc} */ - @Override - public int hashCode() { - // Copy OR'ing from javadoc for Long#hashCode. - int result = (int)(this.holderId ^ (this.holderId >>> 32)); - result ^= (int)(this.resourceId ^ (this.resourceId >>> 32)); - return result; - } - - /** {@inheritDoc} */ - @Override - public String toString() { - return Long.toString(this.holderId) + "/" + - Long.toString(this.resourceId); - } - - /** {@inheritDoc} */ - public int compareTo(Object obj) { - LeaseName other = (LeaseName)obj; - if (this.holderId < other.holderId) { - return -1; - } - if (this.holderId > other.holderId) { - return 1; - } - // holderIds are equal - if (this.resourceId < other.resourceId) { - return -1; - } - if (this.resourceId > other.resourceId) { - return 1; - } - // Objects are equal - return 0; - } - } - - /** Create a lease id out of the holder and resource ids. */ - protected LeaseName createLeaseName(final long hid, final long rid) { - return new LeaseName(hid, rid); - } - /** This class tracks a single Lease. */ @SuppressWarnings("unchecked") private class Lease implements Comparable { - final long holderId; - final long resourceId; + final String name; final LeaseListener listener; - long lastUpdate; - private LeaseName leaseId; + volatile long lastUpdate; - Lease(final long holderId, final long resourceId, - final LeaseListener listener) { - this.holderId = holderId; - this.resourceId = resourceId; + Lease(final String leaseName, final LeaseListener listener) { + this.name = leaseName; this.listener = listener; renew(); } - synchronized LeaseName getLeaseName() { - if (this.leaseId == null) { - this.leaseId = createLeaseName(holderId, resourceId); - } - return this.leaseId; - } - boolean shouldExpire() { return (System.currentTimeMillis() - lastUpdate > leasePeriod); } void renew() { - this.lastUpdate = System.currentTimeMillis(); + lastUpdate = System.currentTimeMillis(); } void expired() { - LOG.info(Thread.currentThread().getName() + " lease expired " + - getLeaseName()); + LOG.info(Thread.currentThread().getName() + " lease expired " + name); listener.leaseExpired(); } @@ -353,24 +263,24 @@ /** {@inheritDoc} */ @Override public int hashCode() { - int result = this.getLeaseName().hashCode(); - result ^= Long.valueOf(this.lastUpdate).hashCode(); + int result = name.hashCode(); + result ^= Long.valueOf(lastUpdate).hashCode(); return result; } - ////////////////////////////////////////////////////////////////////////////// + // // Comparable - ////////////////////////////////////////////////////////////////////////////// + // /** {@inheritDoc} */ public int compareTo(Object o) { Lease other = (Lease) o; - if(this.lastUpdate < other.lastUpdate) { + if (lastUpdate < other.lastUpdate) { return -1; - } else if(this.lastUpdate > other.lastUpdate) { + } else if (lastUpdate > other.lastUpdate) { return 1; } else { - return this.getLeaseName().compareTo(other.getLeaseName()); + return name.compareTo(other.name); } } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 603763) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy) @@ -79,9 +79,9 @@ // of HRegionServer in isolation. We use AtomicBoolean rather than // plain boolean so we can pass a reference to Chore threads. Otherwise, // Chore threads need to know about the hosting class. - protected final AtomicBoolean stopRequested = new AtomicBoolean(false); + protected volatile AtomicBoolean stopRequested = new AtomicBoolean(false); - protected final AtomicBoolean quiesced = new AtomicBoolean(false); + protected volatile AtomicBoolean quiesced = new AtomicBoolean(false); // Go down hard. Used if file system becomes unavailable and also in // debugging and unit tests. @@ -95,13 +95,13 @@ private final Random rand = new Random(); // region name -> HRegion - protected final SortedMap onlineRegions = + protected volatile SortedMap onlineRegions = Collections.synchronizedSortedMap(new TreeMap()); - protected final Map retiringRegions = + protected volatile Map retiringRegions = new ConcurrentHashMap(); protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final List outboundMsgs = + private volatile List outboundMsgs = Collections.synchronizedList(new ArrayList()); final int numRetries; @@ -120,7 +120,7 @@ private final Leases leases; // Request counter - private final AtomicInteger requestCount = new AtomicInteger(); + private volatile AtomicInteger requestCount = new AtomicInteger(); // A sleeper that sleeps for msgInterval. private final Sleeper sleeper; @@ -296,7 +296,7 @@ // splitting a 'normal' region, and the ROOT table needs to be // updated if we are splitting a META region. HTable t = null; - if (region.getRegionInfo().getTableDesc().getName().equals(META_TABLE_NAME)) { + if (region.getRegionInfo().isMetaTable()) { // We need to update the root region if (this.root == null) { this.root = new HTable(conf, ROOT_TABLE_NAME); @@ -1377,7 +1377,7 @@ if (s == null) { throw new UnknownScannerException("Name: " + scannerName); } - this.leases.renewLease(scannerId, scannerId); + this.leases.renewLease(scannerName); // Collect values to be returned here MapWritable values = new MapWritable(); @@ -1441,7 +1441,7 @@ scanners.put(scannerName, s); } this.leases. - createLease(scannerId, scannerId, new ScannerListener(scannerName)); + createLease(scannerName, new ScannerListener(scannerName)); return scannerId; } catch (IOException e) { LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")", @@ -1465,7 +1465,7 @@ throw new UnknownScannerException(scannerName); } s.close(); - this.leases.cancelLease(scannerId, scannerId); + this.leases.cancelLease(scannerName); } catch (IOException e) { checkFileSystem(); throw e; Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (revision 603763) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (working copy) @@ -181,16 +181,14 @@ */ abstract class BaseScanner extends Chore { protected boolean rootRegion; - protected final Text tableName; protected abstract boolean initialScan(); protected abstract void maintenanceScan(); - BaseScanner(final Text tableName, final int period, + BaseScanner(final boolean rootRegion, final int period, final AtomicBoolean stop) { super(period, stop); - this.tableName = tableName; - this.rootRegion = tableName.equals(ROOT_TABLE_NAME); + this.rootRegion = rootRegion; } @Override @@ -506,7 +504,7 @@ class RootScanner extends BaseScanner { /** Constructor */ public RootScanner() { - super(HConstants.ROOT_TABLE_NAME, metaRescanInterval, closed); + super(true, metaRescanInterval, closed); } private boolean scanRoot() { @@ -671,7 +669,7 @@ /** Constructor */ public MetaScanner() { - super(HConstants.META_TABLE_NAME, metaRescanInterval, closed); + super(false, metaRescanInterval, closed); } private boolean scanOneMetaRegion(MetaRegion region) { @@ -1182,16 +1180,25 @@ * regions can shut down. */ private void stopScanners() { + if (LOG.isDebugEnabled()) { + LOG.debug("telling root scanner to stop"); + } synchronized(rootScannerLock) { if (rootScannerThread.isAlive()) { rootScannerThread.interrupt(); // Wake root scanner } } + if (LOG.isDebugEnabled()) { + LOG.debug("telling meta scanner to stop"); + } synchronized(metaScannerLock) { if (metaScannerThread.isAlive()) { metaScannerThread.interrupt(); // Wake meta scanner } } + if (LOG.isDebugEnabled()) { + LOG.debug("meta and root scanners notified"); + } } /* @@ -1269,8 +1276,7 @@ loadToServers.put(load, servers); if (!closed.get()) { - long serverLabel = getServerLabel(s); - serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s)); + serverLeases.createLease(s, new ServerExpirer(s)); } return createConfigurationSubset(); @@ -1290,15 +1296,10 @@ return mw; } - private long getServerLabel(final String s) { - return s.hashCode(); - } - /** {@inheritDoc} */ public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[]) throws IOException { String serverName = serverInfo.getServerAddress().toString().trim(); - long serverLabel = getServerLabel(serverName); if (msgs.length > 0) { if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) { synchronized (serversToServerInfo) { @@ -1311,7 +1312,7 @@ ": MSG_REPORT_EXITING -- cancelling lease"); } - if (cancelLease(serverName, serverLabel)) { + if (cancelLease(serverName)) { // Only process the exit message if the server still has a lease. // Otherwise we could end up processing the server exit twice. LOG.info("Region server " + serverName + @@ -1341,21 +1342,26 @@ } } else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) { LOG.info("Region server " + serverName + " quiesced"); - if(quiescedMetaServers.incrementAndGet() == serversToServerInfo.size()) { - // If the only servers we know about are meta servers, then we can - // proceed with shutdown - LOG.info("All user tables quiesced. Proceeding with shutdown"); - closed.set(true); - stopScanners(); - synchronized(toDoQueue) { - toDoQueue.clear(); // Empty the queue - delayedToDoQueue.clear(); // Empty shut down queue - toDoQueue.notifyAll(); // Wake main thread - } - } + quiescedMetaServers.incrementAndGet(); } } + if(quiescedMetaServers.get() >= serversToServerInfo.size()) { + // If the only servers we know about are meta servers, then we can + // proceed with shutdown + LOG.info("All user tables quiesced. Proceeding with shutdown"); + closed.set(true); + stopScanners(); + synchronized(toDoQueue) { + toDoQueue.clear(); // Empty the queue + delayedToDoQueue.clear(); // Empty shut down queue + toDoQueue.notifyAll(); // Wake main thread + } + synchronized (serversToServerInfo) { + serversToServerInfo.notifyAll(); + } + } + if (shutdownRequested && !closed.get()) { // Tell the server to stop serving any user regions return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE)}; @@ -1395,7 +1401,7 @@ } synchronized (serversToServerInfo) { - cancelLease(serverName, serverLabel); + cancelLease(serverName); serversToServerInfo.notifyAll(); } return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; @@ -1406,7 +1412,7 @@ // This will always succeed; otherwise, the fetch of serversToServerInfo // would have failed above. - serverLeases.renewLease(serverLabel, serverLabel); + serverLeases.renewLease(serverName); // Refresh the info object and the load information @@ -1443,7 +1449,7 @@ } /** Cancel a server's lease and update its load information */ - private boolean cancelLease(final String serverName, final long serverLabel) { + private boolean cancelLease(final String serverName) { boolean leaseCancelled = false; HServerInfo info = serversToServerInfo.remove(serverName); if (info != null) { @@ -1454,7 +1460,7 @@ unassignRootRegion(); } LOG.info("Cancelling lease for " + serverName); - serverLeases.cancelLease(serverLabel, serverLabel); + serverLeases.cancelLease(serverName); leaseCancelled = true; // update load information @@ -1638,7 +1644,7 @@ " split. New regions are: " + newRegionA.getRegionName() + ", " + newRegionB.getRegionName()); - if (region.getTableDesc().getName().equals(META_TABLE_NAME)) { + if (region.isMetaTable()) { // A meta region has split. onlineMetaRegions.remove(region.getStartKey()); @@ -2028,7 +2034,7 @@ serverName + "> (or server is null). Marking unassigned if " + "meta and clearing pendingRegions"); - if (info.getTableDesc().getName().equals(META_TABLE_NAME)) { + if (info.isMetaTable()) { if (LOG.isDebugEnabled()) { LOG.debug("removing meta region " + info.getRegionName() + " from online meta regions"); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 603763) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy) @@ -225,6 +225,7 @@ protected final long threadWakeFrequency; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Integer updateLock = new Integer(0); + private final Integer splitLock = new Integer(0); private final long desiredMaxFileSize; private final long minSequenceId; private final String encodedRegionName; @@ -381,54 +382,56 @@ LOG.info("region " + this.regionInfo.getRegionName() + " already closed"); return null; } - lock.writeLock().lock(); - try { - synchronized (writestate) { - while (writestate.compacting || writestate.flushing) { - try { - writestate.wait(); - } catch (InterruptedException iex) { - // continue + synchronized (splitLock) { + lock.writeLock().lock(); + try { + synchronized (writestate) { + while (writestate.compacting || writestate.flushing) { + try { + writestate.wait(); + } catch (InterruptedException iex) { + // continue + } } + // Disable compacting and flushing by background threads for this + // region. + writestate.writesEnabled = false; } - // Disable compacting and flushing by background threads for this - // region. - writestate.writesEnabled = false; - } - - // Wait for active scanners to finish. The write lock we hold will prevent - // new scanners from being created. - - synchronized (activeScannerCount) { - while (activeScannerCount.get() != 0) { - try { - activeScannerCount.wait(); - - } catch (InterruptedException e) { - // continue + + // Wait for active scanners to finish. The write lock we hold will prevent + // new scanners from being created. + + synchronized (activeScannerCount) { + while (activeScannerCount.get() != 0) { + try { + activeScannerCount.wait(); + + } catch (InterruptedException e) { + // continue + } } } - } - - // Write lock means no more row locks can be given out. Wait on - // outstanding row locks to come in before we close so we do not drop - // outstanding updates. - waitOnRowLocks(); - // Don't flush the cache if we are aborting - if (!abort) { - internalFlushcache(snapshotMemcaches()); - } + // Write lock means no more row locks can be given out. Wait on + // outstanding row locks to come in before we close so we do not drop + // outstanding updates. + waitOnRowLocks(); - List result = new ArrayList(); - for (HStore store: stores.values()) { - result.addAll(store.close()); + // Don't flush the cache if we are aborting + if (!abort) { + internalFlushcache(snapshotMemcaches()); + } + + List result = new ArrayList(); + for (HStore store: stores.values()) { + result.addAll(store.close()); + } + this.closed.set(true); + LOG.info("closed " + this.regionInfo.getRegionName()); + return result; + } finally { + lock.writeLock().unlock(); } - this.closed.set(true); - LOG.info("closed " + this.regionInfo.getRegionName()); - return result; - } finally { - lock.writeLock().unlock(); } } @@ -541,89 +544,91 @@ HRegion[] splitRegion(final RegionUnavailableListener listener) throws IOException { - Text midKey = new Text(); - if (!needsSplit(midKey)) { - return null; - } - long startTime = System.currentTimeMillis(); - Path splits = getSplitsDir(); - HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(), - this.regionInfo.getStartKey(), midKey); - Path dirA = getSplitRegionDir(splits, - HRegionInfo.encodeRegionName(regionAInfo.getRegionName())); - if(fs.exists(dirA)) { - throw new IOException("Cannot split; target file collision at " + dirA); - } - HRegionInfo regionBInfo = new HRegionInfo(this.regionInfo.getTableDesc(), - midKey, null); - Path dirB = getSplitRegionDir(splits, - HRegionInfo.encodeRegionName(regionBInfo.getRegionName())); - if(this.fs.exists(dirB)) { - throw new IOException("Cannot split; target file collision at " + dirB); - } + synchronized (splitLock) { + Text midKey = new Text(); + if (!needsSplit(midKey) || closed.get()) { + return null; + } + long startTime = System.currentTimeMillis(); + Path splits = getSplitsDir(); + HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(), + this.regionInfo.getStartKey(), midKey); + Path dirA = getSplitRegionDir(splits, + HRegionInfo.encodeRegionName(regionAInfo.getRegionName())); + if(fs.exists(dirA)) { + throw new IOException("Cannot split; target file collision at " + dirA); + } + HRegionInfo regionBInfo = new HRegionInfo(this.regionInfo.getTableDesc(), + midKey, null); + Path dirB = getSplitRegionDir(splits, + HRegionInfo.encodeRegionName(regionBInfo.getRegionName())); + if(this.fs.exists(dirB)) { + throw new IOException("Cannot split; target file collision at " + dirB); + } - // Notify the caller that we are about to close the region. This moves - // us to the 'retiring' queue. Means no more updates coming in -- just - // whatever is outstanding. - if (listener != null) { - listener.closing(getRegionName()); - } + // Notify the caller that we are about to close the region. This moves + // us to the 'retiring' queue. Means no more updates coming in -- just + // whatever is outstanding. + if (listener != null) { + listener.closing(getRegionName()); + } - // Now close the HRegion. Close returns all store files or null if not - // supposed to close (? What to do in this case? Implement abort of close?) - // Close also does wait on outstanding rows and calls a flush just-in-case. - List hstoreFilesToSplit = close(); - if (hstoreFilesToSplit == null) { - LOG.warn("Close came back null (Implement abort of close?)"); - throw new RuntimeException("close returned empty vector of HStoreFiles"); - } - - // Tell listener that region is now closed and that they can therefore - // clean up any outstanding references. - if (listener != null) { - listener.closed(this.getRegionName()); - } - - // Split each store file. - for(HStoreFile h: hstoreFilesToSplit) { - // A reference to the bottom half of the hsf store file. - HStoreFile.Reference aReference = new HStoreFile.Reference( - this.encodedRegionName, h.getFileId(), new HStoreKey(midKey), - HStoreFile.Range.bottom); - HStoreFile a = new HStoreFile(this.conf, splits, - HRegionInfo.encodeRegionName(regionAInfo.getRegionName()), - h.getColFamily(), Math.abs(rand.nextLong()), aReference); - // Reference to top half of the hsf store file. - HStoreFile.Reference bReference = new HStoreFile.Reference( - this.encodedRegionName, h.getFileId(), new HStoreKey(midKey), - HStoreFile.Range.top); - HStoreFile b = new HStoreFile(this.conf, splits, - HRegionInfo.encodeRegionName(regionBInfo.getRegionName()), - h.getColFamily(), Math.abs(rand.nextLong()), bReference); - h.splitStoreFile(a, b, this.fs); - } + // Now close the HRegion. Close returns all store files or null if not + // supposed to close (? What to do in this case? Implement abort of close?) + // Close also does wait on outstanding rows and calls a flush just-in-case. + List hstoreFilesToSplit = close(); + if (hstoreFilesToSplit == null) { + LOG.warn("Close came back null (Implement abort of close?)"); + throw new RuntimeException("close returned empty vector of HStoreFiles"); + } - // Done! - // Opening the region copies the splits files from the splits directory - // under each region. - HRegion regionA = - new HRegion(rootDir, log, fs, conf, regionAInfo, dirA, null); - regionA.close(); - HRegion regionB = - new HRegion(rootDir, log, fs, conf, regionBInfo, dirB, null); - regionB.close(); + // Tell listener that region is now closed and that they can therefore + // clean up any outstanding references. + if (listener != null) { + listener.closed(this.getRegionName()); + } - // Cleanup - boolean deleted = fs.delete(splits); // Get rid of splits directory - if (LOG.isDebugEnabled()) { - LOG.debug("Cleaned up " + splits.toString() + " " + deleted); + // Split each store file. + for(HStoreFile h: hstoreFilesToSplit) { + // A reference to the bottom half of the hsf store file. + HStoreFile.Reference aReference = new HStoreFile.Reference( + this.encodedRegionName, h.getFileId(), new HStoreKey(midKey), + HStoreFile.Range.bottom); + HStoreFile a = new HStoreFile(this.conf, splits, + HRegionInfo.encodeRegionName(regionAInfo.getRegionName()), + h.getColFamily(), Math.abs(rand.nextLong()), aReference); + // Reference to top half of the hsf store file. + HStoreFile.Reference bReference = new HStoreFile.Reference( + this.encodedRegionName, h.getFileId(), new HStoreKey(midKey), + HStoreFile.Range.top); + HStoreFile b = new HStoreFile(this.conf, splits, + HRegionInfo.encodeRegionName(regionBInfo.getRegionName()), + h.getColFamily(), Math.abs(rand.nextLong()), bReference); + h.splitStoreFile(a, b, this.fs); + } + + // Done! + // Opening the region copies the splits files from the splits directory + // under each region. + HRegion regionA = + new HRegion(rootDir, log, fs, conf, regionAInfo, dirA, null); + regionA.close(); + HRegion regionB = + new HRegion(rootDir, log, fs, conf, regionBInfo, dirB, null); + regionB.close(); + + // Cleanup + boolean deleted = fs.delete(splits); // Get rid of splits directory + if (LOG.isDebugEnabled()) { + LOG.debug("Cleaned up " + splits.toString() + " " + deleted); + } + HRegion regions[] = new HRegion [] {regionA, regionB}; + LOG.info("Region split of " + this.regionInfo.getRegionName() + + " complete; " + "new regions: " + regions[0].getRegionName() + ", " + + regions[1].getRegionName() + ". Split took " + + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); + return regions; } - HRegion regions[] = new HRegion [] {regionA, regionB}; - LOG.info("Region split of " + this.regionInfo.getRegionName() + - " complete; " + "new regions: " + regions[0].getRegionName() + ", " + - regions[1].getRegionName() + ". Split took " + - StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); - return regions; } /* @@ -1030,6 +1035,7 @@ * avoid a bunch of disk activity. * * @param row + * @param ts * @return Map values * @throws IOException */ @@ -1282,6 +1288,7 @@ * @param row The row to operate on * @param family The column family to match * @param timestamp Timestamp to match + * @throws IOException */ public void deleteFamily(Text row, Text family, long timestamp) throws IOException{