commit 8f106d3408af14e468d7f2166104f593f44493ea
Author: Todd Lipcon
Date:   Sat Apr 28 20:06:45 2012 -0700

    fix preput hook

diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 394b90b..c60bcd9 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1912,10 +1912,12 @@ public class HRegion implements HeapSize { // , Writable{
     T[] operations;
     int nextIndexToProcess = 0;
     OperationStatus[] retCodeDetails;
+    WALEdit[] walEditsFromCoprocessors;
 
     public BatchOperationInProgress(T[] operations) {
       this.operations = operations;
       this.retCodeDetails = new OperationStatus[operations.length];
+      this.walEditsFromCoprocessors = new WALEdit[operations.length];
       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
     }
 
@@ -1952,13 +1954,20 @@ public class HRegion implements HeapSize { // , Writable{
     BatchOperationInProgress> batchOp =
       new BatchOperationInProgress>(putsAndLocks);
 
+    boolean initialized = false;
+
     while (!batchOp.isDone()) {
       checkReadOnly();
       checkResources();
 
       long newSize;
       startRegionOperation();
-      this.writeRequestsCount.increment();
+      if (!initialized) {
+        this.writeRequestsCount.increment();
+        doPrePutHook(batchOp);
+        initialized = true;
+      }
+
       try {
         long addedSize = doMiniBatchPut(batchOp);
         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
@@ -1972,6 +1981,32 @@ public class HRegion implements HeapSize { // , Writable{
     return batchOp.retCodeDetails;
   }
 
+  private void doPrePutHook(BatchOperationInProgress> batchOp)
+      throws IOException {
+    /* Run coprocessor pre hook outside of locks to avoid deadlock */
+    WALEdit walEdit = new WALEdit();
+    if (coprocessorHost != null) {
+      for (int i = 0 ; i < batchOp.operations.length; i++) {
+        Pair nextPair = batchOp.operations[i];
+        Put put = nextPair.getFirst();
+        if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) {
+          // pre hook says skip this Put
+          // mark as success and skip below
+          batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+          Preconditions.checkState(walEdit.isEmpty(),
+              "if CP skips an edit, cannot write to WAL");
+        }
+        if (!walEdit.isEmpty()) {
+          Preconditions.checkState(put.getWriteToWAL(),
+              "CP may not write to WAL if put has flagged this off");
+          batchOp.walEditsFromCoprocessors[i] = walEdit;
+          walEdit = new WALEdit();
+        }
+      }
+    }
+  }
+
+
   @SuppressWarnings("unchecked")
   private long doMiniBatchPut(
       BatchOperationInProgress> batchOp) throws IOException {
@@ -1983,22 +2018,11 @@ public class HRegion implements HeapSize { // , Writable{
 
     //The set of columnFamilies first seen.
     Set cfSet = null;
+
+    WALEdit walEdit = new WALEdit();
 
     long startTimeMs = EnvironmentEdgeManager.currentTimeMillis();
 
-    WALEdit walEdit = new WALEdit();
-    /* Run coprocessor pre hook outside of locks to avoid deadlock */
-    if (coprocessorHost != null) {
-      for (int i = 0; i < batchOp.operations.length; i++) {
-        Pair nextPair = batchOp.operations[i];
-        Put put = nextPair.getFirst();
-        if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) {
-          // pre hook says skip this Put
-          // mark as success and skip below
-          batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
-        }
-      }
-    }
 
     MultiVersionConsistencyControl.WriteEntry w = null;
     long txid = 0;
@@ -2137,8 +2161,17 @@ public class HRegion implements HeapSize { // , Writable{
         Put p = batchOp.operations[i].getFirst();
         if (!p.getWriteToWAL()) continue;
         addFamilyMapToWALEdit(familyMaps[i], walEdit);
+
+        // Include WAL edits by CP
+        WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
+        if (fromCP != null) {
+          for (KeyValue kv : fromCP.getKeyValues()) {
+            walEdit.add(kv);
+          }
+        }
       }
 
+
       // -------------------------
       // STEP 5. Append the edit to WAL. Do not sync wal.
       // -------------------------