Index: src/main/java/org/apache/hadoop/hbase/client/Durability.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Durability.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/Durability.java	(working copy)
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+/**
+ * Enum describing the durability guarantees for a {@link Mutation}.
+ * Note that the items must be sorted in order of increasing durability.
+ */
+public enum Durability {
+  /**
+   * Use the column family's default setting to determine durability.
+   * This must remain the first option.
+   */
+  USE_DEFAULT,
+  /**
+   * Do not write the Mutation to the WAL
+   */
+  SKIP_WAL,
+  /**
+   * Write the Mutation to the WAL asynchronously
+   */
+  ASYNC_WAL,
+  /**
+   * Write the Mutation to the WAL synchronously.
+   * The data is flushed to the filesystem implementation, but not necessarily to disk.
+   * For HDFS this will flush the data to the designated number of DataNodes.
+   * See HADOOP-6313
+   */
+  SYNC_WAL,
+  /**
+   * Write the Mutation to the WAL synchronously and force the entries to disk.
+   * (Note: this is currently not supported and will behave identically to {@link #SYNC_WAL})
+   * See HADOOP-6313
+   */
+  FSYNC_WAL;
+
+  public static Durability valueOf(int value) {
+    switch (value) {
+      case 0: return USE_DEFAULT;
+      case 1: return SKIP_WAL;
+      case 2: return ASYNC_WAL;
+      case 3: return SYNC_WAL;
+      case 4: return FSYNC_WAL;
+      default: return null;
+    }
+  }
+}
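Because the constants are declared in order of increasing durability, the stronger of
two settings can be picked by simply comparing ordinals; HRegion relies on exactly this
below when it collapses a batch of mutations into one sync decision. A minimal sketch
(the helper name "stronger" is hypothetical, not part of this patch):

    static Durability stronger(Durability a, Durability b) {
      // Larger ordinal == stronger guarantee, per the ordering contract above.
      return a.ordinal() >= b.ordinal() ? a : b;
    }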
Index: src/main/java/org/apache/hadoop/hbase/client/Mutation.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Mutation.java	(revision 1467146)
+++ src/main/java/org/apache/hadoop/hbase/client/Mutation.java	(working copy)
@@ -34,6 +34,7 @@
 public abstract class Mutation extends OperationWithAttributes implements Row {
   // Attribute used in Mutations to indicate the originating cluster.
   private static final String CLUSTER_ID_ATTR = "_c.id_";
+  private static final String DURABILITY_ID_ATTR = "_d.id_";
 
   protected byte [] row = null;
   protected long ts = HConstants.LATEST_TIMESTAMP;
@@ -122,10 +123,28 @@
    * @param write true if edits should be written to WAL, false if not
    */
   public void setWriteToWAL(boolean write) {
-    this.writeToWAL = write;
+    setDurability(write ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
   }
 
   /**
+   * Set the durability for this mutation
+   * @param d the durability to apply
+   */
+  public void setDurability(Durability d) {
+    setAttribute(DURABILITY_ID_ATTR, Bytes.toBytes(d.ordinal()));
+    this.writeToWAL = d != Durability.SKIP_WAL;
+  }
+
+  /** Get the current durability */
+  public Durability getDurability() {
+    byte[] attr = getAttribute(DURABILITY_ID_ATTR);
+    if (attr == null) {
+      return writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
+    }
+    return Durability.valueOf(Bytes.toInt(attr));
+  }
+
+  /**
    * Method for retrieving the put's familyMap
    * @return familyMap
    */
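With setDurability() in place, a client can tag individual mutations; the region server
(next section) folds the per-mutation settings into a single sync decision per batch. A
hypothetical client-side sketch (table, family, and values are placeholders; assumes the
0.94-era HTable API):

    // One SYNC_WAL mutation escalates the WAL sync for the whole batch,
    // even though the other put only asked for ASYNC_WAL.
    HTable table = new HTable(conf, "mytable");
    Put relaxed = new Put(Bytes.toBytes("row1"));
    relaxed.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v1"));
    relaxed.setDurability(Durability.ASYNC_WAL);
    Put strict = new Put(Bytes.toBytes("row2"));
    strict.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v2"));
    strict.setDurability(Durability.SYNC_WAL);
    table.batch(Arrays.asList(relaxed, strict));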
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1467146)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -84,6 +84,7 @@
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -2380,6 +2381,8 @@
       // ------------------------------------
       // STEP 4. Build WAL edit
       // ----------------------------------
+      Durability durability = Durability.USE_DEFAULT;
+
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
         // Skip puts that were determined to be invalid during preprocessing
         if (batchOp.retCodeDetails[i].getOperationStatusCode()
@@ -2389,12 +2392,17 @@
         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
 
         Mutation m = batchOp.operations[i].getFirst();
-        if (!m.getWriteToWAL()) {
+        Durability tmpDur = m.getDurability();
+        if (tmpDur.ordinal() > durability.ordinal()) {
+          durability = tmpDur;
+        }
+        if (tmpDur == Durability.SKIP_WAL) {
           if (m instanceof Put) {
             recordPutWithoutWal(m.getFamilyMap());
           }
           continue;
         }
+
         // Add WAL edits by CP
         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
         if (fromCP != null) {
@@ -2429,7 +2437,7 @@
       // STEP 7. Sync wal.
       // -------------------------
       if (walEdit.size() > 0) {
-        syncOrDefer(txid);
+        syncOrDefer(txid, durability);
       }
       walSyncSuccessful = true;
       // calling the post CP hook for batch mutation
@@ -4856,6 +4864,7 @@
 
     long now = EnvironmentEdgeManager.currentTimeMillis();
     byte[] byteNow = Bytes.toBytes(now);
+    Durability durability = Durability.USE_DEFAULT;
     try {
       // 5. Check mutations and apply edits to a single WALEdit
       for (Mutation m : mutations) {
@@ -4873,6 +4882,10 @@
             "Action must be Put or Delete. But was: " + m.getClass().getName());
         }
+        Durability tmpDur = m.getDurability();
+        if (tmpDur.ordinal() > durability.ordinal()) {
+          durability = tmpDur;
+        }
         if (m.getWriteToWAL()) {
           addFamilyMapToWALEdit(m.getFamilyMap(), walEdit);
         }
@@ -4904,7 +4917,7 @@
 
       // 9. sync WAL if required
       if (walEdit.size() > 0) {
-        syncOrDefer(txid);
+        syncOrDefer(txid, durability);
       }
       walSyncSuccessful = true;
@@ -5122,7 +5135,8 @@
         releaseRowLock(lid);
       }
       if (writeToWAL) {
-        syncOrDefer(txid); // sync the transaction log outside the rowlock
+        // sync the transaction log outside the rowlock
+        syncOrDefer(txid, append.getDurability());
       }
     } finally {
       closeRegionOperation();
@@ -5269,7 +5283,8 @@
         releaseRowLock(lid);
       }
       if (writeToWAL) {
-        syncOrDefer(txid); // sync the transaction log outside the rowlock
+        // sync the transaction log outside the rowlock
+        syncOrDefer(txid, Durability.USE_DEFAULT);
       }
     } finally {
       closeRegionOperation();
@@ -5366,7 +5381,8 @@
         releaseRowLock(lid);
       }
       if (writeToWAL) {
-        syncOrDefer(txid); // sync the transaction log outside the rowlock
+        // sync the transaction log outside the rowlock
+        syncOrDefer(txid, Durability.USE_DEFAULT);
       }
     } finally {
       closeRegionOperation();
@@ -5835,9 +5851,32 @@
    * @param txid should sync up to which transaction
    * @throws IOException If anything goes wrong with DFS
    */
-  private void syncOrDefer(long txid) throws IOException {
-    if (this.getRegionInfo().isMetaRegion() || !isDeferredLogSyncEnabled()) {
+  private void syncOrDefer(long txid, Durability durability) throws IOException {
+    if (this.getRegionInfo().isMetaRegion()) {
       this.log.sync(txid);
+    } else {
+      switch(durability) {
+      case USE_DEFAULT:
+        // do what the CF defaults to
+        if (!isDeferredLogSyncEnabled()) {
+          this.log.sync(txid);
+        }
+        break;
+      case SKIP_WAL:
+        // nothing to do
+        break;
+      case ASYNC_WAL:
+        // defer the sync, unless we globally can't
+        if (this.deferredLogSyncDisabled) {
+          this.log.sync(txid);
+        }
+        break;
+      case SYNC_WAL:
+      case FSYNC_WAL:
+        // sync the WAL edit (SYNC and FSYNC treated the same for now)
+        this.log.sync(txid);
+        break;
+      }
+    }
   }
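To summarize the new syncOrDefer() decision (meta regions always sync, as before):
- USE_DEFAULT: sync unless the table has deferred log flush enabled
- SKIP_WAL: nothing to do, since no edit was appended
- ASYNC_WAL: leave the sync to the periodic log flusher, unless deferral is globally disabled
- SYNC_WAL / FSYNC_WAL: sync immediately (FSYNC_WAL currently degrades to SYNC_WAL)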
Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java	(working copy)
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests for HLog write durability
+ */
+@Category(MediumTests.class)
+public class TestDurability {
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static FileSystem FS;
+  private static MiniDFSCluster CLUSTER;
+  private static Configuration CONF;
+  private static final Path DIR = TEST_UTIL.getDataTestDir("TestDurability");
+
+  private static byte[] FAMILY = Bytes.toBytes("family");
+  private static byte[] ROW = Bytes.toBytes("row");
+  private static byte[] COL = Bytes.toBytes("col");
+
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    CONF = TEST_UTIL.getConfiguration();
+    CONF.setLong("hbase.regionserver.optionallogflushinterval", 500*1000);
+    TEST_UTIL.startMiniDFSCluster(1);
+
+    CLUSTER = TEST_UTIL.getDFSCluster();
+    FS = CLUSTER.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testDurability() throws Exception {
+    HLog wal = new HLog(FS, new Path(DIR, "hlogdir"),
+        new Path(DIR, "hlogdir_archive"), CONF);
+    byte[] tableName = Bytes.toBytes("TestDurability");
+    HRegion region = createHRegion(tableName, "region", wal, false);
+    HRegion deferredRegion = createHRegion(tableName, "deferredRegion", wal, true);
+
+    region.put(newPut(null));
+    verifyHLogCount(wal, 1);
+
+    // a put through the deferred table does not write to the wal immediately
+    deferredRegion.put(newPut(null));
+    verifyHLogCount(wal, 1);
+    // but will after we sync the wal
+    wal.sync();
+    verifyHLogCount(wal, 2);
+
+    // a put through a deferred table will be synced along with the next sync'ed put
+    deferredRegion.put(newPut(null));
+    verifyHLogCount(wal, 2);
+    region.put(newPut(null));
+    verifyHLogCount(wal, 4);
+
+    // the same holds for an explicit USE_DEFAULT
+    deferredRegion.put(newPut(Durability.USE_DEFAULT));
+    verifyHLogCount(wal, 4);
+    region.put(newPut(Durability.USE_DEFAULT));
+    verifyHLogCount(wal, 6);
+
+    // SKIP_WAL never writes to the wal
+    region.put(newPut(Durability.SKIP_WAL));
+    deferredRegion.put(newPut(Durability.SKIP_WAL));
+    verifyHLogCount(wal, 6);
+    wal.sync();
+    verifyHLogCount(wal, 6);
+
+    // async overrides sync table default
+    region.put(newPut(Durability.ASYNC_WAL));
+    deferredRegion.put(newPut(Durability.ASYNC_WAL));
+    verifyHLogCount(wal, 6);
+    wal.sync();
+    verifyHLogCount(wal, 8);
+
+    // sync overrides async table default
+    region.put(newPut(Durability.SYNC_WAL));
+    deferredRegion.put(newPut(Durability.SYNC_WAL));
+    verifyHLogCount(wal, 10);
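+
+    // Note: all regions in this test share a single HLog, so a sync'ed put
+    // also makes earlier deferred edits durable; that is why the counts
+    // above jump by two after each explicitly sync'ed put.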
+    // fsync behaves like sync
+    region.put(newPut(Durability.FSYNC_WAL));
+    deferredRegion.put(newPut(Durability.FSYNC_WAL));
+    verifyHLogCount(wal, 12);
+  }
+
+  private Put[] newPut(Durability durability) {
+    Put p = new Put(ROW);
+    p.add(FAMILY, COL, COL);
+    if (durability != null) {
+      p.setDurability(durability);
+    }
+    return new Put[]{p};
+  }
+
+  private void verifyHLogCount(HLog log, int expected) throws Exception {
+    Path walPath = log.computeFilename();
+    HLog.Reader reader = HLog.getReader(FS, walPath, CONF);
+    int count = 0;
+    HLog.Entry entry = new HLog.Entry();
+    while (reader.next(entry) != null) count++;
+    reader.close();
+    assertEquals(expected, count);
+  }
+
+  // lifted from TestAtomicOperation
+  private HRegion createHRegion (byte [] tableName, String callingMethod,
+      HLog log, boolean isDeferredLogFlush) throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.setDeferredLogFlush(isDeferredLogFlush);
+    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+    htd.addFamily(hcd);
+    HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
+    Path path = new Path(DIR + callingMethod);
+    if (FS.exists(path)) {
+      if (!FS.delete(path, true)) {
+        throw new IOException("Failed delete of " + path);
+      }
+    }
+    return HRegion.createHRegion(info, path, HBaseConfiguration.create(), htd, log);
+  }
+
+}
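The test only needs the mini DFS cluster, so it should run standalone with the usual
Maven incantation, e.g. mvn test -Dtest=TestDurability.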