diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 0fddf7d..4669c28 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -34,9 +34,12 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.regex.Matcher; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -63,6 +66,8 @@ import com.google.protobuf.InvalidProtocolBufferException; @InterfaceStability.Evolving public class HTableDescriptor implements WritableComparable { + private static final Log LOG = LogFactory.getLog(HTableDescriptor.class); + /** * Changes prior to version 3 were not recorded here. * Version 3 adds metadata as a map where keys and values are byte[]. @@ -153,12 +158,25 @@ public class HTableDescriptor implements WritableComparable { /** * INTERNAL Used by HBase Shell interface to access this metadata - * attribute which denotes if the deferred log flush option is enabled + * attribute which denotes if the deferred log flush option is enabled. + * @deprecated Use {@link #DURABILITY} instead. */ + @Deprecated public static final String DEFERRED_LOG_FLUSH = "DEFERRED_LOG_FLUSH"; + @Deprecated private static final ImmutableBytesWritable DEFERRED_LOG_FLUSH_KEY = new ImmutableBytesWritable(Bytes.toBytes(DEFERRED_LOG_FLUSH)); + /** + * INTERNAL {@link Durability} setting for the table. 
+ */ + public static final String DURABILITY = "DURABILITY"; + private static final ImmutableBytesWritable DURABILITY_KEY = + new ImmutableBytesWritable(Bytes.toBytes(DURABILITY)); + + /** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */ + private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT; + /* * The below are ugly but better than creating them each time till we * replace booleans being saved as Strings with plain booleans. Need a @@ -195,6 +213,7 @@ public class HTableDescriptor implements WritableComparable { String.valueOf(DEFAULT_MEMSTORE_FLUSH_SIZE)); DEFAULT_VALUES.put(DEFERRED_LOG_FLUSH, String.valueOf(DEFAULT_DEFERRED_LOG_FLUSH)); + DEFAULT_VALUES.put(DURABILITY, DEFAULT_DURABLITY.name()); //use the enum name for (String s : DEFAULT_VALUES.keySet()) { RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(s))); } @@ -210,10 +229,11 @@ public class HTableDescriptor implements WritableComparable { * Cache of whether this is root table or not. */ private volatile Boolean root = null; + /** - * Cache of whether deferred logging set. + * Durability setting for the table */ - private Boolean deferredLog = null; + private Durability durability = null; /** * Maps column family name to the respective HColumnDescriptors @@ -374,7 +394,6 @@ public class HTableDescriptor implements WritableComparable { final boolean valueIfNull) { byte [] value = getValue(key); if (value != null) { - // TODO: Make value be a boolean rather than String of boolean.
return Boolean.valueOf(Bytes.toString(value)); } return valueIfNull; @@ -525,6 +544,13 @@ public class HTableDescriptor implements WritableComparable { */ public void setValue(final ImmutableBytesWritable key, final ImmutableBytesWritable value) { + if (key.compareTo(DEFERRED_LOG_FLUSH_KEY) == 0) { + boolean isDeferredFlush = Boolean.valueOf(Bytes.toString(value.get())); + LOG.warn("HTableDescriptor property:" + DEFERRED_LOG_FLUSH + " is deprecated, " + + "use " + DURABILITY + " instead"); + setDurability(isDeferredFlush ? Durability.ASYNC_WAL : DEFAULT_DURABLITY); + return; + } values.put(key, value); } @@ -591,13 +617,11 @@ public class HTableDescriptor implements WritableComparable { * @return true if that deferred log flush is enabled on the table * * @see #setDeferredLogFlush(boolean) + * @deprecated use {@link #getDurability()} */ + @Deprecated public synchronized boolean isDeferredLogFlush() { - if(this.deferredLog == null) { - this.deferredLog = - isSomething(DEFERRED_LOG_FLUSH_KEY, DEFAULT_DEFERRED_LOG_FLUSH); - } - return this.deferredLog; + return getDurability() == Durability.ASYNC_WAL; } /** @@ -613,10 +637,42 @@ public class HTableDescriptor implements WritableComparable { *

* * @param isDeferredLogFlush + * @deprecated use {@link #setDurability(Durability)} */ + @Deprecated public synchronized void setDeferredLogFlush(final boolean isDeferredLogFlush) { - setValue(DEFERRED_LOG_FLUSH_KEY, isDeferredLogFlush? TRUE: FALSE); - this.deferredLog = isDeferredLogFlush; + this.setDurability(isDeferredLogFlush ? Durability.ASYNC_WAL : DEFAULT_DURABLITY); + } + + /** + * Sets the {@link Durability} setting for the table. This defaults to Durability.USE_DEFAULT. + * @param durability enum value + */ + public void setDurability(Durability durability) { + this.durability = durability; + setValue(DURABILITY_KEY, durability.name()); + } + + /** + * Returns the durability setting for the table. + * @return durability setting for the table. + */ + public Durability getDurability() { + if (this.durability == null) { + byte[] durabilityValue = getValue(DURABILITY_KEY); + if (durabilityValue == null) { + this.durability = DEFAULT_DURABLITY; + } else { + try { + this.durability = Durability.valueOf(Bytes.toString(durabilityValue)); + } catch (IllegalArgumentException ex) { + LOG.warn("Received " + ex + " because Durability value for HTableDescriptor" + + " is not known. 
Durability:" + Bytes.toString(durabilityValue)); + this.durability = DEFAULT_DURABLITY; + } + } + } + return this.durability; } /** diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java index 7088fcc..61ed045 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java @@ -22,14 +22,16 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; /** - * Enum describing the durability guarantees for {@link Mutation} + * Enum describing the durability guarantees for tables and {@link Mutation}s * Note that the items must be sorted in order of increasing durability */ @InterfaceAudience.Public @InterfaceStability.Evolving public enum Durability { + /* Developer note: Do not rename the enum field names. They are serialized in HTableDescriptor */ /** - * Use the column family's default setting to determine durability. + * If this is for tables durability, use HBase's global default value (SYNC_WAL). + * Otherwise, if this is for mutation, use the table's default setting to determine durability. * This must remain the first option. 
*/ USE_DEFAULT, diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 7f21e3a..79b0ea7 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -109,8 +109,8 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcCallContext; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; @@ -192,6 +192,12 @@ public class HRegion implements HeapSize { // , Writable{ public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY = "hbase.hregion.scan.loadColumnFamiliesOnDemand"; + /** + * This is the global default value for durability. All tables/mutations not + * defining a durability or using USE_DEFAULT will default to this value. + */ + private static final Durability DEFAULT_DURABLITY = Durability.SYNC_WAL; + final AtomicBoolean closed = new AtomicBoolean(false); /* Closing can take some time; use the closing flag if there is stuff we don't * want to do while in closing state; e.g. like offer this region up to the @@ -399,6 +405,7 @@ public class HRegion implements HeapSize { // , Writable{ private final MetricsRegion metricsRegion; private final MetricsRegionWrapperImpl metricsRegionWrapper; private final boolean deferredLogSyncDisabled; + private final Durability durability; /** * HRegion constructor. 
This constructor should only be used for testing and @@ -508,6 +515,9 @@ public class HRegion implements HeapSize { // , Writable{ // When hbase.regionserver.optionallogflushinterval <= 0 , deferred log sync is disabled. this.deferredLogSyncDisabled = conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000) <= 0; + this.durability = htd.getDurability() == Durability.USE_DEFAULT + ? DEFAULT_DURABLITY + : htd.getDurability(); if (rsServices != null) { this.rsAccounting = this.rsServices.getRegionServerAccounting(); // don't initialize coprocessors if not running within a regionserver @@ -651,6 +661,7 @@ public class HRegion implements HeapSize { // , Writable{ for (final HColumnDescriptor family : htableDescriptor.getFamilies()) { status.setStatus("Instantiating store for column family " + family); completionService.submit(new Callable() { + @Override public HStore call() throws IOException { return instantiateHStore(family); } @@ -810,7 +821,7 @@ public class HRegion implements HeapSize { // , Writable{ public void setRecovering(boolean newState) { this.getRegionInfo().setRecovering(newState); } - + /** * @return True if current region is in recovering */ @@ -880,7 +891,7 @@ public class HRegion implements HeapSize { // , Writable{ private final Object closeLock = new Object(); /** Conf key for the periodic flush interval */ - public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = + public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = "hbase.regionserver.optionalcacheflushinterval"; /** Default interval for the memstore flush */ public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000; @@ -976,6 +987,7 @@ public class HRegion implements HeapSize { // , Writable{ for (final Store store : stores.values()) { completionService .submit(new Callable>>() { + @Override public Pair> call() throws IOException { return new Pair>( store.getFamily().getName(), store.close()); @@ -1066,6 +1078,7 @@ public class HRegion implements HeapSize { // , 
Writable{ new ThreadFactory() { private int count = 1; + @Override public Thread newThread(Runnable r) { return new Thread(r, threadNamePrefix + "-" + count++); } @@ -1520,7 +1533,7 @@ public class HRegion implements HeapSize { // , Writable{ // sync unflushed WAL changes when deferred log sync is enabled // see HBASE-8208 for details - if (wal != null && isDeferredLogSyncEnabled()) { + if (wal != null && !shouldSyncLog()) { wal.sync(); } @@ -1905,7 +1918,7 @@ public class HRegion implements HeapSize { // , Writable{ Pair[] mutationsAndLocks) throws IOException { return batchMutate(mutationsAndLocks, false); } - + /** * Perform a batch of mutations. * It supports only Put and Delete mutations and will ignore other types passed. @@ -1954,7 +1967,7 @@ public class HRegion implements HeapSize { // , Writable{ } return batchOp.retCodeDetails; } - + private void doPreMutationHook(BatchOperationInProgress> batchOp) throws IOException { @@ -2109,7 +2122,7 @@ public class HRegion implements HeapSize { // , Writable{ } } } - + // we should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp now = EnvironmentEdgeManager.currentTimeMillis(); @@ -2149,8 +2162,8 @@ public class HRegion implements HeapSize { // , Writable{ // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { - MiniBatchOperationInProgress> miniBatchOp = - new MiniBatchOperationInProgress>(batchOp.operations, + MiniBatchOperationInProgress> miniBatchOp = + new MiniBatchOperationInProgress>(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; } @@ -2186,7 +2199,7 @@ public class HRegion implements HeapSize { // , Writable{ batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; Mutation m = batchOp.operations[i].getFirst(); - Durability tmpDur = m.getDurability(); + Durability 
tmpDur = getEffectiveDurability(m.getDurability()); if (tmpDur.ordinal() > durability.ordinal()) { durability = tmpDur; } @@ -2209,8 +2222,10 @@ public class HRegion implements HeapSize { // , Writable{ // STEP 5. Append the edit to WAL. Do not sync wal. // ------------------------- Mutation first = batchOp.operations[firstIndex].getFirst(); - txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), + if (walEdit.size() > 0) { + txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(), walEdit, first.getClusterId(), now, this.htableDescriptor); + } // ------------------------------- // STEP 6. Release row locks, etc. @@ -2234,8 +2249,8 @@ public class HRegion implements HeapSize { // , Writable{ walSyncSuccessful = true; // calling the post CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { - MiniBatchOperationInProgress> miniBatchOp = - new MiniBatchOperationInProgress>(batchOp.operations, + MiniBatchOperationInProgress> miniBatchOp = + new MiniBatchOperationInProgress>(batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutate(miniBatchOp); } @@ -2317,6 +2332,14 @@ public class HRegion implements HeapSize { // , Writable{ } } + /** + * Returns effective durability from the passed durability and + * the table descriptor. + */ + protected Durability getEffectiveDurability(Durability d) { + return d == Durability.USE_DEFAULT ? this.durability : d; + } + //TODO, Think that gets/puts and deletes should be refactored a bit so that //the getting of the lock happens before, so that you would just pass it into //the methods. 
So in the case of checkAndMutate you could just do lockRow, @@ -3466,6 +3489,7 @@ public class HRegion implements HeapSize { // , Writable{ private long maxResultSize; private HRegion region; + @Override public HRegionInfo getRegionInfo() { return region.getRegionInfo(); } @@ -3658,6 +3682,7 @@ public class HRegion implements HeapSize { // , Writable{ /* * @return True if a filter rules the scanner is over, done. */ + @Override public synchronized boolean isFilterDone() throws IOException { return this.filter != null && this.filter.filterAllRemaining(); } @@ -4618,7 +4643,7 @@ public class HRegion implements HeapSize { // , Writable{ } // 10. Sync edit log if (txid != 0) { - syncOrDefer(txid, processor.useDurability()); + syncOrDefer(txid, getEffectiveDurability(processor.useDurability())); } walSyncSuccessful = true; } @@ -4726,7 +4751,8 @@ public class HRegion implements HeapSize { // , Writable{ byte[] row = append.getRow(); checkRow(row, "append"); boolean flush = false; - boolean writeToWAL = append.getDurability() != Durability.SKIP_WAL; + Durability durability = getEffectiveDurability(append.getDurability()); + boolean writeToWAL = durability != Durability.SKIP_WAL; WALEdit walEdits = null; List allKVs = new ArrayList(append.size()); Map> tempMemstore = new HashMap>(); @@ -4861,7 +4887,7 @@ public class HRegion implements HeapSize { // , Writable{ } if (writeToWAL) { // sync the transaction log outside the rowlock - syncOrDefer(txid, append.getDurability()); + syncOrDefer(txid, durability); } } finally { if (w != null) { @@ -4895,7 +4921,8 @@ public class HRegion implements HeapSize { // , Writable{ checkRow(row, "increment"); TimeRange tr = increment.getTimeRange(); boolean flush = false; - boolean writeToWAL = increment.getDurability() != Durability.SKIP_WAL; + Durability durability = getEffectiveDurability(increment.getDurability()); + boolean writeToWAL = durability != Durability.SKIP_WAL; WALEdit walEdits = null; List allKVs = new 
ArrayList(increment.size()); Map> tempMemstore = new HashMap>(); @@ -5006,7 +5033,7 @@ public class HRegion implements HeapSize { // , Writable{ } if (writeToWAL) { // sync the transaction log outside the rowlock - syncOrDefer(txid, increment.getDurability()); + syncOrDefer(txid, durability); } } finally { if (w != null) { @@ -5043,7 +5070,7 @@ public class HRegion implements HeapSize { // , Writable{ ClassSize.OBJECT + ClassSize.ARRAY + 39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + - (11 * Bytes.SIZEOF_LONG) + + (12 * Bytes.SIZEOF_LONG) + 2 * Bytes.SIZEOF_BOOLEAN); public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + @@ -5346,7 +5373,7 @@ public class HRegion implements HeapSize { // , Writable{ case DELETE: case BATCH_MUTATE: // when a region is in recovering state, no read, split or merge is allowed - if (this.isRecovering() && (this.disallowWritesInRecovering || + if (this.isRecovering() && (this.disallowWritesInRecovering || (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) { throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering"); } @@ -5469,8 +5496,8 @@ public class HRegion implements HeapSize { // , Writable{ } else { switch(durability) { case USE_DEFAULT: - // do what CF defaults to - if (!isDeferredLogSyncEnabled()) { + // do what table defaults to + if (shouldSyncLog()) { this.log.sync(txid); } break; @@ -5493,10 +5520,11 @@ public class HRegion implements HeapSize { // , Writable{ } /** - * check if current region is deferred sync enabled. 
+ * Check whether we should sync the log from the table's durability settings */ - private boolean isDeferredLogSyncEnabled() { - return (this.htableDescriptor.isDeferredLogFlush() && !this.deferredLogSyncDisabled); + private boolean shouldSyncLog() { + return this.deferredLogSyncDisabled || + durability.ordinal() > Durability.ASYNC_WAL.ordinal(); } /** diff --git hbase-server/src/main/ruby/hbase/admin.rb hbase-server/src/main/ruby/hbase/admin.rb index fc88928..213f7a8 100644 --- hbase-server/src/main/ruby/hbase/admin.rb +++ hbase-server/src/main/ruby/hbase/admin.rb @@ -261,6 +261,7 @@ module Hbase htd.setReadOnly(JBoolean.valueOf(arg.delete(READONLY))) if arg[READONLY] htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(MEMSTORE_FLUSHSIZE))) if arg[MEMSTORE_FLUSHSIZE] htd.setDeferredLogFlush(JBoolean.valueOf(arg.delete(DEFERRED_LOG_FLUSH))) if arg[DEFERRED_LOG_FLUSH] + htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(DURABILITY))) if arg[DURABILITY] set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA] set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION] @@ -469,6 +470,7 @@ module Hbase htd.setReadOnly(JBoolean.valueOf(arg.delete(READONLY))) if arg[READONLY] htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(MEMSTORE_FLUSHSIZE))) if arg[MEMSTORE_FLUSHSIZE] htd.setDeferredLogFlush(JBoolean.valueOf(arg.delete(DEFERRED_LOG_FLUSH))) if arg[DEFERRED_LOG_FLUSH] + htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(DURABILITY))) if arg[DURABILITY] set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA] set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION] diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java index bc8e72c..19d0af7 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java +++ 
hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java @@ -27,6 +27,7 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -46,14 +47,14 @@ public class TestHTableDescriptor { HTableDescriptor htd = new HTableDescriptor(HTableDescriptor.META_TABLEDESC); final int v = 123; htd.setMaxFileSize(v); - htd.setDeferredLogFlush(true); + htd.setDurability(Durability.ASYNC_WAL); htd.setReadOnly(true); byte [] bytes = htd.toByteArray(); HTableDescriptor deserializedHtd = HTableDescriptor.parseFrom(bytes); assertEquals(htd, deserializedHtd); assertEquals(v, deserializedHtd.getMaxFileSize()); assertTrue(deserializedHtd.isReadOnly()); - assertTrue(deserializedHtd.isDeferredLogFlush()); + assertEquals(Durability.ASYNC_WAL, deserializedHtd.getDurability()); } /** diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 796c9c4..8937b17 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -18,6 +18,14 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -27,6 +35,7 @@ import java.util.List; import java.util.Map; 
import java.util.NavigableMap; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -56,6 +65,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -3868,6 +3878,107 @@ public class TestHRegion extends HBaseTestCase { assertEquals(Bytes.toBytes("value1"), kvs.get(0).getValue()); } + @Test + public void testDurability() throws Exception { + String method = "testDurability"; + // there are 5 x 5 cases: + // table durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) x mutation durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) + + // expected cases for append and sync wal + durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); + durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); + durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false); + + durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); + durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); + durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false); + + durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false); + durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false); + + durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false); + durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, 
true, false); + + durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false); + durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false); + durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false); + + // expected cases for async wal + // do not sync for deferred flush with large optionallogflushinterval + conf.setLong("hbase.regionserver.optionallogflushinterval", Integer.MAX_VALUE); + durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); + durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); + durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false); + durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false); + durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false); + durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false); + + // now small deferred log flush optionallogflushinterval, expect sync + conf.setLong("hbase.regionserver.optionallogflushinterval", 5); + durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); + durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); + durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true); + durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true); + durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true); + durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true); + + // expect skip wal cases + durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); + durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); + durabilityTest(method, 
Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); + durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false); + durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false); + durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false); + + } + + private void durabilityTest(String method, Durability tableDurability, + Durability mutationDurability, long timeout, boolean expectAppend, + final boolean expectSync, final boolean expectSyncFromLogSyncer) throws Exception { + method = method + "_" + tableDurability.name() + "_" + mutationDurability.name(); + byte[] tableName = Bytes.toBytes(method); + byte[] family = Bytes.toBytes("family"); + Path logDir = new Path(new Path(DIR + method), "log"); + HLog hlog = HLogFactory.createHLog(fs, logDir, UUID.randomUUID().toString(), conf); + final HLog log = spy(hlog); + this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW, method, conf, false, + tableDurability, log, new byte[][] {family}); + + Put put = new Put(Bytes.toBytes("r1")); + put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1")); + put.setDurability(mutationDurability); + region.put(put); + + //verify append called or not + verify(log, expectAppend ? 
times(1) : never()) + .appendNoSync((HRegionInfo)any(), eq(tableName), + (WALEdit)any(), (UUID)any(), anyLong(), (HTableDescriptor)any()); + + //verify sync called or not + if (expectSync || expectSyncFromLogSyncer) { + TEST_UTIL.waitFor(timeout, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + try { + if (expectSync) { + verify(log, times(1)).sync(anyLong()); //Hregion calls this one + } else if (expectSyncFromLogSyncer) { + verify(log, times(1)).sync(); //log syncer calls this one + } + } catch (Throwable ignore) {} + return true; + } + }); + } else { + verify(log, never()).sync(anyLong()); + verify(log, never()).sync(); + } + + hlog.close(); + region.close(); + } + private void putData(int startRow, int numRows, byte [] qf, byte [] ...families) throws IOException { @@ -3997,6 +4108,13 @@ public class TestHRegion extends HBaseTestCase { return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families); } + private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, + String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families) + throws IOException { + return initHRegion(tableName, startKey, stopKey, callingMethod, conf, isReadOnly, + Durability.SYNC_WAL, null, families); + } + /** * @param tableName * @param startKey @@ -4009,7 +4127,8 @@ public class TestHRegion extends HBaseTestCase { * @return A region on which you must call {@link HRegion#closeHRegion(HRegion)} when done. */ private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey, - String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families) + String callingMethod, Configuration conf, boolean isReadOnly, Durability durability, + HLog hlog, byte[]... 
families) throws IOException { HTableDescriptor htd = new HTableDescriptor(tableName); htd.setReadOnly(isReadOnly); @@ -4019,6 +4138,7 @@ public class TestHRegion extends HBaseTestCase { hcd.setMaxVersions(Integer.MAX_VALUE); htd.addFamily(hcd); } + htd.setDurability(durability); HRegionInfo info = new HRegionInfo(htd.getName(), startKey, stopKey, false); Path path = new Path(DIR + callingMethod); FileSystem fs = FileSystem.get(conf); @@ -4027,7 +4147,7 @@ public class TestHRegion extends HBaseTestCase { throw new IOException("Failed delete of " + path); } } - return HRegion.createHRegion(info, path, conf, htd); + return HRegion.createHRegion(info, path, conf, htd, hlog); } /**