diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index acaecf1..a89d169 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3009,6 +3009,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean locked = false; // reference family maps directly so coprocessors can mutate them if desired Map>[] familyMaps = new Map[batchOp.operations.length]; + Map>[] familyMapsFromCoprocessors = null; // We try to set up a batch in the range [firstIndex,lastIndexExclusive) int firstIndex = batchOp.nextIndexToProcess; int lastIndexExclusive = firstIndex; @@ -3119,6 +3120,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) { return 0L; + } else if (miniBatchOp.operationsFromCoprocessors!=null) { + familyMapsFromCoprocessors = new Map[miniBatchOp.operationsFromCoprocessors.length]; + for(int i = 0; i < miniBatchOp.operationsFromCoprocessors.length; i++) { + Mutation mutation = (Mutation)miniBatchOp.operationsFromCoprocessors[i]; + Map> familyMap = mutation.getFamilyCellMap(); + // store the family map reference to allow for mutations + familyMapsFromCoprocessors[i] = familyMap; + } } } @@ -3201,11 +3210,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // STEP 5. Write back to memstore long addedSize = 0; - for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { + for (int i = firstIndex; i < ( lastIndexExclusive + (familyMapsFromCoprocessors==null ? 
0 : + familyMapsFromCoprocessors.length)); i++) { + if (i < lastIndexExclusive && batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { continue; } - addedSize += applyFamilyMapToMemstore(familyMaps[i], replay, + addedSize += applyFamilyMapToMemstore(i < lastIndexExclusive ? familyMaps[i]: + familyMapsFromCoprocessors[i-lastIndexExclusive], replay, replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); } @@ -3295,9 +3306,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // call the coprocessor hook to do any finalization steps // after the put is done MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), - batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, - lastIndexExclusive); + new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), + batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java index 2b12dec..fbeb3b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.Arrays; + import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -36,6 +38,8 @@ public class MiniBatchOperationInProgress { private final WALEdit[] walEditsFromCoprocessors; private final int firstIndex; private final int lastIndexExclusive; + + T[] operationsFromCoprocessors; public MiniBatchOperationInProgress(T[] 
operations, OperationStatus[] retCodeDetails, WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive) { @@ -103,4 +107,8 @@ public class MiniBatchOperationInProgress { } return this.firstIndex + index; } + + public void addOperations(T[] newOperations) { + this.operationsFromCoprocessors = newOperations; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverForAddingMutationsFromCoprocessors.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverForAddingMutationsFromCoprocessors.java new file mode 100644 index 0000000..09ccee8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverForAddingMutationsFromCoprocessors.java @@ -0,0 +1,116 @@ +package org.apache.hadoop.hbase.coprocessor; + +import static junit.framework.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestRegionObserverForAddingMutationsFromCoprocessors { + + private static 
HBaseTestingUtility util; + private static final TableName tableName = TableName.valueOf("test"); + private static final byte[] dummy = Bytes.toBytes("dummy"); + private static final byte[] row1 = Bytes.toBytes("r1"); + private static final byte[] row2 = Bytes.toBytes("r2"); + private static final byte[] row3 = Bytes.toBytes("r3"); + private static final byte[] test = Bytes.toBytes("test"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + TestCoprocessor.class.getName()); + util = new HBaseTestingUtility(conf); + util.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + Admin admin = util.getHBaseAdmin(); + if (admin.tableExists(tableName)) { + if (admin.isTableEnabled(tableName)) { + admin.disableTable(tableName); + } + admin.deleteTable(tableName); + } + util.createTable(tableName, new byte[][] {dummy, test}); + } + + /** + * Test various multiput operations. 
+   * @throws Exception
+   */
+  @Test
+  public void testMulti() throws Exception {
+    Table t = util.getConnection().getTable(tableName);
+    List<Put> puts = new ArrayList<Put>();
+    Put p = new Put(row1);
+    p.addColumn(test, dummy, dummy);
+    puts.add(p);
+    t.put(puts);
+    Scan s = new Scan();
+    ResultScanner scanner = t.getScanner(s);
+    int i = 0;
+    for (Result r : scanner) {
+      i++;
+    }
+    // junit's assertEquals takes (expected, actual): one row from the client
+    // put plus the two rows injected by TestCoprocessor#preBatchMutate
+    assertEquals(3, i);
+    scanner.close();
+    t.close();
+  }
+
+  public static class TestCoprocessor extends BaseRegionObserver {
+    @Override
+    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+        MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      if (c.getEnvironment().getRegion().getTableDesc().getTableName().equals(tableName)) {
+        Put[] puts = new Put[2];
+        Put p = new Put(row2);
+        p.addColumn(test, dummy, dummy);
+        puts[0] = p;
+        p = new Put(row3);
+        p.addColumn(test, dummy, dummy);
+        puts[1] = p;
+        // hand the extra mutations to the region so they are applied to the
+        // memstore together with the original batch (see HRegion step 5)
+        miniBatchOp.addOperations(puts);
+        // NOTE(review): setWalEdit(0, edit) REPLACES the WAL edit of the first
+        // client operation, so the original row1 cells would not be written to
+        // the WAL -- confirm this is intended, or append to the existing edit
+        WALEdit edit = new WALEdit();
+        for (Put p1 : puts) {
+          List<Cell> list = p1.get(test, dummy);
+          for (Cell cell : list) {
+            edit.add(cell);
+          }
+        }
+        miniBatchOp.setWalEdit(0, edit);
+      }
+    }
+  }
+}