Patch for HRegion.java (multi-mutation path that takes row locks):
append the WALEdit without syncing while the region lock and row locks are
held, release those locks, then sync the WAL when required and advance mvcc.
If walSyncSuccessful was never set, the new finally block rolls the mutations'
KeyValues back out of their stores and releases whatever is still held.

Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1298561)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -4211,6 +4211,10 @@
         }
       }
 
+      long txid = 0;
+      boolean walSyncSuccessful = false;
+      boolean locked = false;
+
       // 2. acquire the row lock(s)
       acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
       for (byte[] row : rowsToLock) {
@@ -4225,6 +4229,7 @@
 
       // 3. acquire the region lock
       this.updatesLock.readLock().lock();
+      locked = true;
 
       // 4. Get a mvcc write number
       MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
@@ -4253,10 +4258,12 @@
           }
         }
 
-        // 6. append/sync all edits at once
-        // TODO: Do batching as in doMiniBatchPut
-        this.log.append(regionInfo, this.htableDescriptor.getName(), walEdit,
-            HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
+        // 6. append all edits at once (don't sync)
+        if (walEdit.size() > 0) {
+          txid = this.log.appendNoSync(regionInfo,
+              this.htableDescriptor.getName(), walEdit,
+              HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
+        }
 
         // 7. apply to memstore
         long addedSize = 0;
@@ -4264,32 +4271,80 @@
           addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
         }
         flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
-      } finally {
-        // 8. roll mvcc forward
-        mvcc.completeMemstoreInsert(w);
 
-        // 9. release region lock
+        // 8. release locks
         this.updatesLock.readLock().unlock();
-      }
-      // 10. run all coprocessor post hooks, after region lock is released
-      if (coprocessorHost != null) {
-        for (Mutation m : mutations) {
-          if (m instanceof Put) {
-            coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
-          } else if (m instanceof Delete) {
-            coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
+        locked = false;
+        if (acquiredLocks != null) {
+          // release the row lock(s)
+          for (Integer lid : acquiredLocks) {
+            releaseRowLock(lid);
           }
+          acquiredLocks = null;
         }
+
+        // 9. sync WAL if required
+        if (walEdit.size() > 0 &&
+            (this.regionInfo.isMetaRegion() ||
+             !this.htableDescriptor.isDeferredLogFlush())) {
+          this.log.sync(txid);
+        }
+        walSyncSuccessful = true;
+
+        // 10. advance mvcc
+        mvcc.completeMemstoreInsert(w);
+        w = null;
+
+        // 11. run coprocessor post hooks
+        // after the WAL is sync'ed and all locks are released
+        // (similar to doMiniBatchPut)
+        if (coprocessorHost != null) {
+          for (Mutation m : mutations) {
+            if (m instanceof Put) {
+              coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
+            } else if (m instanceof Delete) {
+              coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
+            }
+          }
+        }
+      } finally {
+        // 12. clean up if needed
+        if (!walSyncSuccessful) {
+          int kvsRolledback = 0;
+          for (Mutation m : mutations) {
+            for (Map.Entry<byte[], List<KeyValue>> e : m.getFamilyMap()
+                .entrySet()) {
+              List<KeyValue> kvs = e.getValue();
+              byte[] family = e.getKey();
+              Store store = getStore(family);
+              // roll back each kv
+              for (KeyValue kv : kvs) {
+                store.rollback(kv);
+                kvsRolledback++;
+              }
+            }
+          }
+          LOG.info("mutateRowWithLocks: rolled back " + kvsRolledback +
+              " KeyValues");
+        }
+
+        if (w != null) {
+          mvcc.completeMemstoreInsert(w);
+        }
+
+        if (locked) {
+          this.updatesLock.readLock().unlock();
+        }
+
+        if (acquiredLocks != null) {
+          for (Integer lid : acquiredLocks) {
+            releaseRowLock(lid);
+          }
+        }
       }
     } finally {
-      if (acquiredLocks != null) {
-        // 11. release the row lock
-        for (Integer lid : acquiredLocks) {
-          releaseRowLock(lid);
-        }
-      }
       if (flush) {
-        // 12. Flush cache if needed. Do it outside update lock.
+        // 13. Flush cache if needed. Do it outside update lock.
         requestFlush();
       }
       closeRegionOperation();