diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 8d2afcf..028abeb 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -37,6 +37,7 @@ import java.util.regex.Matcher; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.WritableComparable; +import org.mortbay.log.Log; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; @@ -154,10 +156,17 @@ public class HTableDescriptor implements WritableComparable { * INTERNAL Used by HBase Shell interface to access this metadata * attribute which denotes if the deferred log flush option is enabled */ + @Deprecated public static final String DEFERRED_LOG_FLUSH = "DEFERRED_LOG_FLUSH"; + @Deprecated private static final ImmutableBytesWritable DEFERRED_LOG_FLUSH_KEY = new ImmutableBytesWritable(Bytes.toBytes(DEFERRED_LOG_FLUSH)); + public static final String DURABILITY = "DURABILITY"; + private static final ImmutableBytesWritable DURABILITY_KEY = + new ImmutableBytesWritable(Bytes.toBytes("DURABILITY")); + private static final Durability DEFAULT_DURABLITY = Durability.SYNC_WAL; + /* * The below are ugly but better than creating them each time till we * replace booleans being saved as Strings with plain booleans. Need a @@ -194,6 +203,7 @@ public class HTableDescriptor implements WritableComparable { String.valueOf(DEFAULT_MEMSTORE_FLUSH_SIZE)); DEFAULT_VALUES.put(DEFERRED_LOG_FLUSH, String.valueOf(DEFAULT_DEFERRED_LOG_FLUSH)); + DEFAULT_VALUES.put(DURABILITY, DEFAULT_DURABLITY.name()); //use the enum name for (String s : DEFAULT_VALUES.keySet()) { RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(s))); } @@ -209,10 +219,11 @@ public class HTableDescriptor implements WritableComparable { * Cache of whether this is root table or not. */ private volatile Boolean root = null; + /** - * Cache of whether deferred logging set. + * Durability setting for the table */ - private Boolean deferredLog = null; + private Durability durability = null; /** * Maps column family name to the respective HColumnDescriptors @@ -373,7 +384,6 @@ public class HTableDescriptor implements WritableComparable { final boolean valueIfNull) { byte [] value = getValue(key); if (value != null) { - // TODO: Make value be a boolean rather than String of boolean. return Boolean.valueOf(Bytes.toString(value)); } return valueIfNull; @@ -522,6 +532,13 @@ public class HTableDescriptor implements WritableComparable { */ private void setValue(final ImmutableBytesWritable key, final ImmutableBytesWritable value) { + if (key.compareTo(DEFERRED_LOG_FLUSH_KEY) == 0) { + boolean isDeferredFlush = Boolean.valueOf(Bytes.toString(value.get())); + Log.warn("HTableDescriptor property:" + DEFERRED_LOG_FLUSH + " is deprecated, " + + "use " + DURABILITY + " instead"); + setDurability(isDeferredFlush ? Durability.ASYNC_WAL : DEFAULT_DURABLITY); + return; + } values.put(key, value); } @@ -588,13 +605,11 @@ public class HTableDescriptor implements WritableComparable { * @return true if that deferred log flush is enabled on the table * * @see #setDeferredLogFlush(boolean) + * @deprecated use {@link #getDurability()} */ + @Deprecated public synchronized boolean isDeferredLogFlush() { - if(this.deferredLog == null) { - this.deferredLog = - isSomething(DEFERRED_LOG_FLUSH_KEY, DEFAULT_DEFERRED_LOG_FLUSH); - } - return this.deferredLog; + return getDurability() == Durability.ASYNC_WAL; } /** @@ -610,10 +625,33 @@ public class HTableDescriptor implements WritableComparable { *

* * @param isDeferredLogFlush + * @deprecated use {@link #setDurability(Durability)} */ + @Deprecated public synchronized void setDeferredLogFlush(final boolean isDeferredLogFlush) { - setValue(DEFERRED_LOG_FLUSH_KEY, isDeferredLogFlush? TRUE: FALSE); - this.deferredLog = isDeferredLogFlush; + this.setDurability(isDeferredLogFlush ? Durability.ASYNC_WAL : DEFAULT_DURABLITY); + } + + public void setDurability(Durability durability) { + if (this.durability == Durability.USE_DEFAULT) { + Log.warn("Setting Durability.USE_DEFAULT to table is illegal, it is only " + + "intended to be used from Mutation"); + durability = DEFAULT_DURABLITY; + } + this.durability = durability; + setValue(DURABILITY_KEY, durability.name()); + } + + public Durability getDurability() { + if (this.durability == null) { + byte[] durabilityValue = getValue(DURABILITY_KEY); + if (durabilityValue == null) { + this.durability = DEFAULT_DURABLITY; + } else { + this.durability = Durability.valueOf(Bytes.toString(durabilityValue)); + } + } + return this.durability; } /** diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java index 7088fcc..1fa042c 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java @@ -28,8 +28,9 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceAudience.Public @InterfaceStability.Evolving public enum Durability { + /* Developer note: Do not rename the enum field names. They are serialized in HTableDescriptor*/ /** - * Use the column family's default setting to determine durability. + * Use the table's default setting to determine durability. * This must remain the first option. */ USE_DEFAULT, diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index d2cae1e..cbef83b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -108,8 +108,8 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcCallContext; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; @@ -373,6 +373,7 @@ public class HRegion implements HeapSize { // , Writable{ private final MetricsRegion metricsRegion; private final MetricsRegionWrapperImpl metricsRegionWrapper; private final boolean deferredLogSyncDisabled; + private final Durability durability; /** * HRegion constructor. This constructor should only be used for testing and @@ -482,6 +483,7 @@ public class HRegion implements HeapSize { // , Writable{ // When hbase.regionserver.optionallogflushinterval <= 0 , deferred log sync is disabled. this.deferredLogSyncDisabled = conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000) <= 0; + this.durability = htd.getDurability(); if (rsServices != null) { this.rsAccounting = this.rsServices.getRegionServerAccounting(); @@ -626,6 +628,7 @@ public class HRegion implements HeapSize { // , Writable{ for (final HColumnDescriptor family : htableDescriptor.getFamilies()) { status.setStatus("Instantiating store for column family " + family); completionService.submit(new Callable() { + @Override public HStore call() throws IOException { return instantiateHStore(family); } @@ -840,7 +843,7 @@ public class HRegion implements HeapSize { // , Writable{ private final Object closeLock = new Object(); /** Conf key for the periodic flush interval */ - public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = + public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = "hbase.regionserver.optionalcacheflushinterval"; /** Default interval for the memstore flush */ public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000; @@ -936,6 +939,7 @@ public class HRegion implements HeapSize { // , Writable{ for (final Store store : stores.values()) { completionService .submit(new Callable>>() { + @Override public Pair> call() throws IOException { return new Pair>( store.getFamily().getName(), store.close()); @@ -1026,6 +1030,7 @@ public class HRegion implements HeapSize { // , Writable{ new ThreadFactory() { private int count = 1; + @Override public Thread newThread(Runnable r) { return new Thread(r, threadNamePrefix + "-" + count++); } @@ -1477,7 +1482,7 @@ public class HRegion implements HeapSize { // , Writable{ // sync unflushed WAL changes when deferred log sync is enabled // see HBASE-8208 for details - if (wal != null && isDeferredLogSyncEnabled()) { + if (wal != null && !shouldSyncLogFromTableDurability()) { wal.sync(); } @@ -2116,7 +2121,7 @@ public class HRegion implements HeapSize { // , Writable{ batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; Mutation m = batchOp.operations[i].getFirst(); - Durability tmpDur = m.getDurability(); + Durability tmpDur = getEffectiveDurability(m.getDurability()); if (tmpDur.ordinal() > durability.ordinal()) { durability = tmpDur; } @@ -2141,8 +2146,10 @@ public class HRegion implements HeapSize { // , Writable{ // STEP 5. Append the edit to WAL. Do not sync wal. // ------------------------- Mutation first = batchOp.operations[firstIndex].getFirst(); - txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), + if (walEdit.size() > 0) { + txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), walEdit, first.getClusterId(), now, this.htableDescriptor); + } // ------------------------------- // STEP 6. Release row locks, etc. @@ -2249,6 +2256,14 @@ public class HRegion implements HeapSize { // , Writable{ } } + /** + * Returns effective durability from the passed durability and + * the table descriptor. + */ + protected Durability getEffectiveDurability(Durability d) { + return d == Durability.USE_DEFAULT ? this.durability : d; + } + //TODO, Think that gets/puts and deletes should be refactored a bit so that //the getting of the lock happens before, so that you would just pass it into //the methods. So in the case of checkAndMutate you could just do lockRow, @@ -3374,6 +3389,7 @@ public class HRegion implements HeapSize { // , Writable{ private long maxResultSize; private HRegion region; + @Override public HRegionInfo getRegionInfo() { return region.getRegionInfo(); } @@ -3567,6 +3583,7 @@ public class HRegion implements HeapSize { // , Writable{ /* * @return True if a filter rules the scanner is over, done. */ + @Override public synchronized boolean isFilterDone() throws IOException { return this.filter != null && this.filter.filterAllRemaining(); } @@ -4530,7 +4547,7 @@ public class HRegion implements HeapSize { // , Writable{ } // 10. Sync edit log if (txid != 0) { - syncOrDefer(txid, processor.useDurability()); + syncOrDefer(txid, getEffectiveDurability(processor.useDurability())); } walSyncSuccessful = true; } @@ -4638,7 +4655,8 @@ public class HRegion implements HeapSize { // , Writable{ byte[] row = append.getRow(); checkRow(row, "append"); boolean flush = false; - boolean writeToWAL = append.getDurability() != Durability.SKIP_WAL; + Durability durability = getEffectiveDurability(append.getDurability()); + boolean writeToWAL = durability != Durability.SKIP_WAL; WALEdit walEdits = null; List allKVs = new ArrayList(append.size()); Map> tempMemstore = new HashMap>(); @@ -4772,7 +4790,7 @@ public class HRegion implements HeapSize { // , Writable{ } if (writeToWAL) { // sync the transaction log outside the rowlock - syncOrDefer(txid, append.getDurability()); + syncOrDefer(txid, durability); } } finally { if (w != null) { @@ -4806,7 +4824,8 @@ public class HRegion implements HeapSize { // , Writable{ checkRow(row, "increment"); TimeRange tr = increment.getTimeRange(); boolean flush = false; - boolean writeToWAL = increment.getDurability() != Durability.SKIP_WAL; + Durability durability = getEffectiveDurability(increment.getDurability()); + boolean writeToWAL = durability != Durability.SKIP_WAL; WALEdit walEdits = null; List allKVs = new ArrayList(increment.size()); Map> tempMemstore = new HashMap>(); @@ -4916,7 +4935,7 @@ public class HRegion implements HeapSize { // , Writable{ } if (writeToWAL) { // sync the transaction log outside the rowlock - syncOrDefer(txid, increment.getDurability()); + syncOrDefer(txid, durability); } } finally { if (w != null) { @@ -4952,7 +4971,7 @@ public class HRegion implements HeapSize { // , Writable{ public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + 39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + (11 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN); @@ -5344,8 +5363,8 @@ public class HRegion implements HeapSize { // , Writable{ } else { switch(durability) { case USE_DEFAULT: - // do what CF defaults to - if (!isDeferredLogSyncEnabled()) { + // do what table defaults to + if (shouldSyncLogFromTableDurability()) { this.log.sync(txid); } break; @@ -5370,8 +5389,9 @@ public class HRegion implements HeapSize { // , Writable{ /** * check if current region is deferred sync enabled. */ - private boolean isDeferredLogSyncEnabled() { - return (this.htableDescriptor.isDeferredLogFlush() && !this.deferredLogSyncDisabled); + private boolean shouldSyncLogFromTableDurability() { + return this.deferredLogSyncDisabled || + durability.ordinal() > Durability.ASYNC_WAL.ordinal(); } /** diff --git hbase-server/src/main/ruby/hbase/admin.rb hbase-server/src/main/ruby/hbase/admin.rb index 4c908be..803ef3b 100644 --- hbase-server/src/main/ruby/hbase/admin.rb +++ hbase-server/src/main/ruby/hbase/admin.rb @@ -261,6 +261,7 @@ module Hbase htd.setReadOnly(JBoolean.valueOf(arg.delete(READONLY))) if arg[READONLY] htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(MEMSTORE_FLUSHSIZE))) if arg[MEMSTORE_FLUSHSIZE] htd.setDeferredLogFlush(JBoolean.valueOf(arg.delete(DEFERRED_LOG_FLUSH))) if arg[DEFERRED_LOG_FLUSH] + htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(DURABILITY))) if arg[DURABILITY] set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA] set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION] @@ -477,6 +478,7 @@ module Hbase htd.setReadOnly(JBoolean.valueOf(arg.delete(READONLY))) if arg[READONLY] htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(MEMSTORE_FLUSHSIZE))) if arg[MEMSTORE_FLUSHSIZE] htd.setDeferredLogFlush(JBoolean.valueOf(arg.delete(DEFERRED_LOG_FLUSH))) if arg[DEFERRED_LOG_FLUSH] + htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(DURABILITY))) if arg[DURABILITY] set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA] set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION] diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java index bc8e72c..19d0af7 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java @@ -27,6 +27,7 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -46,14 +47,14 @@ public class TestHTableDescriptor { HTableDescriptor htd = new HTableDescriptor(HTableDescriptor.META_TABLEDESC); final int v = 123; htd.setMaxFileSize(v); - htd.setDeferredLogFlush(true); + htd.setDurability(Durability.ASYNC_WAL); htd.setReadOnly(true); byte [] bytes = htd.toByteArray(); HTableDescriptor deserializedHtd = HTableDescriptor.parseFrom(bytes); assertEquals(htd, deserializedHtd); assertEquals(v, deserializedHtd.getMaxFileSize()); assertTrue(deserializedHtd.isReadOnly()); - assertTrue(deserializedHtd.isDeferredLogFlush()); + assertEquals(Durability.ASYNC_WAL, deserializedHtd.getDurability()); } /** diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 07f02ad..2cd35ae 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -18,6 +18,14 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -27,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -56,6 +65,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -3868,6 +3878,101 @@ public class TestHRegion extends HBaseTestCase { assertEquals(Bytes.toBytes("value1"), kvs.get(0).getValue()); } + @Test + public void testDurability() throws Exception { + String method = "testDurability"; + // there are 4 x 5 cases: + // table durability(SYNC,FSYNC,ASYC,SKIP) x mutation durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) + + // expected cases for append and sync wal + durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); + durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); + durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false); + + durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); + durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); + durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false); + + durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); + durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); + + durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false); + durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false); + + // expected cases for async wal + // do not sync for deferred flush with large optionallogflushinterval + conf.setLong("hbase.regionserver.optionallogflushinterval", Integer.MAX_VALUE); + durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); + durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); + durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); + durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false); + durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false); + + // now small deferred log flush optionallogflushinterval, expect sync + conf.setLong("hbase.regionserver.optionallogflushinterval", 5); + durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); + durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); + durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); + durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true); + durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true); + + // expect skip wal cases + durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); + durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); + durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); + durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false); + durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false); + + + } + + private void durabilityTest(String method, Durability tableDurability, + Durability mutationDurability, long timeout, boolean expectAppend, + final boolean expectSync, final boolean expectSyncFromLogSyncer) throws Exception { + method = method + "_" + tableDurability.name() + "_" + mutationDurability.name(); + byte[] tableName = Bytes.toBytes(method); + byte[] family = Bytes.toBytes("family"); + Path logDir = new Path(new Path(DIR + method), "log"); + HLog hlog = HLogFactory.createHLog(fs, logDir, UUID.randomUUID().toString(), conf); + final HLog log = spy(hlog); + this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW, method, conf, false, + tableDurability, log, new byte[][] {family}); + + Put put = new Put(Bytes.toBytes("r1")); + put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); + put.setDurability(mutationDurability); + region.put(put); + + //verify append called or not + verify(log, expectAppend ? times(1) : never()) + .appendNoSync((HRegionInfo)any(), eq(tableName), + (WALEdit)any(), (UUID)any(), anyLong(), (HTableDescriptor)any()); + + //verify sync called or not + if (expectSync || expectSyncFromLogSyncer) { + TEST_UTIL.waitFor(timeout, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + try { + if (expectSync) { + verify(log, times(1)).sync(anyLong()); //Hregion calls this one + } else if (expectSyncFromLogSyncer) { + verify(log, times(1)).sync(); //log syncer calls this one + } + } catch (Throwable ignore) {} + return true; + } + }); + } else { + verify(log, never()).sync(anyLong()); + verify(log, never()).sync(); + } + + hlog.close(); + region.close(); + } + private void putData(int startRow, int numRows, byte [] qf, byte [] ...families) throws IOException { @@ -3997,6 +4102,13 @@ public class TestHRegion extends HBaseTestCase { return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families); } + private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, + String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families) + throws IOException { + return initHRegion(tableName, startKey, stopKey, callingMethod, conf, isReadOnly, + Durability.SYNC_WAL, null, families); + } + /** * @param tableName * @param startKey @@ -4009,13 +4121,15 @@ public class TestHRegion extends HBaseTestCase { * @return A region on which you must call {@link HRegion#closeHRegion(HRegion)} when done. */ private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, - String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families) + String callingMethod, Configuration conf, boolean isReadOnly, Durability durability, + HLog hlog, byte[]... families) throws IOException { HTableDescriptor htd = new HTableDescriptor(tableName); htd.setReadOnly(isReadOnly); for(byte [] family : families) { htd.addFamily(new HColumnDescriptor(family)); } + htd.setDurability(durability); HRegionInfo info = new HRegionInfo(htd.getName(), startKey, stopKey, false); Path path = new Path(DIR + callingMethod); FileSystem fs = FileSystem.get(conf); @@ -4024,7 +4138,7 @@ public class TestHRegion extends HBaseTestCase { throw new IOException("Failed delete of " + path); } } - return HRegion.createHRegion(info, path, conf, htd); + return HRegion.createHRegion(info, path, conf, htd, hlog); } /**