// Patch for src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (base: revision 1332774).
//
// Purpose: run the coprocessor prePut hook once per batch put instead of once per
// mini-batch pass, and carry any WAL edits the hook produces through to the WAL write.
//
// Hunk by hunk:
//   1. (-1861) BatchOperationInProgress gains a WALEdit[] walEditsFromCoprocessors field,
//      allocated in the constructor with one slot per operation.
//   2. (-1901) The batch loop gets an `initialized` flag. writeRequestsCount.increment() and
//      the new doPrePutHook(batchOp) now run only on the first pass of the while loop,
//      inside the try block; previously the increment ran on every pass.
//   3. (-1921) The prePut loop is moved out of doMiniBatchPut into the new private method
//      doPrePutHook. For each operation it calls
//      coprocessorHost.prePut(put, walEdit, put.getWriteToWAL()); a true return marks that
//      operation's retCodeDetails entry as SUCCESS. If the hook left walEdit non-empty, the
//      edit is stored in walEditsFromCoprocessors[i] and a fresh WALEdit is started for the
//      next operation. The local `tableName` is dropped from doMiniBatchPut.
//   4. (-2082) While building the WAL edit for the mini-batch, the KeyValues of the stored
//      coprocessor WALEdit (if any) are added to walEdit before addFamilyMapToWALEdit.
//
// NOTE(review): in hunk 4 the coprocessor edits are merged only after the
// `if (!p.getWriteToWAL()) continue;` check, so edits a coprocessor attached to a Put with
// getWriteToWAL() == false are never written -- confirm this is intended.
// NOTE(review): this copy of the patch has lost its line breaks and appears to have lost
// generic type arguments (e.g. "BatchOperationInProgress> batchOp", "Pair nextPair",
// "Set cfSet"). It presumably will not apply as-is; re-export it from the original
// working copy before use.
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1332774) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1861,10 +1861,12 @@ T[] operations; int nextIndexToProcess = 0; OperationStatus[] retCodeDetails; + WALEdit[] walEditsFromCoprocessors; public BatchOperationInProgress(T[] operations) { this.operations = operations; this.retCodeDetails = new OperationStatus[operations.length]; + this.walEditsFromCoprocessors = new WALEdit[operations.length]; Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN); } @@ -1901,14 +1903,21 @@ BatchOperationInProgress> batchOp = new BatchOperationInProgress>(putsAndLocks); + boolean initialized = false; + while (!batchOp.isDone()) { checkReadOnly(); checkResources(); long newSize; startRegionOperation(); - this.writeRequestsCount.increment(); + try { + if (!initialized) { + this.writeRequestsCount.increment(); + doPrePutHook(batchOp); + initialized = true; + } long addedSize = doMiniBatchPut(batchOp); newSize = this.addAndGetGlobalMemstoreSize(addedSize); } finally { @@ -1921,31 +1930,39 @@ return batchOp.retCodeDetails; } - @SuppressWarnings("unchecked") - private long doMiniBatchPut( - BatchOperationInProgress> batchOp) throws IOException { - final String tableName = getTableDesc().getNameAsString(); - - // The set of columnFamilies first seen. 
- Set cfSet = null; - // variable to note if all Put items are for the same CF -- metrics related - boolean cfSetConsistent = true; - long startTimeMs = EnvironmentEdgeManager.currentTimeMillis(); - + private void doPrePutHook(BatchOperationInProgress> batchOp) + throws IOException { + /* Run coprocessor pre hook outside of locks to avoid deadlock */ WALEdit walEdit = new WALEdit(); - /* Run coprocessor pre hook outside of locks to avoid deadlock */ if (coprocessorHost != null) { for (int i = 0; i < batchOp.operations.length; i++) { Pair nextPair = batchOp.operations[i]; Put put = nextPair.getFirst(); if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) { // pre hook says skip this Put - // mark as success and skip below + // mark as success and skip in doMiniBatchPut batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; } + if (!walEdit.isEmpty()) { + batchOp.walEditsFromCoprocessors[i] = walEdit; + walEdit = new WALEdit(); + } } } + } + @SuppressWarnings("unchecked") + private long doMiniBatchPut( + BatchOperationInProgress> batchOp) throws IOException { + + // The set of columnFamilies first seen. + Set cfSet = null; + // variable to note if all Put items are for the same CF -- metrics related + boolean cfSetConsistent = true; + long startTimeMs = EnvironmentEdgeManager.currentTimeMillis(); + + WALEdit walEdit = new WALEdit(); + MultiVersionConsistencyControl.WriteEntry w = null; long txid = 0; boolean walSyncSuccessful = false; @@ -2082,6 +2099,13 @@ Put p = batchOp.operations[i].getFirst(); if (!p.getWriteToWAL()) continue; + // Add WAL edits by CP + WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; + if (fromCP != null) { + for (KeyValue kv : fromCP.getKeyValues()) { + walEdit.add(kv); + } + } addFamilyMapToWALEdit(familyMaps[i], walEdit); }