diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 72238cc..1191e7d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3118,6 +3118,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) { return 0L; + } else { + for (int i = firstIndex; i < lastIndexExclusive; i++) { + if (miniBatchOp.getOperationsFromCoprocessors(i) == null) { + continue; + } + // Else Coprocessor added more Mutations corresponding to the Mutation at this index. + for (int j = 0; j < miniBatchOp.getOperationsFromCoprocessors(i).length; j++) { + Mutation mutation = miniBatchOp.getOperationsFromCoprocessors(i)[j]; + // Acquire row locks. If not, the whole batch will fail. + acquiredRowLocks.add(getRowLock(mutation.getRow(), true)); + Map> cpFamilyMap = mutation.getFamilyCellMap(); + if (mutation.getDurability() == Durability.SKIP_WAL) { + recordMutationWithoutWal(cpFamilyMap); + } + + // Returned mutations from coprocessor correspond to the Mutation at index i. We can + // directly add the cells from those mutations to the familyMaps of this mutation. + mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later + } + } } } @@ -3294,9 +3314,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // call the coprocessor hook to do any finalization steps // after the put is done MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), - batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, - lastIndexExclusive); + new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), + batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success); } @@ -3304,6 +3323,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + private void mergeFamilyMaps(Map> familyMap, + Map> toBeMerged) { + for (Map.Entry> entry : toBeMerged.entrySet()) { + List cells = familyMap.get(entry.getKey()); + if (cells == null) { + familyMap.put(entry.getKey(), entry.getValue()); + } else { + cells.addAll(entry.getValue()); + } + } + } + private void appendCurrentNonces(final Mutation mutation, final boolean replay, final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce) throws IOException { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java index 2b12dec..cdbecac 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java @@ -18,20 +18,22 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; /** * Wraps together the mutations which are applied as a batch to the region and their operation - * status and WALEdits. + * status and WALEdits. * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preBatchMutate( * ObserverContext, MiniBatchOperationInProgress) * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#postBatchMutate( * ObserverContext, MiniBatchOperationInProgress) * @param T Pair<Mutation, Integer> pair of Mutations and associated rowlock ids . */ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate("Coprocessors") public class MiniBatchOperationInProgress { private final T[] operations; + private Mutation[][] operationsFromCoprocessors; private final OperationStatus[] retCodeDetails; private final WALEdit[] walEditsFromCoprocessors; private final int firstIndex; @@ -63,7 +65,7 @@ public class MiniBatchOperationInProgress { /** * Sets the status code for the operation(Mutation) at the specified position. - * By setting this status, {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} + * By setting this status, {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} * can make HRegion to skip Mutations. * @param index * @param opStatus @@ -103,4 +105,25 @@ public class MiniBatchOperationInProgress { } return this.firstIndex + index; } + + /** + * Add more Mutations corresponding to the Mutation at the given index to be committed atomically + * in the same batch. These mutations are applied to the WAL and applied to the memstore as well. + * The timestamp of the cells in the given Mutations MUST be obtained from the original mutation. + * + * @param index the index that corresponds to the original mutation index in the batch + * @param newOperations the Mutations to add + */ + public void addOperationsFromCP(int index, Mutation[] newOperations) { + if (this.operationsFromCoprocessors == null) { + // lazy allocation to save on object allocation in case this is not used + this.operationsFromCoprocessors = new Mutation[operations.length][]; + } + this.operationsFromCoprocessors[getAbsoluteIndex(index)] = newOperations; + } + + public Mutation[] getOperationsFromCoprocessors(int index) { + return operationsFromCoprocessors == null ? null : + operationsFromCoprocessors[getAbsoluteIndex(index)]; + } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverForAddingMutationsFromCoprocessors.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverForAddingMutationsFromCoprocessors.java new file mode 100644 index 0000000..db7bcc0 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverForAddingMutationsFromCoprocessors.java @@ -0,0 +1,116 @@ +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestRegionObserverForAddingMutationsFromCoprocessors { + + private static final Log LOG + = LogFactory.getLog(TestRegionObserverForAddingMutationsFromCoprocessors.class); + + private static HBaseTestingUtility util; + private static final TableName tableName = TableName.valueOf("test"); + private static final byte[] dummy = Bytes.toBytes("dummy"); + private static final byte[] row1 = Bytes.toBytes("r1"); + private static final byte[] row2 = Bytes.toBytes("r2"); + private static final byte[] row3 = Bytes.toBytes("r3"); + private static final byte[] test = Bytes.toBytes("test"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + TestCoprocessor.class.getName()); + util = new HBaseTestingUtility(conf); + util.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + Admin admin = util.getAdmin(); + if (admin.tableExists(tableName)) { + if (admin.isTableEnabled(tableName)) { + admin.disableTable(tableName); + } + admin.deleteTable(tableName); + } + util.createTable(tableName, new byte[][] {dummy, test}); + } + + /** + * Test various multiput operations. + * @throws Exception + */ + @Test + public void testMulti() throws Exception { + Table t = util.getConnection().getTable(tableName); + List puts = new ArrayList(); + Put p = new Put(row1); + p.addColumn(test,dummy,dummy); + puts.add(p); + t.put(puts); + Scan s = new Scan(); + ResultScanner scanner = t.getScanner(s); + int i = 0; + for (Result r: scanner) { + LOG.info(r.toString()); + i++; + } + assertEquals(i, 3); + scanner.close(); + t.close(); + } + + public static class TestCoprocessor extends BaseRegionObserver { + @Override + public void preBatchMutate( + ObserverContext c, + MiniBatchOperationInProgress miniBatchOp) + throws IOException { + if (c.getEnvironment().getRegion().getTableDesc().getTableName().equals(tableName)) { + Mutation mut = miniBatchOp.getOperation(0); + List cells = mut.getFamilyCellMap().get(test); + Put[] puts = new Put[] { + new Put(row1).addColumn(test, dummy, cells.get(0).getTimestamp(), + Bytes.toBytes("cpdummy")), + new Put(row2).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy), + new Put(row3).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy), + }; + miniBatchOp.addOperationsFromCP(0, puts); + } + } + } + + // TODO: test with Delete + + // TODO: test with aborting the regionserver to check WAL edits +}