diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c545950..4dd4ab7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -6245,16 +6245,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @throws IOException If anything goes wrong with DFS */ private void syncOrDefer(long txid, Durability durability) throws IOException { + if (durability == Durability.USE_DEFAULT) { + durability = this.durability; + } if (this.getRegionInfo().isMetaRegion()) { - this.wal.sync(txid); + this.wal.fsync(txid); } else { switch(durability) { - case USE_DEFAULT: - // do what table defaults to - if (shouldSyncWAL()) { - this.wal.sync(txid); - } - break; case SKIP_WAL: // nothing do to break; @@ -6262,22 +6259,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // nothing do to break; case SYNC_WAL: - case FSYNC_WAL: - // sync the WAL edit (SYNC and FSYNC treated the same for now) + // sync the WAL edit this.wal.sync(txid); break; + case FSYNC_WAL: + // fsync the WAL edit to disk + this.wal.fsync(txid); + break; + default: + // ignore } } } /** - * Check whether we should sync the wal from the table's durability settings - */ - private boolean shouldSyncWAL() { - return durability.ordinal() > Durability.ASYNC_WAL.ordinal(); - } - - /** * A mocked list implementaion - discards all updates. 
*/ private static final List MOCKED_LIST = new AbstractList() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 02cf41e..c12bace 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -63,12 +63,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER; import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALPrettyPrinter; @@ -78,7 +76,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.DrainBarrier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.Threads; @@ -198,6 +195,7 @@ public class FSHLog implements WAL { * come in for it. Maintained by the syncing threads. */ private final AtomicLong highestSyncedSequence = new AtomicLong(0); + private final AtomicLong highestFSyncedSequence = new AtomicLong(0); /** * file system instance @@ -1238,15 +1236,18 @@ public class FSHLog implements WAL { * @param t May be non-null if we are processing SyncFutures because an exception was thrown. * @return Count of SyncFutures we let go. 
*/ - private int releaseSyncFutures(final long currentSequence, final Throwable t) { + private int releaseSyncFutures(final long currentSequence, final long currentFSequence, final Throwable t) { int syncCount = 0; + long maxSequence = Math.max(currentFSequence, currentSequence); for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) { - if (syncFuture.getRingBufferSequence() > currentSequence) break; - releaseSyncFuture(syncFuture, currentSequence, t); - if (!this.syncFutures.remove(syncFuture)) { - throw new IllegalStateException(syncFuture.toString()); + if (syncFuture.getRingBufferSequence() > maxSequence) break; + if (!syncFuture.isFSync() || syncFuture.getRingBufferSequence() > currentFSequence) { + releaseSyncFuture(syncFuture, currentSequence, t); + if (!this.syncFutures.remove(syncFuture)) { + throw new IllegalStateException(syncFuture.toString()); + } + syncCount++; } - syncCount++; } return syncCount; } @@ -1256,17 +1257,25 @@ public class FSHLog implements WAL { * @return Current highest synced sequence. */ private long updateHighestSyncedSequence(long sequence) { + return updateHighestSequence(sequence, highestSyncedSequence); + } + + private long updateHighestFSyncedSequence(long sequence) { + return updateHighestSequence(sequence, highestFSyncedSequence); + } + + private long updateHighestSequence(long sequence, AtomicLong highestSequence) { long currentHighestSyncedSequence; // Set the highestSyncedSequence IFF our current sequence id is the 'highest'. 
do { - currentHighestSyncedSequence = highestSyncedSequence.get(); + currentHighestSyncedSequence = highestSequence.get(); if (currentHighestSyncedSequence >= sequence) { // Set the sync number to current highwater mark; might be able to let go more // queued sync futures sequence = currentHighestSyncedSequence; break; } - } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence)); + } while (!highestSequence.compareAndSet(currentHighestSyncedSequence, sequence)); return sequence; } @@ -1286,7 +1295,11 @@ public class FSHLog implements WAL { ", syncFutureSequence=" + syncFutureSequence); } // See if we can process any syncfutures BEFORE we go sync. - long currentHighestSyncedSequence = highestSyncedSequence.get(); + // if this is an fsync we can only release it if there is a newer fsync + // otherwise any sync will do + long currentHighestSyncedSequence = + takeSyncFuture.isFSync() ? highestFSyncedSequence.get() : + Math.max(highestSyncedSequence.get(), highestFSyncedSequence.get()); if (currentSequence < currentHighestSyncedSequence) { syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null); // Done with the 'take'. Go around again and do a new 'take'. @@ -1299,9 +1312,15 @@ public class FSHLog implements WAL { TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan()); long start = System.nanoTime(); Throwable t = null; + long currentFSequence = -1; try { Trace.addTimelineAnnotation("syncing writer"); - writer.sync(); + if (takeSyncFuture.isFSync()) { + writer.fsync(); + currentFSequence = updateHighestFSyncedSequence(currentSequence); + } else { + writer.sync(); + } Trace.addTimelineAnnotation("writer synced"); currentSequence = updateHighestSyncedSequence(currentSequence); } catch (IOException e) { @@ -1314,9 +1333,11 @@ public class FSHLog implements WAL { // reattach the span to the future before releasing. takeSyncFuture.setSpan(scope.detach()); // First release what we 'took' from the queue. 
- syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t); + syncCount += + releaseSyncFuture(takeSyncFuture, takeSyncFuture.isFSync() ? currentFSequence + : Math.max(currentFSequence, currentSequence), t); // Can we release other syncs? - syncCount += releaseSyncFutures(currentSequence, t); + syncCount += releaseSyncFutures(currentSequence, currentFSequence, t); if (t != null) { requestLogRoll(); } else checkLogRoll(); @@ -1401,12 +1422,12 @@ public class FSHLog implements WAL { } private SyncFuture publishSyncOnRingBuffer() { - return publishSyncOnRingBuffer(null); + return publishSyncOnRingBuffer(null, false); } - private SyncFuture publishSyncOnRingBuffer(Span span) { + private SyncFuture publishSyncOnRingBuffer(Span span, boolean fsync) { long sequence = this.disruptor.getRingBuffer().next(); - SyncFuture syncFuture = getSyncFuture(sequence, span); + SyncFuture syncFuture = getSyncFuture(sequence, span, fsync); try { RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); truck.loadPayload(syncFuture); @@ -1417,8 +1438,8 @@ public class FSHLog implements WAL { } // Sync all known transactions - private Span publishSyncThenBlockOnCompletion(Span span) throws IOException { - return blockOnSync(publishSyncOnRingBuffer(span)); + private Span publishSyncThenBlockOnCompletion(Span span, boolean fsync) throws IOException { + return blockOnSync(publishSyncOnRingBuffer(span, fsync)); } private Span blockOnSync(final SyncFuture syncFuture) throws IOException { @@ -1441,13 +1462,13 @@ public class FSHLog implements WAL { return ioe; } - private SyncFuture getSyncFuture(final long sequence, Span span) { + private SyncFuture getSyncFuture(final long sequence, Span span, boolean fsync) { SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread()); if (syncFuture == null) { syncFuture = new SyncFuture(); this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture); } - return syncFuture.reset(sequence, span); + return 
syncFuture.reset(sequence, span, fsync); } private void postSync(final long timeInNanos, final int handlerSyncs) { @@ -1537,11 +1558,27 @@ public class FSHLog implements WAL { public void sync() throws IOException { TraceScope scope = Trace.startSpan("FSHLog.sync"); try { - scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach())); + scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach(), false)); + } finally { + assert scope == NullScope.INSTANCE || !scope.isDetached(); + scope.close(); + } + } + + @Override + public void fsync(long txid) throws IOException { + if (this.highestFSyncedSequence.get() >= txid){ + // Already fsync'd. + return; + } + TraceScope scope = Trace.startSpan("FSHLog.fsync"); + try { + scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach(), true)); } finally { assert scope == NullScope.INSTANCE || !scope.isDetached(); scope.close(); } + sync(txid); } @Override @@ -1552,7 +1589,7 @@ public class FSHLog implements WAL { } TraceScope scope = Trace.startSpan("FSHLog.sync"); try { - scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach())); + scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach(), false)); } finally { assert scope == NullScope.INSTANCE || !scope.isDetached(); scope.close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index ca80e4c..038bc68 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -178,6 +178,20 @@ public class ProtobufLogWriter extends WriterBase { } @Override + public void fsync() throws IOException { + try { + // This looks to be a noop but it's what we have always done. Leaving for now. 
+ this.output.flush(); + // hsync asks the filesystem to persist the flushed data to disk. See HBASE-5954 Allow proper fsync support for HBase + // + this.output.hsync();; + } catch (NullPointerException npe) { + // Concurrent close... + throw new IOException(npe); + } + } + + @Override public long getLength() throws IOException { try { return this.output.getPos(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index bcd23da..86c3bab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -65,6 +65,7 @@ class SyncFuture { * immediately call {@link #reset(long, Span)} below and it will work. */ private long doneSequence = -1; + private boolean fsync; /** * If error, the associated throwable. Set when the future is 'done'. @@ -86,7 +87,7 @@ class SyncFuture { * @return this */ synchronized SyncFuture reset(final long sequence) { - return reset(sequence, null); + return reset(sequence, null, false); } /** @@ -98,13 +99,14 @@ class SyncFuture { * resuming after a call to {@link #get()}. * @return this */ - synchronized SyncFuture reset(final long sequence, Span span) { + synchronized SyncFuture reset(final long sequence, Span span, boolean fsync) { if (t != null && t != Thread.currentThread()) throw new IllegalStateException(); t = Thread.currentThread(); if (!isDone()) throw new IllegalStateException("" + sequence + " " + Thread.currentThread()); this.doneSequence = NOT_DONE; this.ringBufferSequence = sequence; this.span = span; + this.fsync = fsync; return this; } @@ -127,6 +129,10 @@ class SyncFuture { return this.span; } + synchronized boolean isFSync() { + return this.fsync; + } + /** * Used to re-attach a {@code span} to the Future. 
Called by the EventHandler * after a it has completed processing and detached the span from its scope. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index ac23916..b983e10 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -183,6 +183,11 @@ class DisabledWALProvider implements WALProvider { } @Override + public void fsync(long txid) { + sync(txid); + } + + @Override public boolean startCacheFlush(final byte[] encodedRegionName) { return !(closed.get()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 787a34b..d62d244 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -136,6 +136,13 @@ public interface WAL { void sync(long txid) throws IOException; /** + * Sync the WAL and force it to disk, if the txId was not already fsync'd. + * @param txid Transaction id to fsync to. + * @throws IOException + */ + void fsync(long txid) throws IOException; + + /** * WAL keeps track of the sequence numbers that were not yet flushed from memstores * in order to be able to do cleanup. This method tells WAL that some region is about * to flush memstore. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index b27abf9..69fea7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -76,6 +76,7 @@ public interface WALProvider { // interface provided by WAL. 
interface Writer extends Closeable { void sync() throws IOException; + void fsync() throws IOException; void append(WAL.Entry entry) throws IOException; long getLength() throws IOException; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java index 7c13c00..65eeb53 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java @@ -195,7 +195,17 @@ public class SequenceFileLogWriter extends WriterBase { @Override public void sync() throws IOException { try { - this.writer.syncFs(); + this.writer.hflush();; + } catch (NullPointerException npe) { + // Concurrent close... + throw new IOException(npe); + } + } + + @Override + public void fsync() throws IOException { + try { + this.writer.hsync();; } catch (NullPointerException npe) { // Concurrent close... throw new IOException(npe);