diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 6cf2ce3..1b64ac4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -6566,16 +6566,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @throws IOException If anything goes wrong with DFS */ private void syncOrDefer(long txid, Durability durability) throws IOException { + if (durability == Durability.USE_DEFAULT) { + durability = this.durability; + } if (this.getRegionInfo().isMetaRegion()) { - this.wal.sync(txid); + this.wal.fsync(txid); } else { switch(durability) { - case USE_DEFAULT: - // do what table defaults to - if (shouldSyncWAL()) { - this.wal.sync(txid); - } - break; case SKIP_WAL: // nothing do to break; @@ -6583,22 +6580,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // nothing do to break; case SYNC_WAL: - case FSYNC_WAL: - // sync the WAL edit (SYNC and FSYNC treated the same for now) + // sync the WAL edit this.wal.sync(txid); break; + case FSYNC_WAL: + // fsync the WAL edit to disk + this.wal.fsync(txid); + break; + default: + // ignore } } } /** - * Check whether we should sync the wal from the table's durability settings - */ - private boolean shouldSyncWAL() { - return durability.ordinal() > Durability.ASYNC_WAL.ordinal(); - } - - /** * A mocked list implementaion - discards all updates. 
*/ private static final List MOCKED_LIST = new AbstractList() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index d2ba69d..6ac86bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -199,6 +199,7 @@ public class FSHLog implements WAL { * come in for it. Maintained by the syncing threads. */ private final AtomicLong highestSyncedSequence = new AtomicLong(0); + private final AtomicLong highestFSyncedSequence = new AtomicLong(0); /** * file system instance @@ -1265,15 +1266,18 @@ public class FSHLog implements WAL { * @param t May be non-null if we are processing SyncFutures because an exception was thrown. * @return Count of SyncFutures we let go. */ - private int releaseSyncFutures(final long currentSequence, final Throwable t) { + private int releaseSyncFutures(final long currentSequence, final long currentFSequence, final Throwable t) { int syncCount = 0; + long maxSequence = Math.max(currentFSequence, currentSequence); for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) { - if (syncFuture.getRingBufferSequence() > currentSequence) break; - releaseSyncFuture(syncFuture, currentSequence, t); - if (!this.syncFutures.remove(syncFuture)) { - throw new IllegalStateException(syncFuture.toString()); + if (syncFuture.getRingBufferSequence() > maxSequence) break; + // On success an fsync future needs an fsync that covers it; stop here so peek() cannot spin on it. + if (t == null && syncFuture.isFSync() && syncFuture.getRingBufferSequence() > currentFSequence) break; + releaseSyncFuture(syncFuture, maxSequence, t); + syncCount++; + if (!this.syncFutures.remove(syncFuture)) { + throw new IllegalStateException(syncFuture.toString()); } - syncCount++; } return syncCount; } @@ -1283,17 +1287,25 @@ public class FSHLog implements WAL { * @return Current highest synced sequence. 
*/ private long updateHighestSyncedSequence(long sequence) { + return updateHighestSequence(sequence, highestSyncedSequence); + } + + private long updateHighestFSyncedSequence(long sequence) { + return updateHighestSequence(sequence, highestFSyncedSequence); + } + + private long updateHighestSequence(long sequence, AtomicLong highestSequence) { long currentHighestSyncedSequence; // Set the highestSyncedSequence IFF our current sequence id is the 'highest'. do { - currentHighestSyncedSequence = highestSyncedSequence.get(); + currentHighestSyncedSequence = highestSequence.get(); if (currentHighestSyncedSequence >= sequence) { // Set the sync number to current highwater mark; might be able to let go more // queued sync futures sequence = currentHighestSyncedSequence; break; } - } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence)); + } while (!highestSequence.compareAndSet(currentHighestSyncedSequence, sequence)); return sequence; } @@ -1313,7 +1325,11 @@ public class FSHLog implements WAL { ", syncFutureSequence=" + syncFutureSequence); } // See if we can process any syncfutures BEFORE we go sync. - long currentHighestSyncedSequence = highestSyncedSequence.get(); + // if this is an fsync we can only release it if there is newer fsync + // otherwise any sync will do + long currentHighestSyncedSequence = + takeSyncFuture.isFSync() ? highestFSyncedSequence.get() : + Math.max(highestSyncedSequence.get(), highestFSyncedSequence.get()); if (currentSequence < currentHighestSyncedSequence) { syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null); // Done with the 'take'. Go around again and do a new 'take'. 
@@ -1326,9 +1342,15 @@ public class FSHLog implements WAL { TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan()); long start = System.nanoTime(); Throwable t = null; + long currentFSequence = -1; try { Trace.addTimelineAnnotation("syncing writer"); - writer.sync(); + if (takeSyncFuture.isFSync()) { + writer.fsync(); + currentFSequence = updateHighestFSyncedSequence(currentSequence); + } else { + writer.sync(); + } Trace.addTimelineAnnotation("writer synced"); currentSequence = updateHighestSyncedSequence(currentSequence); } catch (IOException e) { @@ -1341,9 +1363,11 @@ public class FSHLog implements WAL { // reattach the span to the future before releasing. takeSyncFuture.setSpan(scope.detach()); // First release what we 'took' from the queue. - syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t); + syncCount += + releaseSyncFuture(takeSyncFuture, takeSyncFuture.isFSync() ? currentFSequence + : Math.max(currentFSequence, currentSequence), t); // Can we release other syncs? 
- syncCount += releaseSyncFutures(currentSequence, t); + syncCount += releaseSyncFutures(currentSequence, currentFSequence, t); if (t != null) { requestLogRoll(); } else checkLogRoll(); @@ -1430,12 +1454,12 @@ public class FSHLog implements WAL { } private SyncFuture publishSyncOnRingBuffer() { - return publishSyncOnRingBuffer(null); + return publishSyncOnRingBuffer(null, false); } - private SyncFuture publishSyncOnRingBuffer(Span span) { + private SyncFuture publishSyncOnRingBuffer(Span span, boolean fsync) { long sequence = this.disruptor.getRingBuffer().next(); - SyncFuture syncFuture = getSyncFuture(sequence, span); + SyncFuture syncFuture = getSyncFuture(sequence, span, fsync); try { RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); truck.loadPayload(syncFuture); @@ -1446,8 +1470,8 @@ public class FSHLog implements WAL { } // Sync all known transactions - private Span publishSyncThenBlockOnCompletion(Span span) throws IOException { - return blockOnSync(publishSyncOnRingBuffer(span)); + private Span publishSyncThenBlockOnCompletion(Span span, boolean fsync) throws IOException { + return blockOnSync(publishSyncOnRingBuffer(span, fsync)); } private Span blockOnSync(final SyncFuture syncFuture) throws IOException { @@ -1470,13 +1494,13 @@ public class FSHLog implements WAL { return ioe; } - private SyncFuture getSyncFuture(final long sequence, Span span) { + private SyncFuture getSyncFuture(final long sequence, Span span, boolean fsync) { SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread()); if (syncFuture == null) { syncFuture = new SyncFuture(); this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture); } - return syncFuture.reset(sequence, span); + return syncFuture.reset(sequence, span, fsync); } private void postSync(final long timeInNanos, final int handlerSyncs) { @@ -1566,11 +1590,27 @@ public class FSHLog implements WAL { public void sync() throws IOException { TraceScope scope = 
Trace.startSpan("FSHLog.sync"); try { - scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach())); + scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach(), false)); + } finally { + assert scope == NullScope.INSTANCE || !scope.isDetached(); + scope.close(); + } + } + + @Override + public void fsync(long txid) throws IOException { + if (this.highestFSyncedSequence.get() >= txid){ + // Already fsync'd. + return; + } + TraceScope scope = Trace.startSpan("FSHLog.fsync"); + try { + scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach(), true)); } finally { assert scope == NullScope.INSTANCE || !scope.isDetached(); scope.close(); } + sync(txid); } @Override @@ -1581,7 +1621,7 @@ public class FSHLog implements WAL { } TraceScope scope = Trace.startSpan("FSHLog.sync"); try { - scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach())); + scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach(), false)); } finally { assert scope == NullScope.INSTANCE || !scope.isDetached(); scope.close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index ca80e4c..038bc68 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -178,6 +178,20 @@ public class ProtobufLogWriter extends WriterBase { } @Override + public void fsync() throws IOException { + try { + // This looks to be a noop but it's what we have always done. Leaving for now. + this.output.flush(); + // Force the edit to disk with hsync. See HBASE-5954 Allow proper fsync support for HBase + // + this.output.hsync(); + } catch (NullPointerException npe) { + // Concurrent close... 
+ throw new IOException(npe); + } + } + + @Override public long getLength() throws IOException { try { return this.output.getPos(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index bcd23da..86c3bab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -65,6 +65,7 @@ class SyncFuture { * immediately call {@link #reset(long, Span)} below and it will work. */ private long doneSequence = -1; + private boolean fsync; /** * If error, the associated throwable. Set when the future is 'done'. @@ -86,7 +87,7 @@ class SyncFuture { * @return this */ synchronized SyncFuture reset(final long sequence) { - return reset(sequence, null); + return reset(sequence, null, false); } /** @@ -98,13 +99,14 @@ class SyncFuture { * resuming after a call to {@link #get()}. * @return this */ - synchronized SyncFuture reset(final long sequence, Span span) { + synchronized SyncFuture reset(final long sequence, Span span, boolean fsync) { if (t != null && t != Thread.currentThread()) throw new IllegalStateException(); t = Thread.currentThread(); if (!isDone()) throw new IllegalStateException("" + sequence + " " + Thread.currentThread()); this.doneSequence = NOT_DONE; this.ringBufferSequence = sequence; this.span = span; + this.fsync = fsync; return this; } @@ -127,6 +129,10 @@ class SyncFuture { return this.span; } + synchronized boolean isFSync() { + return this.fsync; + } + /** * Used to re-attach a {@code span} to the Future. Called by the EventHandler * after a it has completed processing and detached the span from its scope. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index f571166..6abc658 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -183,6 +183,11 @@ class DisabledWALProvider implements WALProvider { } @Override + public void fsync(long txid) { + sync(txid); + } + + @Override public boolean startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames) { return !(closed.get()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 5a2b08d..31f1723 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -140,6 +140,13 @@ public interface WAL { void sync(long txid) throws IOException; /** + * Sync the WAL and force it to disk, if the txId was not already fsync'd. + * @param txid Transaction id to sync to. + * @throws IOException + */ + void fsync(long txid) throws IOException; + + /** * WAL keeps track of the sequence numbers that were not yet flushed from memstores * in order to be able to do cleanup. This method tells WAL that some region is about * to flush memstore. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index 178c322..7561c81 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -75,6 +75,7 @@ public interface WALProvider { // interface provided by WAL. 
interface Writer extends Closeable { void sync() throws IOException; + void fsync() throws IOException; void append(WAL.Entry entry) throws IOException; long getLength() throws IOException; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 7524d5c..a8a91b5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -590,6 +590,7 @@ public class PerformanceEvaluation extends Configured implements Tool { String tableName = TABLE_NAME; boolean flushCommits = true; boolean writeToWAL = true; + boolean fsyncWAL = false; boolean autoFlush = false; boolean oneCon = false; boolean useTags = false; @@ -1331,7 +1332,7 @@ public class PerformanceEvaluation extends Configured implements Tool { put.add(FAMILY_NAME, QUALIFIER_NAME, value); updateValueSize(value.length); } - put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + put.setDurability(opts.writeToWAL ? opts.fsyncWAL ? Durability.FSYNC_WAL : Durability.SYNC_WAL : Durability.SKIP_WAL); table.put(put); } } @@ -1410,7 +1411,7 @@ public class PerformanceEvaluation extends Configured implements Tool { put.add(FAMILY_NAME, QUALIFIER_NAME, value); updateValueSize(value.length); } - put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + put.setDurability(opts.writeToWAL ? opts.fsyncWAL ? Durability.FSYNC_WAL : Durability.SYNC_WAL : Durability.SKIP_WAL); table.put(put); } } @@ -1609,6 +1610,7 @@ public class PerformanceEvaluation extends Configured implements Tool { System.err.println(" flushCommits Used to determine if the test should flush the table. " + "Default: false"); System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); + System.err.println(" fsyncWAL Force WAL edit to disk. 
Default: False"); System.err.println(" autoFlush Set autoFlush on htable. Default: False"); System.err.println(" oneCon all the threads share the same connection. Default: False"); System.err.println(" presplit Create presplit table. Recommended for accurate perf " + @@ -1744,6 +1746,12 @@ public class PerformanceEvaluation extends Configured implements Tool { continue; } + final String fsyncWAL = "--fsyncWAL="; + if (cmd.startsWith(fsyncWAL)) { + opts.fsyncWAL = Boolean.parseBoolean(cmd.substring(fsyncWAL.length())); + continue; + } + final String presplit = "--presplit="; if (cmd.startsWith(presplit)) { opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java index 7c13c00..65eeb53 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java @@ -195,7 +195,17 @@ public class SequenceFileLogWriter extends WriterBase { @Override public void sync() throws IOException { try { - this.writer.syncFs(); + this.writer.hflush(); + } catch (NullPointerException npe) { + // Concurrent close... + throw new IOException(npe); + } + } + + @Override + public void fsync() throws IOException { + try { + this.writer.hsync(); } catch (NullPointerException npe) { // Concurrent close... throw new IOException(npe);