Index: src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (revision 1296937) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (working copy) @@ -139,17 +139,17 @@ } @Override - public void prePut(final ObserverContext e, + public void prePut(final ObserverContext e, final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException { } @Override - public void postPut(final ObserverContext e, + public void postPut(final ObserverContext e, final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException { } @Override - public void preDelete(final ObserverContext e, + public void preDelete(final ObserverContext e, final Delete delete, final WALEdit edit, final boolean writeToWAL) throws IOException { } @@ -274,4 +274,30 @@ public void postWALRestore(ObserverContext env, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { } + + @Override + public void preBatchPut(ObserverContext c, + List putList, WALEdit edit) throws IOException { + } + + @Override + public void postBatchPut( + ObserverContext ctx, + List putList, WALEdit walEdit) { + } + + @Override + public void preRollBackMemstore( + ObserverContext ctx, List putList) + throws IOException { + } + + @Override + public void postCompleteBatchPut( + ObserverContext ctx, List putList) + throws IOException { + } + + + } Index: src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (revision 1296937) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (working copy) @@ -276,12 +276,13 @@ * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ - void prePut(final ObserverContext c, + void prePut(final ObserverContext c, final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException; /** - * Called after the client stores a value. + * Called after the client stores a value and if the put + * is successful. *

* Call CoprocessorEnvironment#complete to skip any subsequent chained * coprocessors @@ -291,7 +292,7 @@ * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ - void postPut(final ObserverContext c, + void postPut(final ObserverContext c, final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException; @@ -308,7 +309,7 @@ * @param writeToWAL true if the change should be written to the WAL * @throws IOException if an error occurred on the coprocessor */ - void preDelete(final ObserverContext c, + void preDelete(final ObserverContext c, final Delete delete, final WALEdit edit, final boolean writeToWAL) throws IOException; @@ -341,7 +342,7 @@ * @param compareOp the comparison operation * @param comparator the comparator * @param put data to put if check succeeds - * @param result + * @param result * @return the return value to return to client if bypassing default * processing * @throws IOException if an error occurred on the coprocessor @@ -388,7 +389,7 @@ * @param compareOp the comparison operation * @param comparator the comparator * @param delete delete to commit if check succeeds - * @param result + * @param result * @return the value to return to client if bypassing default processing * @throws IOException if an error occurred on the coprocessor */ @@ -649,4 +650,60 @@ */ void postWALRestore(final ObserverContext ctx, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException; + + + /** + * Internally the puts are handled as a batch + * Called before actual put operation starts in the region. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors + * @param ctx the environment provided by the region server + * @param putList list of puts + * @param edit The WALEdit object that will be written to the wal + * @throws IOException if an error occurred on the coprocessor + */ + void preBatchPut(final ObserverContext ctx, + final List putList, final WALEdit edit) + throws IOException; + + /** + * Internally the puts are handled as a batch. + * Called after actual batch put operation completes in the region. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors + * @param ctx the environment provided by the region server + * @param putList list of puts + * @param walEdit The WALEdit object that will be written to the wal + */ + void postBatchPut(final ObserverContext ctx, + final List putList, WALEdit walEdit) throws IOException; + + /** + * Called before rolling back the memstore incase the current set of puts + * failed + * + * @param ctx the environment provided by the region server + * @param putList list of puts + * @throws IOException + */ + void preRollBackMemstore( + final ObserverContext ctx, + final List putList) throws IOException; + + /** + * Called after the completion of put and will be called even if the + * put operation fails + * @param ctx + * @param putList + * @throws IOException + */ + void postCompleteBatchPut( + final ObserverContext ctx, List putList) + throws IOException; } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1305831) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -2135,6 +2135,26 @@ // ---------------------------------- w = mvcc.beginMemstoreInsert(); + //------------------------------------------------ + // Call the coprocessor hook to do the actual put. + // Here the walDdit is updated with the actual timestamp + // for the kv. + // This hook will be called for all the puts that are currently + // acquired the locks + //------------------------------------------------- + if (coprocessorHost != null) { + List putList = new ArrayList(); + for (int i = firstIndex; i < lastIndexExclusive; i++) { + // only for successful puts + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { + continue; + } + Put p = batchOp.operations[i].getFirst(); + putList.add(p); + } + coprocessorHost.preBatchPut(putList, walEdit); + } + // ------------------------------------ // STEP 3. Write back to memstore // Write to memstore. It is ok to write to memstore @@ -2198,6 +2218,19 @@ this.log.sync(txid); } walSyncSuccessful = true; + // hook to complete the actual put + if (coprocessorHost != null) { + List putList = new ArrayList(); + for (int i = firstIndex; i < lastIndexExclusive; i++) { + // only for successful puts + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) { + continue; + } + Put p = batchOp.operations[i].getFirst(); + putList.add(p); + } + coprocessorHost.postBatchPut(putList, walEdit); + } // ------------------------------------------------------------------ // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. // ------------------------------------------------------------------ @@ -2228,6 +2261,18 @@ // if the wal sync was unsuccessful, remove keys from memstore if (!walSyncSuccessful) { + if (coprocessorHost != null) { + List putList = new ArrayList(); + for (int i = firstIndex; i < lastIndexExclusive; i++) { + // only for successful puts + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) { + continue; + } + Put p = batchOp.operations[i].getFirst(); + putList.add(p); + } + coprocessorHost.preRollBackMemStore(putList); + } rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive); } if (w != null) mvcc.completeMemstoreInsert(w); @@ -2242,6 +2287,21 @@ } } + // call the coprocessor hook to do to any finalization steps + // after the put is done + if (coprocessorHost != null) { + List putList = new ArrayList(); + for (int i = firstIndex; i < lastIndexExclusive; i++) { + // only for successful puts + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) { + continue; + } + Put p = batchOp.operations[i].getFirst(); + putList.add(p); + } + coprocessorHost.postCompleteBatchPut(putList); + } + // do after lock final long endTimeMs = EnvironmentEdgeManager.currentTimeMillis(); if (metricPrefix == null) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1229039) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy) @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.StringUtils; import java.io.IOException; @@ -666,6 +667,90 @@ return bypass; } + public boolean preBatchPut(final List putList, final WALEdit edit) + throws IOException { + boolean bypass = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()).preBatchPut(ctx, putList, edit); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass; + } + + public boolean postBatchPut(final List putList, final WALEdit walEdit) + throws IOException { + boolean bypass = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()).postBatchPut(ctx, + putList, walEdit); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass; + } + + public boolean preRollBackMemStore(final List putList) + throws IOException { + boolean bypass = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()) + .preRollBackMemstore(ctx, putList); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass; + } + + public boolean postCompleteBatchPut(final List putList) throws IOException { + boolean bypass = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()).postCompleteBatchPut(ctx, putList); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass; + } /** * @param put The Put object * @param edit The WALEdit object. Index: src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (revision 1210745) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (working copy) @@ -85,6 +85,9 @@ boolean hadPostScannerClose = false; boolean hadPreScannerOpen = false; boolean hadPostScannerOpen = false; + boolean hadPreActualPut = false; + boolean hadPostActualPut = false; + boolean hadPostCompleteBatchPut = false; @Override public void preOpen(ObserverContext c) { @@ -258,7 +261,7 @@ } @Override - public void prePut(final ObserverContext c, + public void prePut(final ObserverContext c, final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException { Map> familyMap = put.getFamilyMap(); @@ -318,7 +321,7 @@ } @Override - public void preDelete(final ObserverContext c, + public void preDelete(final ObserverContext c, final Delete delete, final WALEdit edit, final boolean writeToWAL) throws IOException { Map> familyMap = delete.getFamilyMap(); @@ -332,7 +335,7 @@ } @Override - public void postDelete(final ObserverContext c, + public void postDelete(final ObserverContext c, final Delete delete, final WALEdit edit, final boolean writeToWAL) throws IOException { Map> familyMap = delete.getFamilyMap(); @@ -378,6 +381,24 @@ } @Override + public void preBatchPut(ObserverContext c, + final List putList, WALEdit edit) throws IOException { + hadPreActualPut = true; + } + + @Override + public void postBatchPut(ObserverContext ctx, + List putList, WALEdit walEdit) { + hadPostActualPut = true; + } + + @Override + public void postCompleteBatchPut( + ObserverContext ctx, List putList) + throws IOException { + hadPostCompleteBatchPut = true; + } + @Override public Result postIncrement(final ObserverContext c, final Increment increment, final Result result) throws IOException { hadPostIncrement = true; @@ -430,4 +451,16 @@ public boolean hadDeleted() { return hadPreDeleted && hadPostDeleted; } + + public boolean hadPreBatchPut(){ + return hadPreActualPut; + } + + public boolean hadPostBatchPut(){ + return hadPostActualPut; + } + + public boolean hadPostCompleteBatchPut() { + return hadPostCompleteBatchPut; + } } Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (revision 1300035) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (working copy) @@ -85,9 +85,9 @@ HTable table = util.createTable(tableName, new byte[][] {A, B, C}); verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadDelete"}, + "hadDelete", "hadPreBatchPut", "hadPostBatchPut", "hadPostCompleteBatchPut"}, TEST_TABLE, - new Boolean[] {false, false, false, false, false}); + new Boolean[] {false, false, false, false, false, false, false, false}); Put put = new Put(ROW); put.add(A, A, A); @@ -97,9 +97,9 @@ verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadDelete"}, + "hadDelete", "hadPreBatchPut", "hadPostBatchPut", "hadPostCompleteBatchPut"}, TEST_TABLE, - new Boolean[] {false, false, true, true, false} + new Boolean[] {false, false, true, true, false, true, true, true} ); Get get = new Get(ROW); @@ -110,9 +110,9 @@ verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadDelete"}, + "hadDelete","hadPreBatchPut", "hadPostBatchPut", "hadPostCompleteBatchPut"}, TEST_TABLE, - new Boolean[] {true, true, true, true, false} + new Boolean[] {true, true, true, true, false, true, true, true} ); Delete delete = new Delete(ROW); @@ -299,21 +299,21 @@ public boolean next(List results) throws IOException { return next(results, -1); } - + @Override - public boolean next(List results, String metric) + public boolean next(List results, String metric) throws IOException { return next(results, -1, metric); } @Override - public boolean next(List results, int limit) + public boolean next(List results, int limit) throws IOException{ return next(results, limit, null); } @Override - public boolean next(List results, int limit, String metric) + public boolean next(List results, int limit, String metric) throws IOException { List internalResults = new ArrayList(); boolean hasMore;