Index: src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (revision 1445231) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; @@ -185,8 +187,18 @@ public void postDelete(final ObserverContext e, final Delete delete, final WALEdit edit, final boolean writeToWAL) throws IOException { } + + @Override + public void preBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + } @Override + public void postBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + } + + @Override public boolean preCheckAndPut(final ObserverContext e, final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final WritableByteArrayComparable comparator, Index: src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (revision 1445231) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; @@ -404,6 +406,27 @@ throws IOException; /** + * This will be called for every batch mutation operation happening at the server. This will be + * called after acquiring the locks on the mutating rows and after applying the proper timestamp + * for each Mutation at the server. The Mutation operation can contain Put/Delete. + * + * @param c + * @param miniBatchOp + * @throws IOException + */ + void preBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress> miniBatchOp) throws IOException; + + /** + * + * @param c + * @param miniBatchOp + * @throws IOException + */ + void postBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress> miniBatchOp) throws IOException; + + /** * Called before checkAndPut *

* Call CoprocessorEnvironment#bypass to skip default actions Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1445231) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -2258,6 +2258,15 @@ // ---------------------------------- w = mvcc.beginMemstoreInsert(); + // calling the pre CP hook for batch mutation + if (coprocessorHost != null) { + MiniBatchOperationInProgress> miniBatchOp = + new MiniBatchOperationInProgress>( + batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, + firstIndex, lastIndexExclusive); + if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; + } + // ------------------------------------ // STEP 3. Write back to memstore // Write to memstore. It is ok to write to memstore @@ -2331,6 +2340,15 @@ syncOrDefer(txid); } walSyncSuccessful = true; + // calling the post CP hook for batch mutation + if (coprocessorHost != null) { + MiniBatchOperationInProgress> miniBatchOp = + new MiniBatchOperationInProgress>( + batchOp.operations, batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, + firstIndex, lastIndexExclusive); + coprocessorHost.postBatchMutate(miniBatchOp); + } + // ------------------------------------------------------------------ // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. // ------------------------------------------------------------------ Index: src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java (working copy) @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; + +public class MiniBatchOperationInProgress { + private final T[] operations; + private final OperationStatus[] retCodeDetails; + private final WALEdit[] walEditsFromCoprocessors; + private final int firstIndex; + private final int lastIndexExclusive; + private int nextIndexToProcess = -1; + + public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails, + WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive) { + this.operations = operations; + this.retCodeDetails = retCodeDetails; + this.walEditsFromCoprocessors = walEditsFromCoprocessors; + this.firstIndex = firstIndex; + this.lastIndexExclusive = lastIndexExclusive; + } + + /** + * @return The number of operations(Mutations) involved in this batch. + */ + public int size() { + return this.lastIndexExclusive - this.firstIndex; + } + + /** + * @return Whether more operations(mutations) available in the batch + */ + public boolean hasNext() { + return this.nextIndexToProcess < this.lastIndexExclusive - 1; + } + + /** + * This advances the pointer to the next operation in the batch. Calls to setters for status and + * WALEdit will set these to the corresponding operation. Initially the pointer will be before the + * 1st element in this batch. Before calling {@link #getOperation()} or + * {@link #setOperationStatus(OperationStatus)} or {@link #setWalEdit(WALEdit)} one must call this + * method. + */ + public void next() { + if (this.nextIndexToProcess == -1) { + this.nextIndexToProcess = this.firstIndex; + return; + } + if (!hasNext()) { + throw new IllegalStateException("Reached the end of mini batch."); + } + this.nextIndexToProcess++; + } + + public void rewind() { + this.nextIndexToProcess = -1; + } + + /** + * @return The current operation(Mutation). + */ + public T getOperation() { + if (this.nextIndexToProcess == -1) { + throw new IllegalStateException("Call next() before getting operation"); + } + return operations[this.nextIndexToProcess]; + } + + /** + * Sets the status code for the current operation(Mutation). + * @param opStatus + */ + public void setOperationStatus(OperationStatus opStatus) { + if (this.nextIndexToProcess == -1) { + throw new IllegalStateException("Call next() before setting status"); + } + this.retCodeDetails[this.nextIndexToProcess] = opStatus; + } + + /** + * @return Gets the status code for the current operation(Mutation). + */ + public OperationStatus getOperationStatus() { + if (this.nextIndexToProcess == -1) { + throw new IllegalStateException("Call next() before getting status"); + } + return this.retCodeDetails[this.nextIndexToProcess]; + } + + /** + * Sets the walEdit for the current operation(Mutation). + * @param walEdit + */ + public void setWalEdit(WALEdit walEdit) { + if (this.nextIndexToProcess == -1) { + throw new IllegalStateException("Call next() before setting WALEdit"); + } + this.walEditsFromCoprocessors[this.nextIndexToProcess] = walEdit; + } + + /** + * @return Gets the walEdit for the current operation(Mutation). + */ + public WALEdit getWalEdit() { + if (this.nextIndexToProcess == -1) { + throw new IllegalStateException("Call next() before setting WALEdit"); + } + return this.walEditsFromCoprocessors[this.nextIndexToProcess]; + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1445231) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy) @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -885,8 +886,58 @@ } } } + + /** + * + * @param batchOp + * @return + * @throws IOException + */ + public boolean preBatchMutate( + final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + boolean bypass = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()).preBatchMutate(ctx, miniBatchOp); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass; + } /** + * + * @param batchOp + * @throws IOException + */ + public void postBatchMutate( + final MiniBatchOperationInProgress> miniBatchOp) throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()).postBatchMutate(ctx, miniBatchOp); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } + + /** * @param row row to check * @param family column family * @param qualifier column qualifier