Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (revision 1586383)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (working copy)
@@ -750,14 +750,14 @@
     try {
       final HRegion region =
           new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
-        protected boolean internalFlushcache(
+        protected FlushState internalFlushcache(
             final HLog wal, final long myseqid, MonitoredTask status)
           throws IOException {
           LOG.info("InternalFlushCache Invoked");
-          boolean b = super.internalFlushcache(wal, myseqid,
+          FlushState fs = super.internalFlushcache(wal, myseqid,
               Mockito.mock(MonitoredTask.class));
           flushcount.incrementAndGet();
-          return b;
+          return fs;
        };
      };
      long seqid = region.initialize();
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1586383)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy)
@@ -3343,7 +3343,7 @@
 
       @Override
       public void doAnAction() throws Exception {
-        if (region.flushcache()) {
+        if (region.flushcache().needsCompacting) {
          ++flushesSinceCompact;
         }
         // Compact regularly to avoid creating too many files and exceeding
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 1586383)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy)
@@ -458,7 +458,7 @@
     }
     lock.readLock().lock();
     try {
-      boolean shouldCompact = region.flushcache();
+      boolean shouldCompact = region.flushcache().needsCompacting;
      // We just want to check the size
      boolean shouldSplit = region.checkSplit() != null;
      if (shouldSplit) {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1586383)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -379,6 +379,30 @@
       ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
   }
 
+  public static class FlushState {
+    final boolean flushSucceeded;
+    final String failureReason;
+    final long flushSequenceId;
+    final boolean needsCompacting;
+    static final String NOTHING_TO_FLUSH = "Nothing to flush";
+
+    FlushState(long flushSequenceId, boolean needsCompacting) {
+      this(true, null, flushSequenceId, needsCompacting);
+    }
+
+    FlushState(String failureReason) {
+      this(false, failureReason, -1, false);
+    }
+
+    FlushState(boolean flushSucceeded, String failureReason, long flushSequenceId,
+        boolean needsCompacting) {
+      this.flushSucceeded = flushSucceeded;
+      this.failureReason = failureReason;
+      this.flushSequenceId = flushSequenceId;
+      this.needsCompacting = needsCompacting;
+    }
+  }
+
   final WriteState writestate = new WriteState();
 
   long memstoreFlushSize;
@@ -1409,11 +1433,12 @@
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  public boolean flushcache() throws IOException {
+  public FlushState flushcache() throws IOException {
     // fail-fast instead of waiting on the lock
     if (this.closing.get()) {
-      LOG.debug("Skipping flush on " + this + " because closing");
-      return false;
+      String msg = "Skipping flush on " + this + " because closing";
+      LOG.debug(msg);
+      return new FlushState(msg);
     }
     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
     status.setStatus("Acquiring readlock on region");
@@ -1421,9 +1446,10 @@
     lock.readLock().lock();
     try {
       if (this.closed.get()) {
-        LOG.debug("Skipping flush on " + this + " because closed");
-        status.abort("Skipped: closed");
-        return false;
+        String msg = "Skipping flush on " + this + " because closed";
+        LOG.debug(msg);
+        status.abort(msg);
+        return new FlushState(msg);
       }
       if (coprocessorHost != null) {
         status.setStatus("Running coprocessor pre-flush hooks");
@@ -1442,14 +1468,15 @@
                 + ", flushing=" + writestate.flushing + ", writesEnabled="
                 + writestate.writesEnabled);
           }
-          status.abort("Not flushing since "
+          String msg = "Not flushing since "
               + (writestate.flushing ? "already flushing"
-              : "writes not enabled"));
-          return false;
+              : "writes not enabled");
+          status.abort(msg);
+          return new FlushState(msg);
         }
       }
       try {
-        boolean result = internalFlushcache(status);
+        FlushState fs = internalFlushcache(status);
 
         if (coprocessorHost != null) {
           status.setStatus("Running post-flush coprocessor hooks");
@@ -1457,7 +1484,7 @@
         }
 
         status.markComplete("Flush successful");
-        return result;
+        return fs;
       } finally {
         synchronized (writestate) {
           writestate.flushing = false;
@@ -1523,13 +1550,13 @@
   * <p> This method may block for some time.
   * @param status
   *
-  * @return true if the region needs compacting
+  * @return object describing the flush's state
   *
   * @throws IOException general io exceptions
   * @throws DroppedSnapshotException Thrown when replay of hlog is required
   * because a Snapshot was not properly persisted.
   */
-  protected boolean internalFlushcache(MonitoredTask status)
+  protected FlushState internalFlushcache(MonitoredTask status)
     throws IOException {
     return internalFlushcache(this.log, -1, status);
   }
@@ -1543,7 +1570,7 @@
   * @throws IOException
   * @see #internalFlushcache(MonitoredTask)
   */
-  protected boolean internalFlushcache(
+  protected FlushState internalFlushcache(
       final HLog wal, final long myseqid, MonitoredTask status)
   throws IOException {
     if (this.rsServices != null && this.rsServices.isAborted()) {
@@ -1554,7 +1581,7 @@
     // Clear flush flag.
     // If nothing to flush, return and avoid logging start/stop flush.
     if (this.memstoreSize.get() <= 0) {
-      return false;
+      return new FlushState(FlushState.NOTHING_TO_FLUSH);
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Started memstore flush for " + this +
@@ -1590,9 +1617,10 @@
     if (wal != null) {
       Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
       if (startSeqId == null) {
-        status.setStatus("Flush will not be started for [" + this.getRegionInfo().getEncodedName()
-            + "] - WAL is going away");
-        return false;
+        String msg = "Flush will not be started for [" + this.getRegionInfo().getEncodedName()
+            + "] - WAL is going away";
+        status.setStatus(msg);
+        return new FlushState(msg);
       }
       flushSeqId = startSeqId.longValue();
     } else {
@@ -1709,7 +1737,7 @@
     status.setStatus(msg);
     this.recentFlushes.add(new Pair<Long, Long>(time/1000, totalFlushableSize));
 
-    return compactionRequested;
+    return new FlushState(flushSeqId, compactionRequested);
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -3381,6 +3409,22 @@
       return false;
     }
 
+    long seqId = -1;
+    // We need to assign a sequential ID that's in between two memstores in order to preserve
+    // the guarantee that all the edits lower than the highest sequential ID from all the
+    // HFiles are flushed on disk. See HBASE-10958.
+    if (assignSeqId) {
+      FlushState fs = this.flushcache();
+      if (fs.flushSucceeded) {
+        seqId = fs.flushSequenceId;
+      } else if (fs.failureReason.equals(FlushState.NOTHING_TO_FLUSH)) {
+        seqId = this.log.obtainSeqNum();
+      } else {
+        throw new IOException("Could not bulk load with an assigned sequential ID because the "
+            + "flush didn't run. Reason for not flushing: " + fs.failureReason);
+      }
+    }
+
     for (Pair<byte[], String> p : familyPaths) {
       byte[] familyName = p.getFirst();
       String path = p.getSecond();
@@ -3390,7 +3434,7 @@
         if(bulkLoadListener != null) {
           finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
         }
-        store.bulkLoadHFile(finalPath, assignSeqId ? this.log.obtainSeqNum() : -1);
+        store.bulkLoadHFile(finalPath, seqId);
         if(bulkLoadListener != null) {
           bulkLoadListener.doneBulkLoad(familyName, path);
         }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1586383)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -3704,7 +3704,7 @@
       }
       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
       if (shouldFlush) {
-        boolean result = region.flushcache();
+        boolean result = region.flushcache().needsCompacting;
         if (result) {
           this.compactSplitThread.requestSystemCompaction(region,
             "Compaction through user triggered flush");
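
For reviewers, here is a minimal caller-side sketch (not part of the patch; the class and method names are made up for illustration) of how the richer FlushState result is meant to be consumed once the change is applied, along the lines of the MemStoreFlusher and HRegionServer hunks above. It lives in the org.apache.hadoop.hbase.regionserver package because the FlushState fields and constructors are package-private.

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

// Hypothetical caller -- not part of the patch -- illustrating the new flushcache() contract.
public class FlushStateUsageSketch {

  /**
   * Flush a region and report whether a compaction should follow, mirroring
   * what MemStoreFlusher and HRegionServer now do with needsCompacting.
   */
  static boolean flushAndCheckCompaction(HRegion region) throws IOException {
    HRegion.FlushState fs = region.flushcache();
    if (!fs.flushSucceeded) {
      // The flush was skipped (region closing/closed, writes disabled, nothing to
      // flush, ...); the reason is carried in fs.failureReason instead of a bare false.
      return false;
    }
    // needsCompacting replaces the old boolean return value of flushcache();
    // flushSequenceId is what bulkLoadHFiles uses to tag bulk-loaded HFiles (HBASE-10958).
    return fs.needsCompacting;
  }
}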