Index: src/main/java/org/apache/hadoop/hbase/client/Mutation.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Mutation.java (revision 1450613) +++ src/main/java/org/apache/hadoop/hbase/client/Mutation.java (working copy) @@ -34,6 +34,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row { // Attribute used in Mutations to indicate the originating cluster. private static final String CLUSTER_ID_ATTR = "_c.id_"; + private static final String DEFER_FLUSH_ATTR = "_d.f_"; protected byte [] row = null; protected long ts = HConstants.LATEST_TIMESTAMP; @@ -111,6 +112,7 @@ /** * @return true if edits should be applied to WAL, false if not + * @deprecated Use {@link #getWriteGuarantee()} instead */ public boolean getWriteToWAL() { return this.writeToWAL; @@ -120,12 +122,43 @@ * Set whether this Delete should be written to the WAL or not. * Not writing the WAL means you may lose edits on server crash. * @param write true if edits should be written to WAL, false if not + * @deprecated Use {@link #setWriteGuarantee(WriteGuarantee)} instead */ public void setWriteToWAL(boolean write) { this.writeToWAL = write; + setAttribute(DEFER_FLUSH_ATTR, null); } /** + * Set the write guarantee for this mutation + * @param wg + */ + public void setWriteGuarantee(WriteGuarantee wg) { + this.writeToWAL = true; + setAttribute(DEFER_FLUSH_ATTR, null); + switch (wg) { + case SKIP_WAL: + this.writeToWAL = false; + break; + case ASYNC_WAL: + setAttribute(DEFER_FLUSH_ATTR, Bytes.toBytes(true)); + break; + case SYNC_WAL: + default: + } + } + + /** Get the current write guarantee */ + public WriteGuarantee getWriteGuarantee() { + if (!writeToWAL) { + return WriteGuarantee.SKIP_WAL; + } + byte[] attr = getAttribute(DEFER_FLUSH_ATTR); + return attr != null && Bytes.toBoolean(attr) ? + WriteGuarantee.ASYNC_WAL : WriteGuarantee.SYNC_WAL; + } + + /** * Method for retrieving the put's familyMap * @return familyMap */ Index: src/main/java/org/apache/hadoop/hbase/client/WriteGuarantee.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/WriteGuarantee.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/WriteGuarantee.java (working copy) @@ -0,0 +1,20 @@ +package org.apache.hadoop.hbase.client; + +/** + * Enum describing the write guarantees for {@link Mutation} + */ +public enum WriteGuarantee { + /** + * Do not write the Mutation of the WAL + */ + SKIP_WAL, + /** + * Write the Mutation of the WAL asynchronously + */ + ASYNC_WAL, + /** + * Write the Mutation of the WAL synchronously. + * This is the default. + */ + SYNC_WAL +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1450613) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RowLock; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.WriteGuarantee; import org.apache.hadoop.hbase.client.coprocessor.Exec; import org.apache.hadoop.hbase.client.coprocessor.ExecResult; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; @@ -2292,6 +2293,7 @@ // ------------------------------------ // STEP 4. Build WAL edit // ---------------------------------- + boolean shouldSyncWal = false; for (int i = firstIndex; i < lastIndexExclusive; i++) { // Skip puts that were determined to be invalid during preprocessing if (batchOp.retCodeDetails[i].getOperationStatusCode() @@ -2301,11 +2303,14 @@ batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; Mutation m = batchOp.operations[i].getFirst(); + // using getWriteToWal instead of getWriteGuarantee for performance if (!m.getWriteToWAL()) { if (m instanceof Put) { recordPutWithoutWal(m.getFamilyMap()); } continue; + } else if (!shouldSyncWal) { + shouldSyncWal = m.getWriteGuarantee() == WriteGuarantee.SYNC_WAL; } // Add WAL edits by CP WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; @@ -2341,7 +2346,7 @@ // STEP 7. Sync wal. // ------------------------- if (walEdit.size() > 0) { - syncOrDefer(txid); + syncOrDefer(txid, shouldSyncWal); } walSyncSuccessful = true; // calling the post CP hook for batch mutation @@ -4686,6 +4691,7 @@ byte[] byteNow = Bytes.toBytes(now); try { // 5. Check mutations and apply edits to a single WALEdit + boolean shouldSyncWal = false; for (Mutation m : mutations) { if (m instanceof Put) { Map> familyMap = m.getFamilyMap(); @@ -4703,6 +4709,9 @@ } if (m.getWriteToWAL()) { addFamilyMapToWALEdit(m.getFamilyMap(), walEdit); + if (!shouldSyncWal) { + shouldSyncWal = m.getWriteGuarantee() == WriteGuarantee.SYNC_WAL; + } } } @@ -4732,7 +4741,7 @@ // 9. sync WAL if required if (walEdit.size() > 0) { - syncOrDefer(txid); + syncOrDefer(txid, shouldSyncWal); } walSyncSuccessful = true; @@ -4949,7 +4958,8 @@ releaseRowLock(lid); } if (writeToWAL) { - syncOrDefer(txid); // sync the transaction log outside the rowlock + // sync the transaction log outside the rowlock + syncOrDefer(txid, append.getWriteGuarantee() == WriteGuarantee.SYNC_WAL); } } finally { closeRegionOperation(); @@ -5095,7 +5105,7 @@ releaseRowLock(lid); } if (writeToWAL) { - syncOrDefer(txid); // sync the transaction log outside the rowlock + syncOrDefer(txid, true); // sync the transaction log outside the rowlock } } finally { closeRegionOperation(); @@ -5191,7 +5201,7 @@ releaseRowLock(lid); } if (writeToWAL) { - syncOrDefer(txid); // sync the transaction log outside the rowlock + syncOrDefer(txid, true); // sync the transaction log outside the rowlock } } finally { closeRegionOperation(); @@ -5640,11 +5650,12 @@ * Calls sync with the given transaction ID if the region's table is not * deferring it. * @param txid should sync up to which transaction + * @param syncRequested true if the caller requested a sync * @throws IOException If anything goes wrong with DFS */ - private void syncOrDefer(long txid) throws IOException { + private void syncOrDefer(long txid, boolean syncRequested) throws IOException { if (this.regionInfo.isMetaRegion() || - !this.htableDescriptor.isDeferredLogFlush()) { + (!this.htableDescriptor.isDeferredLogFlush() && syncRequested)) { this.log.sync(txid); } }