Index: src/main/java/org/apache/hadoop/hbase/client/Durability.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Durability.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/Durability.java (working copy)
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+/**
+ * Enum describing the durability guarantees for a {@link Mutation}.
+ * Note that the items must be sorted in order of increasing durability.
+ */
+public enum Durability {
+ /**
+ * Use the column family's default setting to determine durability.
+ * This must remain the first option.
+ */
+ USE_DEFAULT,
+ /**
+ * Do not write the Mutation to the WAL
+ */
+ SKIP_WAL,
+ /**
+ * Write the Mutation to the WAL asynchronously
+ */
+ ASYNC_WAL,
+ /**
+ * Write the Mutation to the WAL synchronously.
+ * The data is flushed to the filesystem implementation, but not necessarily to disk.
+ * For HDFS this will flush the data to the designated number of DataNodes.
+ * See HADOOP-6313
+ */
+ SYNC_WAL,
+ /**
+ * Write the Mutation to the WAL synchronously and force the entries to disk.
+ * (Note: this is currently not supported and will behave identically to {@link #SYNC_WAL})
+ * See HADOOP-6313
+ */
+ FSYNC_WAL;
+
+ public static Durability valueOf(int value) {
+ switch (value) {
+ case 0: return USE_DEFAULT;
+ case 1: return SKIP_WAL;
+ case 2: return ASYNC_WAL;
+ case 3: return SYNC_WAL;
+ case 4: return FSYNC_WAL;
+ default: return null;
+ }
+ }
+}
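
For reference, a minimal client-side sketch of how a caller could request a
per-mutation durability once this change is in. The table name, row, and
conf (an existing Configuration) are illustrative assumptions, not part of
the patch:

    // Put extends Mutation, so it inherits the new setDurability method.
    HTable table = new HTable(conf, "mytable");
    Put p = new Put(Bytes.toBytes("row1"));
    p.add(Bytes.toBytes("family"), Bytes.toBytes("col"), Bytes.toBytes("value"));
    p.setDurability(Durability.ASYNC_WAL); // defer the WAL sync for this put only
    table.put(p);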
Index: src/main/java/org/apache/hadoop/hbase/client/Mutation.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Mutation.java (revision 1467146)
+++ src/main/java/org/apache/hadoop/hbase/client/Mutation.java (working copy)
@@ -34,6 +34,7 @@
public abstract class Mutation extends OperationWithAttributes implements Row {
// Attribute used in Mutations to indicate the originating cluster.
private static final String CLUSTER_ID_ATTR = "_c.id_";
+ private static final String DURABILITY_ID_ATTR = "_d.id_";
protected byte [] row = null;
protected long ts = HConstants.LATEST_TIMESTAMP;
@@ -122,10 +123,28 @@
* @param write true if edits should be written to WAL, false if not
*/
public void setWriteToWAL(boolean write) {
- this.writeToWAL = write;
+ setDurability(write ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
}
/**
+ * Set the durability for this mutation.
+ * @param d the durability to use
+ */
+ public void setDurability(Durability d) {
+ setAttribute(DURABILITY_ID_ATTR, Bytes.toBytes(d.ordinal()));
+ this.writeToWAL = d != Durability.SKIP_WAL;
+ }
+
+ /** Get the current durability */
+ public Durability getDurability() {
+ byte[] attr = getAttribute(DURABILITY_ID_ATTR);
+ if (attr == null) {
+ return writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
+ }
+ return Durability.valueOf(Bytes.toInt(attr));
+ }
+
+ /**
* Method for retrieving the put's familyMap
* @return familyMap
*/
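
A short sketch of the intended round-trip, assuming the accessors behave
exactly as written above: the durability travels as a serialized ordinal in
the mutation's attribute map, and the legacy writeToWAL flag is kept
consistent with it:

    Put p = new Put(Bytes.toBytes("row1"));
    // New API: the ordinal is serialized under the "_d.id_" attribute.
    p.setDurability(Durability.FSYNC_WAL);
    assert p.getDurability() == Durability.FSYNC_WAL;
    assert p.getWriteToWAL(); // anything but SKIP_WAL keeps the WAL write on
    // Legacy API: setWriteToWAL(false) now maps onto SKIP_WAL.
    p.setWriteToWAL(false);
    assert p.getDurability() == Durability.SKIP_WAL;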
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1467146)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -84,6 +84,7 @@
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -2380,6 +2381,8 @@
// ------------------------------------
// STEP 4. Build WAL edit
// ----------------------------------
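+      // track the strongest durability requested by any mutation in the batch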
+ Durability durability = Durability.USE_DEFAULT;
+
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// Skip puts that were determined to be invalid during preprocessing
if (batchOp.retCodeDetails[i].getOperationStatusCode()
@@ -2389,12 +2392,17 @@
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
Mutation m = batchOp.operations[i].getFirst();
- if (!m.getWriteToWAL()) {
+ Durability tmpDur = m.getDurability();
+ if (tmpDur.ordinal() > durability.ordinal()) {
+ durability = tmpDur;
+ }
+ if (tmpDur == Durability.SKIP_WAL) {
if (m instanceof Put) {
recordPutWithoutWal(m.getFamilyMap());
}
continue;
}
+
// Add WAL edits by CP
WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
if (fromCP != null) {
@@ -2429,7 +2437,7 @@
// STEP 7. Sync wal.
// -------------------------
if (walEdit.size() > 0) {
- syncOrDefer(txid);
+ syncOrDefer(txid, durability);
}
walSyncSuccessful = true;
// calling the post CP hook for batch mutation
@@ -4856,6 +4864,7 @@
long now = EnvironmentEdgeManager.currentTimeMillis();
byte[] byteNow = Bytes.toBytes(now);
+ Durability durability = Durability.USE_DEFAULT;
try {
// 5. Check mutations and apply edits to a single WALEdit
for (Mutation m : mutations) {
@@ -4873,6 +4882,10 @@
"Action must be Put or Delete. But was: "
+ m.getClass().getName());
}
+ Durability tmpDur = m.getDurability();
+ if (tmpDur.ordinal() > durability.ordinal()) {
+ durability = tmpDur;
+ }
if (m.getWriteToWAL()) {
addFamilyMapToWALEdit(m.getFamilyMap(), walEdit);
}
@@ -4904,7 +4917,7 @@
// 9. sync WAL if required
if (walEdit.size() > 0) {
- syncOrDefer(txid);
+ syncOrDefer(txid, durability);
}
walSyncSuccessful = true;
@@ -5122,7 +5135,8 @@
releaseRowLock(lid);
}
if (writeToWAL) {
- syncOrDefer(txid); // sync the transaction log outside the rowlock
+ // sync the transaction log outside the rowlock
+ syncOrDefer(txid, append.getDurability());
}
} finally {
closeRegionOperation();
@@ -5269,7 +5283,8 @@
releaseRowLock(lid);
}
if (writeToWAL) {
- syncOrDefer(txid); // sync the transaction log outside the rowlock
+ // sync the transaction log outside the rowlock
+ syncOrDefer(txid, Durability.USE_DEFAULT);
}
} finally {
closeRegionOperation();
@@ -5366,7 +5381,8 @@
releaseRowLock(lid);
}
if (writeToWAL) {
- syncOrDefer(txid); // sync the transaction log outside the rowlock
+ // sync the transaction log outside the rowlock
+ syncOrDefer(txid, Durability.USE_DEFAULT);
}
} finally {
closeRegionOperation();
@@ -5835,9 +5851,32 @@
* @param txid should sync up to which transaction
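+   * @param durability the durability requested for this sync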
* @throws IOException If anything goes wrong with DFS
*/
- private void syncOrDefer(long txid) throws IOException {
- if (this.getRegionInfo().isMetaRegion() || !isDeferredLogSyncEnabled()) {
+ private void syncOrDefer(long txid, Durability durability) throws IOException {
+ if (this.getRegionInfo().isMetaRegion()) {
this.log.sync(txid);
+ } else {
+ switch(durability) {
+ case USE_DEFAULT:
+ // do what CF defaults to
+ if (!isDeferredLogSyncEnabled()) {
+ this.log.sync(txid);
+ }
+ break;
+ case SKIP_WAL:
+        // nothing to do
+ break;
+ case ASYNC_WAL:
+ // defer the sync, unless we globally can't
+ if (this.deferredLogSyncDisabled) {
+ this.log.sync(txid);
+ }
+ break;
+ case SYNC_WAL:
+ case FSYNC_WAL:
+ // sync the WAL edit (SYNC and FSYNC treated the same for now)
+ this.log.sync(txid);
+ break;
+ }
}
}
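
The batch paths above reduce the per-mutation settings to a single effective
durability by taking the largest ordinal, which is why the enum constants
must stay sorted from weakest to strongest. Stated on its own (mutations is
a stand-in for the batch being applied):

    // The strongest durability requested by any mutation in the batch wins;
    // this relies on the enum being declared in order of increasing durability.
    Durability effective = Durability.USE_DEFAULT;
    for (Mutation m : mutations) {
      if (m.getDurability().ordinal() > effective.ordinal()) {
        effective = m.getDurability();
      }
    }
    // effective is then handed to syncOrDefer(txid, effective) after the WAL append.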
Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java (working copy)
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests for HLog write durability
+ */
+@Category(MediumTests.class)
+public class TestDurability {
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static FileSystem FS;
+ private static MiniDFSCluster CLUSTER;
+ private static Configuration CONF;
+ private static final Path DIR = TEST_UTIL.getDataTestDir("TestDurability");
+
+ private static byte[] FAMILY = Bytes.toBytes("family");
+ private static byte[] ROW = Bytes.toBytes("row");
+ private static byte[] COL = Bytes.toBytes("col");
+
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ CONF = TEST_UTIL.getConfiguration();
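+    // make the optional log flush interval effectively infinite so deferred
+    // edits are only sync'ed when the test asks for it explicitly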
+ CONF.setLong("hbase.regionserver.optionallogflushinterval", 500*1000);
+ TEST_UTIL.startMiniDFSCluster(1);
+
+ CLUSTER = TEST_UTIL.getDFSCluster();
+ FS = CLUSTER.getFileSystem();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testDurability() throws Exception {
+ HLog wal = new HLog(FS, new Path(DIR, "hlogdir"),
+ new Path(DIR, "hlogdir_archive"), CONF);
+ byte[] tableName = Bytes.toBytes("TestDurability");
+ HRegion region = createHRegion(tableName, "region", wal, false);
+ HRegion deferredRegion = createHRegion(tableName, "deferredRegion", wal, true);
+
+ region.put(newPut(null));
+
+ verifyHLogCount(wal, 1);
+
+    // a put through the deferred table does not write to the wal immediately
+ deferredRegion.put(newPut(null));
+ verifyHLogCount(wal, 1);
+    // but the entry shows up once the wal is sync'ed
+ wal.sync();
+ verifyHLogCount(wal, 2);
+
+    // a put through the deferred table is sync'ed along with the next sync'ed put
+ deferredRegion.put(newPut(null));
+ verifyHLogCount(wal, 2);
+ region.put(newPut(null));
+ verifyHLogCount(wal, 4);
+
+    // the same holds when USE_DEFAULT is requested explicitly
+ deferredRegion.put(newPut(Durability.USE_DEFAULT));
+ verifyHLogCount(wal, 4);
+ region.put(newPut(Durability.USE_DEFAULT));
+ verifyHLogCount(wal, 6);
+
+ // SKIP_WAL never writes to the wal
+ region.put(newPut(Durability.SKIP_WAL));
+ deferredRegion.put(newPut(Durability.SKIP_WAL));
+ verifyHLogCount(wal, 6);
+ wal.sync();
+ verifyHLogCount(wal, 6);
+
+ // async overrides sync table default
+ region.put(newPut(Durability.ASYNC_WAL));
+ deferredRegion.put(newPut(Durability.ASYNC_WAL));
+ verifyHLogCount(wal, 6);
+ wal.sync();
+ verifyHLogCount(wal, 8);
+
+ // sync overrides async table default
+ region.put(newPut(Durability.SYNC_WAL));
+ deferredRegion.put(newPut(Durability.SYNC_WAL));
+ verifyHLogCount(wal, 10);
+
+ // fsync behaves like sync
+ region.put(newPut(Durability.FSYNC_WAL));
+ deferredRegion.put(newPut(Durability.FSYNC_WAL));
+ verifyHLogCount(wal, 12);
+ }
+
+ private Put[] newPut(Durability durability) {
+ Put p = new Put(ROW);
+ p.add(FAMILY, COL, COL);
+ if (durability != null) {
+ p.setDurability(durability);
+ }
+ return new Put[]{p};
+ }
+
+ private void verifyHLogCount(HLog log, int expected) throws Exception {
+ Path walPath = log.computeFilename();
+ HLog.Reader reader = HLog.getReader(FS, walPath, CONF);
+ int count = 0;
+ HLog.Entry entry = new HLog.Entry();
+ while (reader.next(entry) != null) count++;
+ reader.close();
+ assertEquals(expected, count);
+ }
+
+ // lifted from TestAtomicOperation
+ private HRegion createHRegion (byte [] tableName, String callingMethod, HLog log, boolean isDeferredLogFlush)
+ throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.setDeferredLogFlush(isDeferredLogFlush);
+ HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+ htd.addFamily(hcd);
+ HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
+ Path path = new Path(DIR + callingMethod);
+ if (FS.exists(path)) {
+ if (!FS.delete(path, true)) {
+ throw new IOException("Failed delete of " + path);
+ }
+ }
+ return HRegion.createHRegion(info, path, HBaseConfiguration.create(), htd, log);
+ }
+
+}