Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1577421) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1042,22 +1042,25 @@ } status.setStatus("Disabling compacts and flushes for region"); - boolean wasFlushing = false; synchronized (writestate) { // Disable compacting and flushing by background threads for this // region. writestate.writesEnabled = false; - wasFlushing = writestate.flushing; LOG.debug("Closing " + this + ": disabling compactions & flushes"); waitForFlushesAndCompactions(); } // If we were not just flushing, is it worth doing a preflush...one // that will clear out of the bulk of the memstore before we put up // the close flag? - if (!abort && !wasFlushing && worthPreFlushing()) { + if (!abort && worthPreFlushing()) { status.setStatus("Pre-flushing region before close"); LOG.info("Running close preflush of " + this.getRegionNameAsString()); - internalFlushcache(status); + try { + internalFlushcache(status); + } catch (IOException ioe) { + // Failed to flush the region. Keep going. + status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage()); + } } this.closing.set(true); @@ -1073,7 +1076,30 @@ LOG.debug("Updates disabled for region " + this); // Don't flush the cache if we are aborting if (!abort) { - internalFlushcache(status); + int flushCount = 0; + while (this.getMemstoreSize().get() > 0) { + try { + if (flushCount++ > 0) { + int actualFlushes = flushCount - 1; + if (actualFlushes > 5) { + // If we tried 5 times and are unable to clear memory, abort + // so we do not lose data + throw new DroppedSnapshotException("Failed clearing memory after " + + actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName())); + } + LOG.info("Running extra flush, " + actualFlushes + + " (carrying snapshot?) 
" + this); + } + internalFlushcache(status); + } catch (IOException ioe) { + status.setStatus("Failed flush " + this + ", putting online again"); + synchronized (writestate) { + writestate.writesEnabled = true; + } + // Have to throw to upper layers. I can't abort server from here. + throw ioe; + } + } } List result = new ArrayList(); @@ -1088,6 +1114,7 @@ // close each store in parallel for (final Store store : stores.values()) { + assert abort? true: store.getFlushableSize() == 0; completionService .submit(new Callable>() { public ImmutableList call() throws IOException { @@ -1111,7 +1138,7 @@ } } this.closed.set(true); - + if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get()); if (coprocessorHost != null) { status.setStatus("Running coprocessor post-close hooks"); this.coprocessorHost.postClose(abort); @@ -1629,7 +1656,7 @@ status.setStatus("Obtaining lock to block concurrent updates"); // block waiting for the lock for internal flush this.updatesLock.writeLock().lock(); - long flushsize = this.memstoreSize.get(); + long totalFlushableSize = 0; status.setStatus("Preparing to flush by snapshotting stores"); List storeFlushers = new ArrayList(stores.size()); try { @@ -1642,6 +1669,7 @@ completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId); for (Store s : stores.values()) { + totalFlushableSize += s.getFlushableSize(); storeFlushers.add(s.getStoreFlusher(completeSequenceId)); } @@ -1653,7 +1681,7 @@ this.updatesLock.writeLock().unlock(); } String s = "Finished snapshotting " + this + - ", commencing wait for mvcc, flushsize=" + flushsize; + ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize; status.setStatus(s); LOG.debug(s); @@ -1699,7 +1727,7 @@ storeFlushers.clear(); // Set down the memstore size by amount of flush. 
- this.addAndGetGlobalMemstoreSize(-flushsize); + this.addAndGetGlobalMemstoreSize(-totalFlushableSize); } catch (Throwable t) { // An exception here means that the snapshot was not persisted. // The hlog needs to be replayed so its content is restored to memstore. @@ -1739,7 +1767,7 @@ long time = EnvironmentEdgeManager.currentTimeMillis() - startTime; long memstoresize = this.memstoreSize.get(); String msg = "Finished memstore flush of ~" + - StringUtils.humanReadableInt(flushsize) + "/" + flushsize + + StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize + ", currentsize=" + StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize + " for region " + this + " in " + time + "ms, sequenceid=" + sequenceId + @@ -1747,7 +1775,7 @@ ((wal == null)? "; wal=null": ""); LOG.info(msg); status.setStatus(msg); - this.recentFlushes.add(new Pair(time/1000, flushsize)); + this.recentFlushes.add(new Pair(time/1000, totalFlushableSize)); return compactionRequested; } Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1577421) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -88,6 +88,7 @@ // Used to track own heapSize final AtomicLong size; + private volatile long snapshotSize; // Used to track when to flush volatile long timeOfOldestEdit = Long.MAX_VALUE; @@ -122,6 +123,7 @@ timeRangeTracker = new TimeRangeTracker(); snapshotTimeRangeTracker = new TimeRangeTracker(); this.size = new AtomicLong(DEEP_OVERHEAD); + this.snapshotSize = 0; if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { this.allocator = new MemStoreLAB(conf); } else { @@ -151,6 +153,7 @@ "Doing nothing. 
Another ongoing flush or did we fail last attempt?"); } else { if (!this.kvset.isEmpty()) { + this.snapshotSize = keySize(); this.snapshot = this.kvset; this.kvset = new KeyValueSkipListSet(this.comparator); this.snapshotTimeRangeTracker = this.timeRangeTracker; @@ -179,6 +182,18 @@ } /** + * On flush, how much memory we will clear. + * Flush will first clear out the data in snapshot if any (It will take a second flush + * invocation to clear the current Cell set). If snapshot is empty, current + * Cell set will be flushed. + * + * @return size of data that is going to be flushed + */ + long getFlushableSize() { + return this.snapshotSize > 0 ? this.snapshotSize : keySize(); + } + + /** * The passed snapshot was successfully persisted; it can be let go. + * @param ss The snapshot to clean out. + * @throws UnexpectedException @@ -196,6 +211,7 @@ this.snapshot = new KeyValueSkipListSet(this.comparator); this.snapshotTimeRangeTracker = new TimeRangeTracker(); } + this.snapshotSize = 0; } /** @@ -877,7 +893,7 @@ } public final static long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); + ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)); public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.ATOMIC_LONG + Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1577421) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -363,6 +363,11 @@ return getStoreHomedir(tabledir, encodedName, Bytes.toString(family)); } + public long getFlushableSize() { + return this.memstore.getFlushableSize(); + } + + /** * @param tabledir * @param encodedName Encoded region name. 
@@ -796,7 +801,7 @@ return pathName; } catch (Exception e) { LOG.warn("Failed validating store file " + pathName - + ", retring num=" + i, e); + + ", retrying num=" + i, e); if (e instanceof IOException) { lastException = (IOException) e; } else { @@ -804,7 +809,7 @@ } } } catch (IOException e) { - LOG.warn("Failed flushing store file, retring num=" + i, e); + LOG.warn("Failed flushing store file, retrying num=" + i, e); lastException = e; } if (lastException != null && i < (flush_retries_number - 1)) {