Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java	(revision 1586800)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java	(working copy)
@@ -736,14 +736,14 @@
     try {
       final HRegion region =
           new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
-        protected boolean internalFlushcache(
+        protected FlushResult internalFlushcache(
             final HLog wal, final long myseqid, MonitoredTask status)
         throws IOException {
           LOG.info("InternalFlushCache Invoked");
-          boolean b = super.internalFlushcache(wal, myseqid,
+          FlushResult fs = super.internalFlushcache(wal, myseqid,
               Mockito.mock(MonitoredTask.class));
           flushcount.incrementAndGet();
-          return b;
+          return fs;
         };
       };
       long seqid = region.initialize();
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java	(revision 1586800)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java	(working copy)
@@ -3382,7 +3382,7 @@
 
       @Override
       public void doAnAction() throws Exception {
-        if (region.flushcache()) {
+        if (region.flushcache().compactionNeeded()) {
          ++flushesSinceCompact;
        }
        // Compact regularly to avoid creating too many files and exceeding
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(revision 1586800)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(working copy)
@@ -431,8 +431,8 @@
   /**
    * @return The maximum sequence id in all store files. Used for log replay.
    */
-  long getMaxSequenceId(boolean includeBulkFiles) {
-    return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
+  long getMaxSequenceId() {
+    return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
   }
 
   @Override
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java	(revision 1586800)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java	(working copy)
@@ -475,7 +475,7 @@
     lock.readLock().lock();
     try {
       notifyFlushRequest(region, emergencyFlush);
-      boolean shouldCompact = region.flushcache();
+      boolean shouldCompact = region.flushcache().compactionNeeded();
       // We just want to check the size
       boolean shouldSplit = region.checkSplit() != null;
       if (shouldSplit) {
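Reviewer's note: the call sites above (TestHRegion, MemStoreFlusher) and RSRpcServices below all migrate the same way, from a boolean return to an explicit question asked of the FlushResult. A minimal sketch of the pattern, not part of the patch, assuming an open HRegion and the CompactSplitThread the patch itself calls:

    // Not in the patch: caller migration for HRegion#flushcache().
    // Before: boolean shouldCompact = region.flushcache();
    HRegion.FlushResult result = region.flushcache();
    if (result.compactionNeeded()) {
      // Same follow-up the patch performs in RSRpcServices.
      compactSplitThread.requestSystemCompaction(region,
        "Compaction through user triggered flush");
    }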
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1586800)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -388,6 +388,81 @@
       ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
   }
 
+  /**
+   * Objects from this class are created when flushing to describe all the different states that
+   * that method ends up in. The Result enum describes those states. The sequence id should only
+   * be specified if the flush was successful, and the failure message should only be specified
+   * if it didn't flush.
+   */
+  public static class FlushResult {
+    enum Result {
+      FLUSHED_NO_COMPACTION_NEEDED,
+      FLUSHED_COMPACTION_NEEDED,
+      // Special case where a flush didn't run because there's nothing in the memstores. Used when
+      // bulk loading to know when we can still load even if a flush didn't happen.
+      CANNOT_FLUSH_MEMSTORE_EMPTY,
+      CANNOT_FLUSH
+      // Be careful adding more to this enum; the convenience methods below must stay in sync.
+    }
+
+    final Result result;
+    final String failureReason;
+    final long flushSequenceId;
+
+    /**
+     * Convenience constructor to use when the flush is successful; the failure message is set to
+     * null.
+     * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
+     * @param flushSequenceId Generated sequence id that comes right after the edits in the
+     *                        memstores.
+     */
+    FlushResult(Result result, long flushSequenceId) {
+      this(result, flushSequenceId, null);
+      assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
+          .FLUSHED_COMPACTION_NEEDED;
+    }
+
+    /**
+     * Convenience constructor to use when we cannot flush.
+     * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
+     * @param failureReason Reason why we couldn't flush.
+     */
+    FlushResult(Result result, String failureReason) {
+      this(result, -1, failureReason);
+      assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
+    }
+
+    /**
+     * Constructor with all the parameters.
+     * @param result Any of the Result.
+     * @param flushSequenceId Generated sequence id if the memstores were flushed else -1.
+     * @param failureReason Reason why we couldn't flush, or null.
+     */
+    FlushResult(Result result, long flushSequenceId, String failureReason) {
+      this.result = result;
+      this.flushSequenceId = flushSequenceId;
+      this.failureReason = failureReason;
+    }
+
+    /**
+     * Convenience method, the equivalent of checking if result is
+     * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
+     * @return true if the memstores were flushed, else false.
+     */
+    public boolean flushSucceeded() {
+      return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
+          .FLUSHED_COMPACTION_NEEDED;
+    }
+
+    /**
+     * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
+     * @return True if the flush requested a compaction, else false (doesn't even mean it flushed).
+     */
+    public boolean compactionNeeded() {
+      return result == Result.FLUSHED_COMPACTION_NEEDED;
+    }
+  }
+
   final WriteState writestate = new WriteState();
 
   long memstoreFlushSize;
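Reviewer's note: a sketch of how a caller discriminates the four states, not part of the patch. It assumes same-package access to the package-private result and failureReason fields, as the patch's own call sites have, and LOG is illustrative:

    // Not in the patch: consuming a FlushResult.
    HRegion.FlushResult fs = region.flushcache();
    if (fs.flushSucceeded()) {
      // FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED;
      // a sequence id above all flushed edits was generated.
      long seqId = fs.flushSequenceId;
    } else if (fs.result == HRegion.FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
      // Nothing to flush; the memstores were empty.
    } else {
      // CANNOT_FLUSH: region closing/closed, already flushing, or WAL closing.
      LOG.warn("Flush skipped: " + fs.failureReason);
    }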
@@ -707,16 +782,14 @@
     for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
       Future<HStore> future = completionService.take();
       HStore store = future.get();
+      this.stores.put(store.getColumnFamilyName().getBytes(), store);
-      this.stores.put(store.getColumnFamilyName().getBytes(), store);
-      // Do not include bulk loaded files when determining seqIdForReplay
-      long storeSeqIdForReplay = store.getMaxSequenceId(false);
+      long storeMaxSequenceId = store.getMaxSequenceId();
       maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
-        storeSeqIdForReplay);
-      // Include bulk loaded files when determining seqIdForAssignment
-      long storeSeqIdForAssignment = store.getMaxSequenceId(true);
-      if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
-        maxSeqId = storeSeqIdForAssignment;
+        storeMaxSequenceId);
+
+      if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
+        maxSeqId = storeMaxSequenceId;
       }
       long maxStoreMemstoreTS = store.getMaxMemstoreTS();
       if (maxStoreMemstoreTS > maxMemstoreTS) {
@@ -1462,11 +1535,12 @@
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  public boolean flushcache() throws IOException {
+  public FlushResult flushcache() throws IOException {
     // fail-fast instead of waiting on the lock
     if (this.closing.get()) {
-      LOG.debug("Skipping flush on " + this + " because closing");
-      return false;
+      String msg = "Skipping flush on " + this + " because closing";
+      LOG.debug(msg);
+      return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
     }
     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
     status.setStatus("Acquiring readlock on region");
@@ -1474,9 +1548,10 @@
     lock.readLock().lock();
     try {
       if (this.closed.get()) {
-        LOG.debug("Skipping flush on " + this + " because closed");
-        status.abort("Skipped: closed");
-        return false;
+        String msg = "Skipping flush on " + this + " because closed";
+        LOG.debug(msg);
+        status.abort(msg);
+        return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
       }
       if (coprocessorHost != null) {
         status.setStatus("Running coprocessor pre-flush hooks");
@@ -1495,14 +1570,15 @@
             + ", flushing=" + writestate.flushing + ", writesEnabled="
             + writestate.writesEnabled);
         }
-        status.abort("Not flushing since "
+        String msg = "Not flushing since "
             + (writestate.flushing ? "already flushing"
-              : "writes not enabled"));
-        return false;
+              : "writes not enabled");
+        status.abort(msg);
+        return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
       }
     }
     try {
-      boolean result = internalFlushcache(status);
+      FlushResult fs = internalFlushcache(status);
 
       if (coprocessorHost != null) {
         status.setStatus("Running post-flush coprocessor hooks");
@@ -1510,7 +1586,7 @@
       }
 
       status.markComplete("Flush successful");
-      return result;
+      return fs;
     } finally {
       synchronized (writestate) {
         writestate.flushing = false;
@@ -1579,13 +1655,13 @@
    *
    * <p> This method may block for some time.
    * @param status
    *
-   * @return true if the region needs compacting
+   * @return object describing the flush's state
    *
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  protected boolean internalFlushcache(MonitoredTask status)
+  protected FlushResult internalFlushcache(MonitoredTask status)
       throws IOException {
     return internalFlushcache(this.log, -1, status);
   }
@@ -1599,7 +1675,7 @@
    * @throws IOException
    * @see #internalFlushcache(MonitoredTask)
    */
-  protected boolean internalFlushcache(
+  protected FlushResult internalFlushcache(
       final HLog wal, final long myseqid, MonitoredTask status)
       throws IOException {
     if (this.rsServices != null && this.rsServices.isAborted()) {
@@ -1610,7 +1686,7 @@
     // Clear flush flag.
     // If nothing to flush, return and avoid logging start/stop flush.
     if (this.memstoreSize.get() <= 0) {
-      return false;
+      return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Started memstore flush for " + this +
@@ -1645,9 +1721,10 @@
     // check if it is not closing.
     if (wal != null) {
       if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
-        status.setStatus("Flush will not be started for ["
-          + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.");
-        return false;
+        String msg = "Flush will not be started for ["
+          + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
+        status.setStatus(msg);
+        return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
       }
       flushSeqId = this.sequenceId.incrementAndGet();
     } else {
@@ -1763,7 +1840,8 @@
     status.setStatus(msg);
     this.recentFlushes.add(new Pair<Long, Long>(time/1000, totalFlushableSize));
 
-    return compactionRequested;
+    return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
+        FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -3566,6 +3644,22 @@
       return false;
     }
 
+    long seqId = -1;
+    // We need to assign a sequential ID that's in between two memstores in order to preserve
+    // the guarantee that all the edits lower than the highest sequential ID from all the
+    // HFiles are flushed on disk. See HBASE-10958.
+    if (assignSeqId) {
+      FlushResult fs = this.flushcache();
+      if (fs.flushSucceeded()) {
+        seqId = fs.flushSequenceId;
+      } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
+        seqId = this.sequenceId.incrementAndGet();
+      } else {
+        throw new IOException("Could not bulk load with an assigned sequential ID because the " +
+          "flush didn't run. Reason for not flushing: " + fs.failureReason);
+      }
+    }
+
     for (Pair<byte[], String> p : familyPaths) {
       byte[] familyName = p.getFirst();
       String path = p.getSecond();
@@ -3575,7 +3669,7 @@
         if(bulkLoadListener != null) {
           finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
         }
-        store.bulkLoadHFile(finalPath, assignSeqId ? this.sequenceId.incrementAndGet() : -1);
+        store.bulkLoadHFile(finalPath, seqId);
        if(bulkLoadListener != null) {
          bulkLoadListener.doneBulkLoad(familyName, path);
        }
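Reviewer's note: the hunk above is the heart of HBASE-10958. Bulk-loaded HFiles must carry a sequence id greater than every edit already on disk, so the load is preceded by a flush. A condensed sketch of that rule, not part of the patch, written as if inside HRegion (this.sequenceId is the AtomicLong the patch already uses):

    // Not in the patch: the seq-id rule the bulk-load hunk implements.
    long seqIdForBulkLoad() throws IOException {
      FlushResult fs = this.flushcache();
      if (fs.flushSucceeded()) {
        // The flush generated an id that sits above every flushed edit.
        return fs.flushSequenceId;
      }
      if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
        // Nothing was pending, so any fresh id preserves the ordering guarantee.
        return this.sequenceId.incrementAndGet();
      }
      // Region closing, already flushing, or WAL closing: refuse rather than
      // break the guarantee.
      throw new IOException("Could not assign a bulk load sequence id: " + fs.failureReason);
    }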
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(revision 1586800)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java	(working copy)
@@ -293,21 +293,15 @@
 
   /**
    * Return the highest sequence ID found across all storefiles in
-   * the given list. Store files that were created by a mapreduce
-   * bulk load are ignored, as they do not correspond to any edit
-   * log items.
+   * the given list.
    * @param sfs
-   * @param includeBulkLoadedFiles
    * @return 0 if no non-bulk-load files are provided or, this is Store that
    * does not yet have any store files.
    */
-  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs,
-      boolean includeBulkLoadedFiles) {
+  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
     long max = 0;
     for (StoreFile sf : sfs) {
-      if (includeBulkLoadedFiles || !sf.isBulkLoadResult()) {
-        max = Math.max(max, sf.getMaxSequenceId());
-      }
+      max = Math.max(max, sf.getMaxSequenceId());
     }
     return max;
   }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java	(revision 1586800)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java	(working copy)
@@ -978,7 +978,7 @@
       }
       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
       if (shouldFlush) {
-        boolean result = region.flushcache();
+        boolean result = region.flushcache().compactionNeeded();
         if (result) {
           regionServer.compactSplitThread.requestSystemCompaction(region,
             "Compaction through user triggered flush");
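Reviewer's note: with every bulk-loaded HFile now tagged with a real sequence id by the HRegion hunk, the replay view and the assignment view of a store's maximum sequence id coincide, which is why both StoreFile.getMaxSequenceIdInList() and HStore.getMaxSequenceId() lose their boolean parameter. A before/after sketch at a hypothetical call site, not part of the patch:

    // Not in the patch: before, callers had to choose a view.
    // long replayId = store.getMaxSequenceId(false);  // skip bulk-loaded files
    // long assignId = store.getMaxSequenceId(true);   // include them
    // After, a single answer serves both purposes:
    long maxSeqId = store.getMaxSequenceId();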