# Patch commentary (free text placed ahead of the "Index:" header; it is not part of any hunk).
#
# Target: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java, base revision 1332812.
#
# Purpose: run the coprocessor prePut hook once per batch instead of once per mini-batch.
# Before this patch the prePut loop lived at the top of doMiniBatchPut and iterated over
# ALL of batchOp.operations (i = 0 .. operations.length) on every call, and doMiniBatchPut
# is called repeatedly from the `while (!batchOp.isDone())` loop.
#
# Hunk 1 (@@ -1680): BatchOperationInProgress gains a `WALEdit[] walEditsFromCoprocessors`
#   field, allocated in the constructor with one slot per operation.
# Hunk 2 (@@ -1720): a local `initialized` flag makes `writeRequestsCount.increment()` and
#   the new `doPrePutHook(batchOp)` run only on the first pass through the while loop.
#   Both calls now sit inside the `try`, i.e. after startRegionOperation().
# Hunk 3 (@@ -1740): the prePut loop moves into the new private method doPrePutHook.
#   For each operation: if prePut returns true, retCodeDetails[i] is set to SUCCESS;
#   if the shared WALEdit is non-empty after the call, it is stored in
#   walEditsFromCoprocessors[i] and a fresh WALEdit is allocated for the next operation.
#   doMiniBatchPut keeps its signature and now starts by creating its own empty WALEdit.
# Hunk 4 (@@ -1852): while building the WAL edit in doMiniBatchPut, for each Put with
#   getWriteToWAL() true, the KeyValues saved in walEditsFromCoprocessors[i] (if any) are
#   added to walEdit before addFamilyMapToWALEdit(familyMaps[i], walEdit).
#   Because of the existing `if (!p.getWriteToWAL()) continue;`, coprocessor edits for a
#   Put that does not write to the WAL are not appended.
#
# NOTE(review): this copy of the patch is whitespace-collapsed (line breaks and indentation
# lost), and generic type arguments appear to have been stripped
# (e.g. "BatchOperationInProgress> batchOp", "Pair nextPair"). It is unlikely to apply
# with patch(1) as-is -- regenerate it from the working copy before use.
# NOTE(review): doPrePutHook allocates `new WALEdit()` before the `coprocessorHost != null`
# check, so one is created even when no coprocessor host is configured.
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1332812) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1680,10 +1680,12 @@ T[] operations; int nextIndexToProcess = 0; OperationStatus[] retCodeDetails; + WALEdit[] walEditsFromCoprocessors; public BatchOperationInProgress(T[] operations) { this.operations = operations; this.retCodeDetails = new OperationStatus[operations.length]; + this.walEditsFromCoprocessors = new WALEdit[operations.length]; Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN); } @@ -1720,14 +1722,20 @@ BatchOperationInProgress> batchOp = new BatchOperationInProgress>(putsAndLocks); + boolean initialized = false; + while (!batchOp.isDone()) { checkReadOnly(); checkResources(); long newSize; startRegionOperation(); - this.writeRequestsCount.increment(); try { + if (!initialized) { + this.writeRequestsCount.increment(); + doPrePutHook(batchOp); + initialized = true; + } long addedSize = doMiniBatchPut(batchOp); newSize = this.addAndGetGlobalMemstoreSize(addedSize); } finally { @@ -1740,24 +1748,33 @@ return batchOp.retCodeDetails; } - @SuppressWarnings("unchecked") - private long doMiniBatchPut( - BatchOperationInProgress> batchOp) throws IOException { - + private void doPrePutHook(BatchOperationInProgress> batchOp) + throws IOException { + /* Run coprocessor pre hook outside of locks to avoid deadlock */ WALEdit walEdit = new WALEdit(); - /* Run coprocessor pre hook outside of locks to avoid deadlock */ if (coprocessorHost != null) { for (int i = 0; i < batchOp.operations.length; i++) { Pair nextPair = batchOp.operations[i]; Put put = nextPair.getFirst(); if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) { // pre hook says skip this Put - // mark as success and skip below + // mark as success and skip in doMiniBatchPut 
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; } + if (!walEdit.isEmpty()) { + batchOp.walEditsFromCoprocessors[i] = walEdit; + walEdit = new WALEdit(); + } } } + } + @SuppressWarnings("unchecked") + private long doMiniBatchPut( + BatchOperationInProgress> batchOp) throws IOException { + + WALEdit walEdit = new WALEdit(); + long now = EnvironmentEdgeManager.currentTimeMillis(); byte[] byteNow = Bytes.toBytes(now); boolean locked = false; @@ -1852,6 +1869,13 @@ Put p = batchOp.operations[i].getFirst(); if (!p.getWriteToWAL()) continue; + // Add WAL edits by CP + WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; + if (fromCP != null) { + for (KeyValue kv : fromCP.getKeyValues()) { + walEdit.add(kv); + } + } addFamilyMapToWALEdit(familyMaps[i], walEdit); }