Index: src/main/java/org/apache/hadoop/hbase/client/Mutation.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Mutation.java	(revision 1446918)
+++ src/main/java/org/apache/hadoop/hbase/client/Mutation.java	(working copy)
@@ -34,6 +34,7 @@
 public abstract class Mutation extends OperationWithAttributes implements Row {
   // Attribute used in Mutations to indicate the originating cluster.
   private static final String CLUSTER_ID_ATTR = "_c.id_";
+  private static final String DEFER_FLUSH_ATTR = "_d.f_";
 
   protected byte [] row = null;
   protected long ts = HConstants.LATEST_TIMESTAMP;
@@ -126,6 +127,20 @@
   }
 
   /**
+   * Indicate for this mutation we can defer the log flush.
+   * @param defer true to allow the WAL flush for this mutation to be deferred
+   */
+  public void setDeferWALFlush(boolean defer) {
+    setAttribute(DEFER_FLUSH_ATTR, Bytes.toBytes(defer));
+  }
+
+  /** @return true if the WAL flush may be deferred for this mutation (false if unset) */
+  public boolean getDeferWALFlush() {
+    byte[] attr = getAttribute(DEFER_FLUSH_ATTR);
+    return attr != null && Bytes.toBoolean(attr);
+  }
+
+  /**
    * Method for retrieving the put's familyMap
    * @return familyMap
    */
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1446918)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -2279,6 +2279,7 @@
       // ------------------------------------
       // STEP 4. Build WAL edit
       // ----------------------------------
+      boolean shouldSyncWal = false;
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
         // Skip puts that were determined to be invalid during preprocessing
         if (batchOp.retCodeDetails[i].getOperationStatusCode()
@@ -2293,6 +2294,8 @@
             recordPutWithoutWal(m.getFamilyMap());
           }
           continue;
+        } else if (!shouldSyncWal) {
+          shouldSyncWal = !m.getDeferWALFlush();
         }
         // Add WAL edits by CP
         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
@@ -2328,7 +2331,7 @@
       // STEP 7. Sync wal.
       // -------------------------
       if (walEdit.size() > 0) {
-        syncOrDefer(txid);
+        syncOrDefer(txid, shouldSyncWal);
       }
       walSyncSuccessful = true;
       // ------------------------------------------------------------------
@@ -4665,6 +4668,7 @@
       byte[] byteNow = Bytes.toBytes(now);
       try {
         // 5. Check mutations and apply edits to a single WALEdit
+        boolean shouldSyncWal = false;
         for (Mutation m : mutations) {
           if (m instanceof Put) {
             Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
@@ -4682,6 +4686,9 @@
           }
           if (m.getWriteToWAL()) {
             addFamilyMapToWALEdit(m.getFamilyMap(), walEdit);
+            if (!shouldSyncWal) {
+              shouldSyncWal = !m.getDeferWALFlush();
+            }
           }
         }
 
@@ -4711,7 +4718,7 @@
 
         // 9. sync WAL if required
         if (walEdit.size() > 0) {
-          syncOrDefer(txid);
+          syncOrDefer(txid, shouldSyncWal);
         }
         walSyncSuccessful = true;
 
@@ -4928,7 +4935,7 @@
         releaseRowLock(lid);
       }
       if (writeToWAL) {
-        syncOrDefer(txid); // sync the transaction log outside the rowlock
+        syncOrDefer(txid, !append.getDeferWALFlush()); // sync the transaction log outside the rowlock
       }
     } finally {
       closeRegionOperation();
@@ -5074,7 +5081,7 @@
         releaseRowLock(lid);
       }
       if (writeToWAL) {
-        syncOrDefer(txid); // sync the transaction log outside the rowlock
+        syncOrDefer(txid, true); // sync the transaction log outside the rowlock
       }
     } finally {
       closeRegionOperation();
@@ -5170,7 +5177,7 @@
         releaseRowLock(lid);
       }
       if (writeToWAL) {
-        syncOrDefer(txid); // sync the transaction log outside the rowlock
+        syncOrDefer(txid, true); // sync the transaction log outside the rowlock
       }
     } finally {
       closeRegionOperation();
@@ -5619,11 +5626,12 @@
-   * Calls sync with the given transaction ID if the region's table is not
-   * deferring it.
+   * Calls sync with the given transaction ID unless the region's table or
+   * the caller defers it; a meta region is always synced.
    * @param txid should sync up to which transaction
+   * @param syncRequested true if the caller requested a sync
    * @throws IOException If anything goes wrong with DFS
    */
-  private void syncOrDefer(long txid) throws IOException {
+  private void syncOrDefer(long txid, boolean syncRequested) throws IOException {
     if (this.regionInfo.isMetaRegion() ||
-      !this.htableDescriptor.isDeferredLogFlush()) {
+        (!this.htableDescriptor.isDeferredLogFlush() && syncRequested)) {
       this.log.sync(txid);
     }
   }