Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1332447) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1928,10 +1928,12 @@ T[] operations; int nextIndexToProcess = 0; OperationStatus[] retCodeDetails; + WALEdit[] walEditsFromCoprocessors; public BatchOperationInProgress(T[] operations) { this.operations = operations; this.retCodeDetails = new OperationStatus[operations.length]; + this.walEditsFromCoprocessors = new WALEdit[operations.length]; Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN); } @@ -1968,14 +1970,21 @@ BatchOperationInProgress> batchOp = new BatchOperationInProgress>(putsAndLocks); + boolean initialized = false; + while (!batchOp.isDone()) { checkReadOnly(); checkResources(); long newSize; startRegionOperation(); - this.writeRequestsCount.increment(); + try { + if (!initialized) { + this.writeRequestsCount.increment(); + doPrePutHook(batchOp); + initialized = true; + } long addedSize = doMiniBatchPut(batchOp); newSize = this.addAndGetGlobalMemstoreSize(addedSize); } finally { @@ -1988,6 +1997,32 @@ return batchOp.retCodeDetails; } + private void doPrePutHook(BatchOperationInProgress> batchOp) + throws IOException { + /* Run coprocessor pre hook outside of locks to avoid deadlock */ + WALEdit walEdit = new WALEdit(); + if (coprocessorHost != null) { + for (int i = 0 ; i < batchOp.operations.length; i++) { + Pair nextPair = batchOp.operations[i]; + Put put = nextPair.getFirst(); + if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) { + // pre hook says skip this Put + // mark as success and skip below + batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; + Preconditions.checkState(walEdit.isEmpty(), + "if CP skips an edit, cannot write to WAL"); + } + if (!walEdit.isEmpty()) { + Preconditions.checkState(put.getWriteToWAL(), + "CP may not write to WAL if put has flagged this off"); + batchOp.walEditsFromCoprocessors[i] = walEdit; + walEdit = new WALEdit(); + } + } + } + } + + @SuppressWarnings("unchecked") private long doMiniBatchPut( BatchOperationInProgress> batchOp) throws IOException { @@ -1999,22 +2034,11 @@ //The set of columnFamilies first seen. Set cfSet = null; + + WALEdit walEdit = new WALEdit(); long startTimeMs = EnvironmentEdgeManager.currentTimeMillis(); - WALEdit walEdit = new WALEdit(); - /* Run coprocessor pre hook outside of locks to avoid deadlock */ - if (coprocessorHost != null) { - for (int i = 0; i < batchOp.operations.length; i++) { - Pair nextPair = batchOp.operations[i]; - Put put = nextPair.getFirst(); - if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) { - // pre hook says skip this Put - // mark as success and skip below - batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; - } - } - } MultiVersionConsistencyControl.WriteEntry w = null; long txid = 0; @@ -2153,8 +2177,17 @@ Put p = batchOp.operations[i].getFirst(); if (!p.getWriteToWAL()) continue; addFamilyMapToWALEdit(familyMaps[i], walEdit); + + // Include WAL edits by CP + WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; + if (fromCP != null) { + for (KeyValue kv : fromCP.getKeyValues()) { + walEdit.add(kv); + } + } } + // ------------------------- // STEP 5. Append the edit to WAL. Do not sync wal. // -------------------------