diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 8d2afcf..084b7e2 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -37,6 +37,7 @@ import java.util.regex.Matcher;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -154,10 +156,17 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
* INTERNAL Used by HBase Shell interface to access this metadata
* attribute which denotes if the deferred log flush option is enabled
*/
+ @Deprecated
public static final String DEFERRED_LOG_FLUSH = "DEFERRED_LOG_FLUSH";
+ @Deprecated
private static final ImmutableBytesWritable DEFERRED_LOG_FLUSH_KEY =
new ImmutableBytesWritable(Bytes.toBytes(DEFERRED_LOG_FLUSH));
+ public static final String DURABILITY = "DURABILITY";
+ private static final ImmutableBytesWritable DURABILITY_KEY =
+ new ImmutableBytesWritable(Bytes.toBytes(DURABILITY));
+ private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;
+
+ private static final Log LOG = LogFactory.getLog(HTableDescriptor.class);
+
/*
* The below are ugly but better than creating them each time till we
* replace booleans being saved as Strings with plain booleans. Need a
@@ -194,6 +203,7 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
String.valueOf(DEFAULT_MEMSTORE_FLUSH_SIZE));
DEFAULT_VALUES.put(DEFERRED_LOG_FLUSH,
String.valueOf(DEFAULT_DEFERRED_LOG_FLUSH));
+ DEFAULT_VALUES.put(DURABILITY, DEFAULT_DURABILITY.name()); // use the enum name
for (String s : DEFAULT_VALUES.keySet()) {
RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(s)));
}
@@ -209,10 +219,11 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
* Cache of whether this is root table or not.
*/
private volatile Boolean root = null;
+
/**
- * Cache of whether deferred logging set.
+ * Durability setting for the table
*/
- private Boolean deferredLog = null;
+ private Durability durability = null;
/**
* Maps column family name to the respective HColumnDescriptors
@@ -373,7 +384,6 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
final boolean valueIfNull) {
byte [] value = getValue(key);
if (value != null) {
- // TODO: Make value be a boolean rather than String of boolean.
return Boolean.valueOf(Bytes.toString(value));
}
return valueIfNull;
@@ -522,6 +532,13 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
*/
private void setValue(final ImmutableBytesWritable key,
final ImmutableBytesWritable value) {
+ if (key.compareTo(DEFERRED_LOG_FLUSH_KEY) == 0) {
+ boolean isDeferredFlush = Boolean.valueOf(Bytes.toString(value.get()));
+ LOG.warn("HTableDescriptor property: " + DEFERRED_LOG_FLUSH + " is deprecated, " +
+ "use " + DURABILITY + " instead");
+ setDurability(isDeferredFlush ? Durability.ASYNC_WAL : DEFAULT_DURABILITY);
+ return;
+ }
values.put(key, value);
}
@@ -588,13 +605,11 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
* @return true if that deferred log flush is enabled on the table
*
* @see #setDeferredLogFlush(boolean)
+ * @deprecated use {@link #getDurability()}
*/
+ @Deprecated
public synchronized boolean isDeferredLogFlush() {
- if(this.deferredLog == null) {
- this.deferredLog =
- isSomething(DEFERRED_LOG_FLUSH_KEY, DEFAULT_DEFERRED_LOG_FLUSH);
- }
- return this.deferredLog;
+ return getDurability() == Durability.ASYNC_WAL;
}
/**
@@ -610,10 +625,33 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
*
*
* @param isDeferredLogFlush
+ * @deprecated use {@link #setDurability(Durability)}
*/
+ @Deprecated
public synchronized void setDeferredLogFlush(final boolean isDeferredLogFlush) {
- setValue(DEFERRED_LOG_FLUSH_KEY, isDeferredLogFlush? TRUE: FALSE);
- this.deferredLog = isDeferredLogFlush;
+ this.setDurability(isDeferredLogFlush ? Durability.ASYNC_WAL : DEFAULT_DURABILITY);
+ }
+
+ /**
+ * Sets the {@link Durability} setting for the table. {@code USE_DEFAULT} is not a
+ * valid table-level setting and is replaced with the default, {@code SYNC_WAL}.
+ */
+ public void setDurability(Durability durability) {
+ if (durability == Durability.USE_DEFAULT) {
+ LOG.warn("Setting Durability.USE_DEFAULT on a table is illegal; it is only " +
+ "intended to be used from Mutation");
+ durability = DEFAULT_DURABILITY;
+ }
+ this.durability = durability;
+ setValue(DURABILITY_KEY, durability.name());
+ }
+
+ /**
+ * Returns the {@link Durability} setting for the table, falling back to the
+ * default ({@code SYNC_WAL}) when none has been set.
+ */
+ public Durability getDurability() {
+ if (this.durability == null) {
+ byte[] durabilityValue = getValue(DURABILITY_KEY);
+ if (durabilityValue == null) {
+ this.durability = DEFAULT_DURABILITY;
+ } else {
+ this.durability = Durability.valueOf(Bytes.toString(durabilityValue));
+ }
+ }
+ return this.durability;
}
/**
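
For illustration, a minimal usage sketch of the table-level durability API added above (table, family and row names are hypothetical; assumes the usual org.apache.hadoop.hbase and org.apache.hadoop.hbase.client imports). It also shows how the deprecated deferred-log-flush calls now map onto the same DURABILITY attribute.

    // Sketch only, not part of the patch.
    static HTableDescriptor exampleDurabilityDescriptor() {
      HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes("example_table"));
      htd.addFamily(new HColumnDescriptor(Bytes.toBytes("cf")));

      // New style: declare durability once, at the table level.
      htd.setDurability(Durability.ASYNC_WAL);
      assert htd.getDurability() == Durability.ASYNC_WAL;

      // Deprecated style still works and now simply maps onto DURABILITY:
      // setDeferredLogFlush(true) behaves like setDurability(ASYNC_WAL), and
      // isDeferredLogFlush() is true exactly when the table durability is ASYNC_WAL.
      htd.setDeferredLogFlush(true);
      assert htd.isDeferredLogFlush();

      // A mutation left at Durability.USE_DEFAULT inherits the table setting on the
      // server side (see HRegion#getEffectiveDurability below); an explicit value wins.
      Put p = new Put(Bytes.toBytes("row1"));
      p.setDurability(Durability.SKIP_WAL);
      return htd;
    }
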
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java
index 7088fcc..1fa042c 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java
@@ -28,8 +28,9 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public enum Durability {
+ /* Developer note: Do not rename these constants; their names are serialized as part of
+ * HTableDescriptor. Avoid reordering them as well: HRegion ranks durability guarantees
+ * by ordinal. */
/**
- * Use the column family's default setting to determine durability.
+ * Use the table's default setting to determine durability.
* This must remain the first option.
*/
USE_DEFAULT,
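
Beyond USE_DEFAULT staying first, the relative order of the remaining constants now matters too, because HRegion (below) compares ordinals both to pick the strongest durability in a batch and to decide whether the table default forces an immediate sync. A sketch of that "strongest wins" selection is shown here; it is an illustration of the rule, not code from this patch, and the tableDurability parameter stands in for what HRegion#getEffectiveDurability reads from the descriptor.

    // Sketch only: mirrors the ordinal comparison in HRegion.doMiniBatchMutation.
    // Assumes the declared order USE_DEFAULT, SKIP_WAL, ASYNC_WAL, SYNC_WAL, FSYNC_WAL.
    static Durability strongestDurability(Iterable<? extends Mutation> mutations,
        Durability tableDurability) {
      Durability batchDurability = Durability.USE_DEFAULT;
      for (Mutation m : mutations) {
        Durability effective = m.getDurability() == Durability.USE_DEFAULT
            ? tableDurability : m.getDurability();   // per-mutation value wins over the table
        if (effective.ordinal() > batchDurability.ordinal()) {
          batchDurability = effective;               // keep the most durable option seen so far
        }
      }
      return batchDurability;
    }
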
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d2cae1e..cbef83b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -108,8 +108,8 @@ import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
@@ -373,6 +373,7 @@ public class HRegion implements HeapSize { // , Writable{
private final MetricsRegion metricsRegion;
private final MetricsRegionWrapperImpl metricsRegionWrapper;
private final boolean deferredLogSyncDisabled;
+ private final Durability durability;
/**
* HRegion constructor. This constructor should only be used for testing and
@@ -482,6 +483,7 @@ public class HRegion implements HeapSize { // , Writable{
// When hbase.regionserver.optionallogflushinterval <= 0 , deferred log sync is disabled.
this.deferredLogSyncDisabled = conf.getLong("hbase.regionserver.optionallogflushinterval",
1 * 1000) <= 0;
+ this.durability = htd.getDurability();
if (rsServices != null) {
this.rsAccounting = this.rsServices.getRegionServerAccounting();
@@ -626,6 +628,7 @@ public class HRegion implements HeapSize { // , Writable{
for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
status.setStatus("Instantiating store for column family " + family);
completionService.submit(new Callable<HStore>() {
+ @Override
public HStore call() throws IOException {
return instantiateHStore(family);
}
@@ -840,7 +843,7 @@ public class HRegion implements HeapSize { // , Writable{
private final Object closeLock = new Object();
/** Conf key for the periodic flush interval */
- public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
+ public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
"hbase.regionserver.optionalcacheflushinterval";
/** Default interval for the memstore flush */
public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
@@ -936,6 +939,7 @@ public class HRegion implements HeapSize { // , Writable{
for (final Store store : stores.values()) {
completionService
.submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
+ @Override
public Pair<byte[], Collection<StoreFile>> call() throws IOException {
return new Pair<byte[], Collection<StoreFile>>(
store.getFamily().getName(), store.close());
@@ -1026,6 +1030,7 @@ public class HRegion implements HeapSize { // , Writable{
new ThreadFactory() {
private int count = 1;
+ @Override
public Thread newThread(Runnable r) {
return new Thread(r, threadNamePrefix + "-" + count++);
}
@@ -1477,7 +1482,7 @@ public class HRegion implements HeapSize { // , Writable{
// sync unflushed WAL changes when deferred log sync is enabled
// see HBASE-8208 for details
- if (wal != null && isDeferredLogSyncEnabled()) {
+ if (wal != null && !shouldSyncLogFromTableDurability()) {
wal.sync();
}
@@ -2116,7 +2121,7 @@ public class HRegion implements HeapSize { // , Writable{
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
Mutation m = batchOp.operations[i].getFirst();
- Durability tmpDur = m.getDurability();
+ Durability tmpDur = getEffectiveDurability(m.getDurability());
if (tmpDur.ordinal() > durability.ordinal()) {
durability = tmpDur;
}
@@ -2141,8 +2146,10 @@ public class HRegion implements HeapSize { // , Writable{
// STEP 5. Append the edit to WAL. Do not sync wal.
// -------------------------
Mutation first = batchOp.operations[firstIndex].getFirst();
- txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
+ if (walEdit.size() > 0) {
+ txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
walEdit, first.getClusterId(), now, this.htableDescriptor);
+ }
// -------------------------------
// STEP 6. Release row locks, etc.
@@ -2249,6 +2256,14 @@ public class HRegion implements HeapSize { // , Writable{
}
}
+ /**
+ * Returns the effective durability for a mutation: the passed value, unless it is
+ * {@code USE_DEFAULT}, in which case the table's durability setting is returned.
+ */
+ protected Durability getEffectiveDurability(Durability d) {
+ return d == Durability.USE_DEFAULT ? this.durability : d;
+ }
+
//TODO, Think that gets/puts and deletes should be refactored a bit so that
//the getting of the lock happens before, so that you would just pass it into
//the methods. So in the case of checkAndMutate you could just do lockRow,
@@ -3374,6 +3389,7 @@ public class HRegion implements HeapSize { // , Writable{
private long maxResultSize;
private HRegion region;
+ @Override
public HRegionInfo getRegionInfo() {
return region.getRegionInfo();
}
@@ -3567,6 +3583,7 @@ public class HRegion implements HeapSize { // , Writable{
/*
* @return True if a filter rules the scanner is over, done.
*/
+ @Override
public synchronized boolean isFilterDone() throws IOException {
return this.filter != null && this.filter.filterAllRemaining();
}
@@ -4530,7 +4547,7 @@ public class HRegion implements HeapSize { // , Writable{
}
// 10. Sync edit log
if (txid != 0) {
- syncOrDefer(txid, processor.useDurability());
+ syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
}
walSyncSuccessful = true;
}
@@ -4638,7 +4655,8 @@ public class HRegion implements HeapSize { // , Writable{
byte[] row = append.getRow();
checkRow(row, "append");
boolean flush = false;
- boolean writeToWAL = append.getDurability() != Durability.SKIP_WAL;
+ Durability durability = getEffectiveDurability(append.getDurability());
+ boolean writeToWAL = durability != Durability.SKIP_WAL;
WALEdit walEdits = null;
List<KeyValue> allKVs = new ArrayList<KeyValue>(append.size());
Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
@@ -4772,7 +4790,7 @@ public class HRegion implements HeapSize { // , Writable{
}
if (writeToWAL) {
// sync the transaction log outside the rowlock
- syncOrDefer(txid, append.getDurability());
+ syncOrDefer(txid, durability);
}
} finally {
if (w != null) {
@@ -4806,7 +4824,8 @@ public class HRegion implements HeapSize { // , Writable{
checkRow(row, "increment");
TimeRange tr = increment.getTimeRange();
boolean flush = false;
- boolean writeToWAL = increment.getDurability() != Durability.SKIP_WAL;
+ Durability durability = getEffectiveDurability(increment.getDurability());
+ boolean writeToWAL = durability != Durability.SKIP_WAL;
WALEdit walEdits = null;
List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.size());
Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
@@ -4916,7 +4935,7 @@ public class HRegion implements HeapSize { // , Writable{
}
if (writeToWAL) {
// sync the transaction log outside the rowlock
- syncOrDefer(txid, increment.getDurability());
+ syncOrDefer(txid, durability);
}
} finally {
if (w != null) {
@@ -4952,7 +4971,7 @@ public class HRegion implements HeapSize { // , Writable{
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+ 39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(11 * Bytes.SIZEOF_LONG) +
Bytes.SIZEOF_BOOLEAN);
@@ -5344,8 +5363,8 @@ public class HRegion implements HeapSize { // , Writable{
} else {
switch(durability) {
case USE_DEFAULT:
- // do what CF defaults to
- if (!isDeferredLogSyncEnabled()) {
+ // do what table defaults to
+ if (shouldSyncLogFromTableDurability()) {
this.log.sync(txid);
}
break;
@@ -5370,8 +5389,9 @@ public class HRegion implements HeapSize { // , Writable{
/**
- * check if current region is deferred sync enabled.
+ * Check whether edits written with {@link Durability#USE_DEFAULT} must be synced
+ * immediately, per the table's durability setting.
*/
- private boolean isDeferredLogSyncEnabled() {
- return (this.htableDescriptor.isDeferredLogFlush() && !this.deferredLogSyncDisabled);
+ private boolean shouldSyncLogFromTableDurability() {
+ return this.deferredLogSyncDisabled ||
+ durability.ordinal() > Durability.ASYNC_WAL.ordinal();
}
/**
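
A simplified sketch of the per-edit sync decision after this change, reconstructed from the USE_DEFAULT branch shown above and from the test expectations further down; the SKIP/ASYNC/SYNC branches are paraphrased rather than quoted from this patch, and the helper is hypothetical.

    // Sketch only: not the actual syncOrDefer body. The meta-region special case is
    // omitted, and the tableRequiresImmediateSync flag stands in for
    // HRegion#shouldSyncLogFromTableDurability().
    static void sketchSyncDecision(HLog log, long txid, Durability effective,
        boolean tableRequiresImmediateSync) throws IOException {
      switch (effective) {
        case SKIP_WAL:
          break;                 // nothing was appended, so there is nothing to sync
        case ASYNC_WAL:
          break;                 // appended; left to the deferred log syncer thread
        case USE_DEFAULT:
          if (tableRequiresImmediateSync) {
            log.sync(txid);      // table default (or disabled deferred sync) forces a sync
          }
          break;
        case SYNC_WAL:
        case FSYNC_WAL:
          log.sync(txid);        // sync right away (FSYNC treated like SYNC in this sketch)
          break;
        default:
          break;
      }
    }
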
diff --git hbase-server/src/main/ruby/hbase/admin.rb hbase-server/src/main/ruby/hbase/admin.rb
index 4c908be..803ef3b 100644
--- hbase-server/src/main/ruby/hbase/admin.rb
+++ hbase-server/src/main/ruby/hbase/admin.rb
@@ -261,6 +261,7 @@ module Hbase
htd.setReadOnly(JBoolean.valueOf(arg.delete(READONLY))) if arg[READONLY]
htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(MEMSTORE_FLUSHSIZE))) if arg[MEMSTORE_FLUSHSIZE]
htd.setDeferredLogFlush(JBoolean.valueOf(arg.delete(DEFERRED_LOG_FLUSH))) if arg[DEFERRED_LOG_FLUSH]
+ htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(DURABILITY))) if arg[DURABILITY]
set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA]
set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
@@ -477,6 +478,7 @@ module Hbase
htd.setReadOnly(JBoolean.valueOf(arg.delete(READONLY))) if arg[READONLY]
htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(MEMSTORE_FLUSHSIZE))) if arg[MEMSTORE_FLUSHSIZE]
htd.setDeferredLogFlush(JBoolean.valueOf(arg.delete(DEFERRED_LOG_FLUSH))) if arg[DEFERRED_LOG_FLUSH]
+ htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(DURABILITY))) if arg[DURABILITY]
set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA]
set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
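
The shell change above just forwards a DURABILITY table attribute to HTableDescriptor#setDurability. The equivalent from the Java client, for an existing table, might look like the sketch below (table name hypothetical; uses the standard HBaseAdmin disable/modify/enable flow rather than anything introduced by this patch).

    // Sketch only: switch an existing table to ASYNC_WAL durability via the admin API.
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    byte[] tableName = Bytes.toBytes("example_table");
    try {
      admin.disableTable(tableName);
      HTableDescriptor htd = admin.getTableDescriptor(tableName);
      htd.setDurability(Durability.ASYNC_WAL);   // attribute introduced by this patch
      admin.modifyTable(tableName, htd);
      admin.enableTable(tableName);
    } finally {
      admin.close();
    }
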
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
index bc8e72c..19d0af7 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
@@ -27,6 +27,7 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -46,14 +47,14 @@ public class TestHTableDescriptor {
HTableDescriptor htd = new HTableDescriptor(HTableDescriptor.META_TABLEDESC);
final int v = 123;
htd.setMaxFileSize(v);
- htd.setDeferredLogFlush(true);
+ htd.setDurability(Durability.ASYNC_WAL);
htd.setReadOnly(true);
byte [] bytes = htd.toByteArray();
HTableDescriptor deserializedHtd = HTableDescriptor.parseFrom(bytes);
assertEquals(htd, deserializedHtd);
assertEquals(v, deserializedHtd.getMaxFileSize());
assertTrue(deserializedHtd.isReadOnly());
- assertTrue(deserializedHtd.isDeferredLogFlush());
+ assertEquals(Durability.ASYNC_WAL, deserializedHtd.getDurability());
}
/**
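
The serialization test now round-trips DURABILITY; a companion check for the deprecated mapping could look like the hypothetical sketch below (not part of this patch), relying on the SYNC_WAL table default and the deferred-log-flush to ASYNC_WAL mapping defined in HTableDescriptor above.

    // Hypothetical companion test, not included in this patch.
    @Test
    public void testDeferredLogFlushMapsToDurability() throws Exception {
      HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes("t"));
      assertEquals(Durability.SYNC_WAL, htd.getDurability());   // table default

      htd.setDeferredLogFlush(true);                            // deprecated setter
      assertEquals(Durability.ASYNC_WAL, htd.getDurability());
      assertTrue(htd.isDeferredLogFlush());

      htd.setDeferredLogFlush(false);
      assertEquals(Durability.SYNC_WAL, htd.getDurability());
      assertFalse(htd.isDeferredLogFlush());
    }
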
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 07f02ad..2cd35ae 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -18,6 +18,14 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
@@ -27,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -56,6 +65,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -3868,6 +3878,101 @@ public class TestHRegion extends HBaseTestCase {
assertEquals(Bytes.toBytes("value1"), kvs.get(0).getValue());
}
+ @Test
+ public void testDurability() throws Exception {
+ String method = "testDurability";
+ // there are 4 x 5 cases:
+ // table durability (SYNC, FSYNC, ASYNC, SKIP) x mutation durability (SYNC, FSYNC, ASYNC, SKIP, USE_DEFAULT)
+
+ // cases where we expect the append plus an immediate sync of the wal
+ durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
+ durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
+ durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
+
+ durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
+ durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
+ durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
+
+ durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
+ durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
+
+ durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false);
+ durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false);
+
+ // async wal cases: expect the append but no immediate sync
+ // with a large optionallogflushinterval the deferred log syncer does not run during the test
+ conf.setLong("hbase.regionserver.optionallogflushinterval", Integer.MAX_VALUE);
+ durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
+ durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
+ durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
+ durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false);
+ durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
+
+ // now with a small optionallogflushinterval, expect the deferred log syncer to issue the sync
+ conf.setLong("hbase.regionserver.optionallogflushinterval", 5);
+ durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
+ durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
+ durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
+ durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
+ durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
+
+ // skip wal cases: expect neither an append nor a sync
+ durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
+ durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
+ durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
+ durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
+ durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
+
+ }
+
+ private void durabilityTest(String method, Durability tableDurability,
+ Durability mutationDurability, long timeout, boolean expectAppend,
+ final boolean expectSync, final boolean expectSyncFromLogSyncer) throws Exception {
+ method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
+ byte[] tableName = Bytes.toBytes(method);
+ byte[] family = Bytes.toBytes("family");
+ Path logDir = new Path(new Path(DIR + method), "log");
+ HLog hlog = HLogFactory.createHLog(fs, logDir, UUID.randomUUID().toString(), conf);
+ final HLog log = spy(hlog);
+ this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, method, conf, false,
+ tableDurability, log, new byte[][] {family});
+
+ Put put = new Put(Bytes.toBytes("r1"));
+ put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
+ put.setDurability(mutationDurability);
+ region.put(put);
+
+ // verify whether append was called
+ verify(log, expectAppend ? times(1) : never())
+ .appendNoSync((HRegionInfo)any(), eq(tableName),
+ (WALEdit)any(), (UUID)any(), anyLong(), (HTableDescriptor)any());
+
+ // verify whether sync was called
+ if (expectSync || expectSyncFromLogSyncer) {
+ TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ try {
+ if (expectSync) {
+ verify(log, times(1)).sync(anyLong()); // HRegion itself calls this variant
+ } else if (expectSyncFromLogSyncer) {
+ verify(log, times(1)).sync(); // the deferred log syncer thread calls this variant
+ }
+ } catch (Throwable ignore) {}
+ return true;
+ }
+ });
+ } else {
+ verify(log, never()).sync(anyLong());
+ verify(log, never()).sync();
+ }
+
+ hlog.close();
+ region.close();
+ }
+
private void putData(int startRow, int numRows, byte [] qf,
byte [] ...families)
throws IOException {
@@ -3997,6 +4102,13 @@ public class TestHRegion extends HBaseTestCase {
return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
}
+ private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
+ String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
+ throws IOException {
+ return initHRegion(tableName, startKey, stopKey, callingMethod, conf, isReadOnly,
+ Durability.SYNC_WAL, null, families);
+ }
+
/**
* @param tableName
* @param startKey
@@ -4009,13 +4121,15 @@ public class TestHRegion extends HBaseTestCase {
* @return A region on which you must call {@link HRegion#closeHRegion(HRegion)} when done.
*/
private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
- String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
+ String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
+ HLog hlog, byte[]... families)
throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.setReadOnly(isReadOnly);
for(byte [] family : families) {
htd.addFamily(new HColumnDescriptor(family));
}
+ htd.setDurability(durability);
HRegionInfo info = new HRegionInfo(htd.getName(), startKey, stopKey, false);
Path path = new Path(DIR + callingMethod);
FileSystem fs = FileSystem.get(conf);
@@ -4024,7 +4138,7 @@ public class TestHRegion extends HBaseTestCase {
throw new IOException("Failed delete of " + path);
}
}
- return HRegion.createHRegion(info, path, conf, htd);
+ return HRegion.createHRegion(info, path, conf, htd, hlog);
}
/**