diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java index f8c746f..e41d1f7 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java @@ -62,6 +62,10 @@ public interface MetricsWALSource extends BaseSource { String LOW_REPLICA_ROLL_REQUESTED = "lowReplicaRollRequest"; String LOW_REPLICA_ROLL_REQUESTED_DESC = "How many times a log roll was requested due to too few DN's in the write pipeline."; + String SYNC_COUNT = "syncCount"; + String SYNC_COUNT_DESC = "Number edit sync'ed (hflush)"; + String FSYNC_COUNT = "fsyncCount"; + String FSYNC_COUNT_DESC = "Number edit fsync'ed (hsync)"; /** * Add the append size. @@ -92,4 +96,7 @@ public interface MetricsWALSource extends BaseSource { void incrementLowReplicationLogRoll(); + void incrementSyncCount(int count); + + void incrementFsyncCount(int count); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java index 36c3571..9df00ec 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java @@ -40,6 +40,8 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo private final MutableCounterLong slowAppendCount; private final MutableCounterLong logRollRequested; private final MutableCounterLong lowReplicationLogRollRequested; + private final MutableCounterLong syncCount; + private final MutableCounterLong fsyncCount; public MetricsWALSourceImpl() { this(METRICS_NAME, 
METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT); @@ -62,6 +64,10 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo this.getMetricsRegistry().newCounter(ROLL_REQUESTED, ROLL_REQUESTED_DESC, 0L); lowReplicationLogRollRequested = this.getMetricsRegistry() .newCounter(LOW_REPLICA_ROLL_REQUESTED, LOW_REPLICA_ROLL_REQUESTED_DESC, 0L); + syncCount = + this.getMetricsRegistry().newCounter(SYNC_COUNT, SYNC_COUNT_DESC, 0L); + fsyncCount = + this.getMetricsRegistry().newCounter(FSYNC_COUNT, FSYNC_COUNT_DESC, 0L); } @Override @@ -98,4 +104,14 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo public void incrementLowReplicationLogRoll() { lowReplicationLogRollRequested.incr(); } + + @Override + public void incrementSyncCount(int count) { + syncCount.incr(count); + } + + @Override + public void incrementFsyncCount(int count) { + fsyncCount.incr(count); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 26f8943..50c295a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -6487,16 +6487,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @throws IOException If anything goes wrong with DFS */ private void syncOrDefer(long txid, Durability durability) throws IOException { + if (durability == Durability.USE_DEFAULT) { + durability = this.durability; + } if (this.getRegionInfo().isMetaRegion()) { - this.wal.sync(txid); + this.wal.fsync(txid); } else { switch(durability) { - case USE_DEFAULT: - // do what table defaults to - if (shouldSyncWAL()) { - this.wal.sync(txid); - } - break; case SKIP_WAL: // nothing do to break; @@ -6504,22 +6501,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // 
// nothing do to break; case SYNC_WAL: - case FSYNC_WAL: - // sync the WAL edit (SYNC and FSYNC treated the same for now) + // sync the WAL edit this.wal.sync(txid); break; + case FSYNC_WAL: + // fsync the WAL edit to disk + this.wal.fsync(txid); + break; + default: + // ignore: no sync is issued for any other durability } } } /** - * Check whether we should sync the wal from the table's durability settings - */ - private boolean shouldSyncWAL() { - return durability.ordinal() > Durability.ASYNC_WAL.ordinal(); - } - - /** * A mocked list implementaion - discards all updates. */ private static final List MOCKED_LIST = new AbstractList() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 19b4719..ac0629e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -199,6 +199,7 @@ public class FSHLog implements WAL { * come in for it. Maintained by the syncing threads. */ private final AtomicLong highestSyncedSequence = new AtomicLong(0); + private final AtomicLong highestFSyncedSequence = new AtomicLong(0); /** * file system instance @@ -319,7 +320,9 @@ public class FSHLog implements WAL { /** * The total size of wal */ - private AtomicLong totalLogSize = new AtomicLong(0); + private final AtomicLong totalLogSize = new AtomicLong(0); + private final AtomicLong fsyncCount = new AtomicLong(0); + private final AtomicLong syncCount = new AtomicLong(0); /* * If more than this many logs, force flush of oldest region to oldest edit @@ -677,7 +680,7 @@ public class FSHLog implements WAL { long startTimeNanos = System.nanoTime(); try { nextWriter.sync(); - postSync(System.nanoTime() - startTimeNanos, 0); + postSync(System.nanoTime() - startTimeNanos, 0, 0); } catch (IOException e) { // optimization failed, no need to abort here. 
LOG.warn("pre-sync failed but an optimization so keep going", e); @@ -1265,6 +1268,7 @@ public class FSHLog implements WAL { /** * Release all SyncFutures whose sequence is <= currentSequence. * @param currentSequence + * Highest sequence whose SyncFutures may be released. * @param t May be non-null if we are processing SyncFutures because an exception was thrown. * @return Count of SyncFutures we let go. */ @@ -1286,17 +1290,25 @@ public class FSHLog implements WAL { * @return Current highest synced sequence. */ private long updateHighestSyncedSequence(long sequence) { + return updateHighestSequence(sequence, highestSyncedSequence); + } + + private long updateHighestFSyncedSequence(long sequence) { + return updateHighestSequence(sequence, highestFSyncedSequence); + } + + private long updateHighestSequence(long sequence, AtomicLong highestSequence) { long currentHighestSyncedSequence; // Set the highestSyncedSequence IFF our current sequence id is the 'highest'. do { - currentHighestSyncedSequence = highestSyncedSequence.get(); + currentHighestSyncedSequence = highestSequence.get(); if (currentHighestSyncedSequence >= sequence) { // Set the sync number to current highwater mark; might be able to let go more // queued sync futures sequence = currentHighestSyncedSequence; break; } - } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence)); + } while (!highestSequence.compareAndSet(currentHighestSyncedSequence, sequence)); return sequence; } @@ -1304,6 +1316,7 @@ public class FSHLog implements WAL { long currentSequence; while (!isInterrupted()) { int syncCount = 0; + int fsyncCount = 0; SyncFuture takeSyncFuture; try { while (true) { @@ -1316,9 +1329,17 @@ public class FSHLog implements WAL { ", syncFutureSequence=" + syncFutureSequence); } // See if we can process any syncfutures BEFORE we go sync. 
- long currentHighestSyncedSequence = highestSyncedSequence.get(); + // If this is an fsync we can only release it if there is a newer fsync + // otherwise any sync will do + long currentHighestSyncedSequence = + takeSyncFuture.isFSync() ? highestFSyncedSequence.get() : + Math.max(highestSyncedSequence.get(), highestFSyncedSequence.get()); if (currentSequence < currentHighestSyncedSequence) { - syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null); + if (takeSyncFuture.isFSync()) { + fsyncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null); + } else { + syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null); + } // Done with the 'take'. Go around again and do a new 'take'. continue; } @@ -1329,9 +1350,15 @@ public class FSHLog implements WAL { TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan()); long start = System.nanoTime(); Throwable t = null; + long currentFSequence = -1; try { Trace.addTimelineAnnotation("syncing writer"); - writer.sync(); + if (takeSyncFuture.isFSync()) { + writer.fsync(); + currentFSequence = updateHighestFSyncedSequence(currentSequence); + } else { + writer.sync(); + } Trace.addTimelineAnnotation("writer synced"); currentSequence = updateHighestSyncedSequence(currentSequence); } catch (IOException e) { @@ -1344,14 +1371,23 @@ public class FSHLog implements WAL { // reattach the span to the future before releasing. takeSyncFuture.setSpan(scope.detach()); // First release what we 'took' from the queue. - syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t); + if (takeSyncFuture.isFSync()) { + fsyncCount += + releaseSyncFuture(takeSyncFuture, currentFSequence, t); + } else { + syncCount += + releaseSyncFuture(takeSyncFuture, Math.max(currentFSequence, currentSequence), t); + } // Can we release other syncs? 
+ // up to the fsync seq first, so that we can count them separately + fsyncCount += releaseSyncFutures(currentFSequence, t); + // then all the others, if any syncCount += releaseSyncFutures(currentSequence, t); if (t != null) { requestLogRoll(); } else checkLogRoll(); } - postSync(System.nanoTime() - start, syncCount); + postSync(System.nanoTime() - start, syncCount, fsyncCount); } catch (InterruptedException e) { // Presume legit interrupt. Thread.currentThread().interrupt(); @@ -1433,12 +1469,12 @@ public class FSHLog implements WAL { } private SyncFuture publishSyncOnRingBuffer() { - return publishSyncOnRingBuffer(null); + return publishSyncOnRingBuffer(null, false); } - private SyncFuture publishSyncOnRingBuffer(Span span) { + private SyncFuture publishSyncOnRingBuffer(Span span, boolean fsync) { long sequence = this.disruptor.getRingBuffer().next(); - SyncFuture syncFuture = getSyncFuture(sequence, span); + SyncFuture syncFuture = getSyncFuture(sequence, span, fsync); try { RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); truck.loadPayload(syncFuture); @@ -1449,8 +1485,8 @@ public class FSHLog implements WAL { } // Sync all known transactions - private Span publishSyncThenBlockOnCompletion(Span span) throws IOException { - return blockOnSync(publishSyncOnRingBuffer(span)); + private Span publishSyncThenBlockOnCompletion(Span span, boolean fsync) throws IOException { + return blockOnSync(publishSyncOnRingBuffer(span, fsync)); } private Span blockOnSync(final SyncFuture syncFuture) throws IOException { @@ -1473,16 +1509,16 @@ public class FSHLog implements WAL { return ioe; } - private SyncFuture getSyncFuture(final long sequence, Span span) { + private SyncFuture getSyncFuture(final long sequence, Span span, boolean fsync) { SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread()); if (syncFuture == null) { syncFuture = new SyncFuture(); this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture); } - return 
syncFuture.reset(sequence, span); + return syncFuture.reset(sequence, span, fsync); } - private void postSync(final long timeInNanos, final int handlerSyncs) { + private void postSync(final long timeInNanos, final int handlerSyncs, final int handlerFsyncs) { if (timeInNanos > this.slowSyncNs) { String msg = new StringBuilder().append("Slow sync cost: ") @@ -1493,7 +1529,7 @@ public class FSHLog implements WAL { } if (!listeners.isEmpty()) { for (WALActionsListener listener : listeners) { - listener.postSync(timeInNanos, handlerSyncs); + listener.postSync(timeInNanos, handlerSyncs, handlerFsyncs); } } } @@ -1569,11 +1605,27 @@ public class FSHLog implements WAL { public void sync() throws IOException { TraceScope scope = Trace.startSpan("FSHLog.sync"); try { - scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach())); + scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach(), false)); + } finally { + assert scope == NullScope.INSTANCE || !scope.isDetached(); + scope.close(); + } + } + + @Override + public void fsync(long txid) throws IOException { + if (this.highestFSyncedSequence.get() >= txid){ + // Already fsync'd. 
+ return; + } + TraceScope scope = Trace.startSpan("FSHLog.fsync"); + try { + scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach(), true)); } finally { assert scope == NullScope.INSTANCE || !scope.isDetached(); scope.close(); } + sync(txid); } @Override @@ -1584,7 +1636,7 @@ public class FSHLog implements WAL { } TraceScope scope = Trace.startSpan("FSHLog.sync"); try { - scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach())); + scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach(), false)); } finally { assert scope == NullScope.INSTANCE || !scope.isDetached(); scope.close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java index 4d465e4..dc7b26c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java @@ -46,8 +46,10 @@ public class MetricsWAL extends WALActionsListener.Base { } @Override - public void postSync(final long timeInNanos, final int handlerSyncs) { + public void postSync(final long timeInNanos, final int handlerSyncs, final int handlerFsyncs) { source.incrementSyncTime(timeInNanos/1000000L); + source.incrementSyncCount(handlerSyncs); + source.incrementFsyncCount(handlerFsyncs); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index ca80e4c..038bc68 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -178,6 +178,20 @@ public class ProtobufLogWriter extends WriterBase { } @Override + public void fsync() throws IOException { + 
try { + // This looks to be a noop but it's what we have always done. Leaving for now. + this.output.flush(); + // Force the edits to disk with hsync. See HBASE-5954 Allow proper fsync support for HBase + // + this.output.hsync(); + } catch (NullPointerException npe) { + // Concurrent close... + throw new IOException(npe); + } + } + + @Override public long getLength() throws IOException { try { return this.output.getPos(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index 62ab458..f4b7bdc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -65,6 +65,7 @@ class SyncFuture { * immediately call {@link #reset(long, Span)} below and it will work. */ private long doneSequence = -1; + private boolean fsync; /** * If error, the associated throwable. Set when the future is 'done'. @@ -86,7 +87,7 @@ class SyncFuture { * @return this */ synchronized SyncFuture reset(final long sequence) { - return reset(sequence, null); + return reset(sequence, null, false); } /** @@ -98,13 +99,14 @@ class SyncFuture { * resuming after a call to {@link #get()}. * @return this */ - synchronized SyncFuture reset(final long sequence, Span span) { + synchronized SyncFuture reset(final long sequence, Span span, boolean fsync) { if (t != null && t != Thread.currentThread()) throw new IllegalStateException(); t = Thread.currentThread(); if (!isDone()) throw new IllegalStateException("" + sequence + " " + Thread.currentThread()); this.doneSequence = NOT_DONE; this.ringBufferSequence = sequence; this.span = span; + this.fsync = fsync; return this; } @@ -127,6 +129,10 @@ class SyncFuture { return this.span; } + synchronized boolean isFSync() { + return this.fsync; + } + /** * Used to re-attach a {@code span} to the Future. 
Called by the EventHandler * after a it has completed processing and detached the span from its scope. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java index 457d859..deb115b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java @@ -111,7 +111,7 @@ public interface WALActionsListener { * @param handlerSyncs How many sync handler calls were released by this call to filesystem * sync. */ - void postSync(final long timeInNanos, final int handlerSyncs); + void postSync(final long timeInNanos, final int handlerSyncs, final int handlerFsyncs); static class Base implements WALActionsListener { @Override @@ -142,6 +142,6 @@ public interface WALActionsListener { public void postAppend(final long entryLen, final long elapsedTimeMillis) {} @Override - public void postSync(final long timeInNanos, final int handlerSyncs) {} + public void postSync(final long timeInNanos, final int handlerSyncs, final int handlerFsyncs) {} } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 5bffea5..a90a7d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -175,7 +175,7 @@ class DisabledWALProvider implements WALProvider { public void sync() { if (!this.listeners.isEmpty()) { for (WALActionsListener listener : this.listeners) { - listener.postSync(0l, 0); + listener.postSync(0l, 0, 0); } } } @@ -186,6 +186,11 @@ class DisabledWALProvider implements WALProvider { } @Override + public void fsync(long txid) { + sync(txid); + } + + @Override 
public boolean startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames) { return !(closed.get()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 5a2b08d..31f1723 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -140,6 +140,13 @@ public interface WAL { void sync(long txid) throws IOException; /** + * Sync the WAL and force it to disk, if the txId was not already fsync'd. + * @param txid Transaction id to sync to. + * @throws IOException + */ + void fsync(long txid) throws IOException; + + /** * WAL keeps track of the sequence numbers that were not yet flushed from memstores * in order to be able to do cleanup. This method tells WAL that some region is about * to flush memstore. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index 178c322..7561c81 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -75,6 +75,7 @@ public interface WALProvider { // interface provided by WAL. 
interface Writer extends Closeable { void sync() throws IOException; + void fsync() throws IOException; void append(WAL.Entry entry) throws IOException; long getLength() throws IOException; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 2e7afa5..f623a61 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -591,6 +591,7 @@ public class PerformanceEvaluation extends Configured implements Tool { String tableName = TABLE_NAME; boolean flushCommits = true; boolean writeToWAL = true; + boolean fsyncWAL = false; boolean autoFlush = false; boolean oneCon = false; boolean useTags = false; @@ -1369,7 +1370,8 @@ public class PerformanceEvaluation extends Configured implements Tool { put.add(FAMILY_NAME, QUALIFIER_NAME, value); updateValueSize(value.length); } - put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + put.setDurability(opts.writeToWAL ? opts.fsyncWAL ? Durability.FSYNC_WAL + : Durability.SYNC_WAL : Durability.SKIP_WAL); mutator.mutate(put); } } @@ -1448,7 +1450,8 @@ public class PerformanceEvaluation extends Configured implements Tool { put.add(FAMILY_NAME, QUALIFIER_NAME, value); updateValueSize(value.length); } - put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + put.setDurability(opts.writeToWAL ? opts.fsyncWAL ? Durability.FSYNC_WAL + : Durability.SYNC_WAL : Durability.SKIP_WAL); mutator.mutate(put); } } @@ -1650,6 +1653,7 @@ public class PerformanceEvaluation extends Configured implements Tool { System.err.println(" flushCommits Used to determine if the test should flush the table. " + "Default: false"); System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); + System.err.println(" fsyncWAL Force WAL edit to disk. 
Default: False"); System.err.println(" autoFlush Set autoFlush on htable. Default: False"); System.err.println(" oneCon all the threads share the same connection. Default: False"); System.err.println(" presplit Create presplit table. Recommended for accurate perf " + @@ -1785,6 +1789,12 @@ public class PerformanceEvaluation extends Configured implements Tool { continue; } + final String fsyncWAL = "--fsyncWAL="; + if (cmd.startsWith(fsyncWAL)) { + opts.fsyncWAL = Boolean.parseBoolean(cmd.substring(fsyncWAL.length())); + continue; + } + final String presplit = "--presplit="; if (cmd.startsWith(presplit)) { opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 8b41594..16d09e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1493,6 +1493,57 @@ public class TestHRegion { } @Test + public void testSyncAndFsync() throws Exception { + byte[] b = Bytes.toBytes(getName()); + byte[] row1 = Bytes.toBytes("row1"); + byte[] row2 = Bytes.toBytes("row2"); + byte[] cf = Bytes.toBytes(COLUMN_FAMILY); + byte[] qual = Bytes.toBytes("qual"); + byte[] val = Bytes.toBytes("val"); + + this.region = initHRegion(b, getName(), CONF, cf); + MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class); + long syncs = metricsAssertHelper.getCounter("syncCount", source); + long fsyncs = metricsAssertHelper.getCounter("fsyncCount", source); + metricsAssertHelper.assertCounter("syncCount", syncs, source); + metricsAssertHelper.assertCounter("fsyncCount", fsyncs, source); + Put p, p1; + p = new Put(row1); + p.add(cf, qual, val); + this.region.put(p); + // make sure the sync counter is updated + syncs++; + 
metricsAssertHelper.assertCounter("syncCount", syncs, source); + metricsAssertHelper.assertCounter("fsyncCount", fsyncs, source); + + p = new Put(row1); + p.add(cf, qual, val); + p.setDurability(Durability.ASYNC_WAL); + this.region.put(p); + // make sure no counter is updated + metricsAssertHelper.assertCounter("syncCount", syncs, source); + metricsAssertHelper.assertCounter("fsyncCount", fsyncs, source); + + p = new Put(row1); + p.add(cf, qual, val); + p1 = new Put(row2); + p1.add(cf, qual, val); + p1.setDurability(Durability.FSYNC_WAL); + this.region.batchMutate(new Put[]{p, p1}); + // as long as one put in the batch is marked with fsync the wal edit + // is fsync'ed + fsyncs++; + metricsAssertHelper.assertCounter("syncCount", syncs, source); + metricsAssertHelper.assertCounter("fsyncCount", fsyncs, source); + + HBaseTestingUtility.closeRegionAndWAL(this.region); + this.region = null; + } + + /** + * Test whether basic sync/fsync metric accounting works + */ + @Test public void testBatchPutWithTsSlop() throws Exception { byte[] b = Bytes.toBytes(getName()); byte[] cf = Bytes.toBytes(COLUMN_FAMILY); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java index 7c13c00..65eeb53 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java @@ -195,7 +195,17 @@ public class SequenceFileLogWriter extends WriterBase { @Override public void sync() throws IOException { try { - this.writer.syncFs(); + this.writer.hflush();; + } catch (NullPointerException npe) { + // Concurrent close... + throw new IOException(npe); + } + } + + @Override + public void fsync() throws IOException { + try { + this.writer.hsync();; } catch (NullPointerException npe) { // Concurrent close... 
throw new IOException(npe); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java index d9183d0..3c9603f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java @@ -50,7 +50,7 @@ public class TestMetricsWAL { long nanos = TimeUnit.MILLISECONDS.toNanos(145); MetricsWALSource source = mock(MetricsWALSourceImpl.class); MetricsWAL metricsWAL = new MetricsWAL(source); - metricsWAL.postSync(nanos, 1); + metricsWAL.postSync(nanos, 1, 0); verify(source, times(1)).incrementSyncTime(145); } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index 05d5e51..22aecb2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -505,7 +505,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { } @Override - public void postSync(final long timeInNanos, final int handlerSyncs) { + public void postSync(final long timeInNanos, final int handlerSyncs, final int handlerFsyncs) { syncMeter.mark(); syncHistogram.update(timeInNanos); syncCountHistogram.update(handlerSyncs);