diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java index be2bd91..2b9ac62 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java @@ -40,11 +40,13 @@ public abstract class BaseRowProcessor implements RowProcessor { @Override - public void preProcess(HRegion region, WALEdit walEdit) throws IOException { + public boolean preProcess(HRegion region, WALEdit walEdit) throws IOException { + return false; } @Override - public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException { + public boolean preBatchMutate(HRegion region, WALEdit walEdit) throws IOException { + return false; } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 71cc247..87b9a0b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -7025,40 +7025,39 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } checkResources(); startRegionOperation(); - WALEdit walEdit = new WALEdit(); + long addedSize = 0; + List mutations = new ArrayList<>(); + try { + WALEdit walEdit = new WALEdit(); - // STEP 1. Run pre-process hook - preProcess(processor, walEdit); - // Short circuit the read only case - if (processor.readOnly()) { - try { + // STEP 1. Run pre-process hook + boolean bypass = preProcess(processor, walEdit); + if (bypass) { + return; + } + // Short circuit the read only case + if (processor.readOnly()) { long now = EnvironmentEdgeManager.currentTime(); doProcessRowWithTimeout(processor, now, this, null, null, timeout); processor.postProcess(this, walEdit, true); - } finally { - closeRegionOperation(); + return; } - return; - } - boolean locked; - List acquiredRowLocks; - long addedSize = 0; - List mutations = new ArrayList(); - Collection rowsToLock = processor.getRowsToLock(); - // This is assigned by mvcc either explicity in the below or in the guts of the WAL append - // when it assigns the edit a sequencedid (A.K.A the mvcc write number). - WriteEntry writeEntry = null; - try { + boolean locked; + List acquiredRowLocks; + Collection rowsToLock = processor.getRowsToLock(); + // This is assigned by mvcc either explicity in the below or in the guts of the WAL append + // when it assigns the edit a sequencedid (A.K.A the mvcc write number). + WriteEntry writeEntry = null; // STEP 2. Acquire the row lock(s) - acquiredRowLocks = new ArrayList(rowsToLock.size()); + acquiredRowLocks = new ArrayList<>(rowsToLock.size()); for (byte[] row : rowsToLock) { // Attempt to lock all involved rows, throw if any lock times out // use a writer lock for mixed reads and writes acquiredRowLocks.add(getRowLockInternal(row, false)); } // STEP 3. Region lock - lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size()); + lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size()); locked = true; boolean success = false; long now = EnvironmentEdgeManager.currentTime(); @@ -7067,7 +7066,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout); if (!mutations.isEmpty()) { // STEP 5. Call the preBatchMutate hook - processor.preBatchMutate(this, walEdit); + if (processor.preBatchMutate(this, walEdit)) { + return; + } // STEP 6. Append and sync if walEdit has data to write out. if (!walEdit.isEmpty()) { @@ -7133,10 +7134,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private void preProcess(final RowProcessor processor, final WALEdit walEdit) + private boolean preProcess(final RowProcessor processor, final WALEdit walEdit) throws IOException { try { - processor.preProcess(this, walEdit); + return processor.preProcess(this, walEdit); } catch (IOException e) { closeRegionOperation(); throw e; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java index 995ea93..68911eb 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java @@ -44,7 +44,7 @@ MultiRowMutationProcessorResponse> { Collection rowsToLock; Collection mutations; MiniBatchOperationInProgress miniBatch; - + private boolean bypass = false; MultiRowMutationProcessor(Collection mutations, Collection rowsToLock) { this.rowsToLock = rowsToLock; @@ -71,6 +71,9 @@ MultiRowMutationProcessorResponse> { HRegion region, List mutationsToApply, WALEdit walEdit) throws IOException { + if (bypass) { + return; + } byte[] byteNow = Bytes.toBytes(now); // Check mutations for (Mutation m : this.mutations) { @@ -101,40 +104,52 @@ MultiRowMutationProcessorResponse> { } @Override - public void preProcess(HRegion region, WALEdit walEdit) throws IOException { + public boolean preProcess(HRegion region, WALEdit walEdit) throws IOException { RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); if (coprocessorHost != null) { for (Mutation m : mutations) { if (m instanceof Put) { if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { // by pass everything - return; + bypass = true; + return bypass; } } else if (m instanceof Delete) { Delete d = (Delete) m; region.prepareDelete(d); if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) { // by pass everything - return; + bypass = true; + return bypass; } + } else { + throw new DoNotRetryIOException("Action must be Put or Delete. But was: " + + m.getClass().getName()); } } } + return bypass; } @Override - public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException { - // TODO we should return back the status of this hook run to HRegion so that those Mutations - // with OperationStatus as SUCCESS or FAILURE should not get applied to memstore. + public boolean preBatchMutate(HRegion region, WALEdit walEdit) throws IOException { + if (bypass) { + return bypass; + } RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); OperationStatus[] opStatus = new OperationStatus[mutations.size()]; Arrays.fill(opStatus, OperationStatus.NOT_RUN); WALEdit[] walEditsFromCP = new WALEdit[mutations.size()]; if (coprocessorHost != null) { - miniBatch = new MiniBatchOperationInProgress( + miniBatch = new MiniBatchOperationInProgress<>( mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0, mutations.size()); - coprocessorHost.preBatchMutate(miniBatch); + if (coprocessorHost.preBatchMutate(miniBatch)) { + bypass = true; + } + } + if (bypass) { + return bypass; } // Apply edits to a single WALEdit for (int i = 0; i < mutations.size(); i++) { @@ -149,10 +164,14 @@ MultiRowMutationProcessorResponse> { } } } + return bypass; } @Override public void postBatchMutate(HRegion region) throws IOException { + if (bypass) { + return; + } RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); if (coprocessorHost != null) { assert miniBatch != null; @@ -163,6 +182,9 @@ MultiRowMutationProcessorResponse> { @Override public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException { + if (bypass) { + return; + } RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); if (coprocessorHost != null) { for (Mutation m : mutations) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java index 34901b7..b558b1f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java @@ -94,14 +94,14 @@ public interface RowProcessor { * @param region the HRegion * @param walEdit the output WAL edits to apply to write ahead log */ - void preProcess(HRegion region, WALEdit walEdit) throws IOException; + boolean preProcess(HRegion region, WALEdit walEdit) throws IOException; /** * The hook to be executed after the process() but before applying the Mutations to region. Also * by the time this hook is called, mvcc transaction have started. * @param walEdit the output WAL edits to apply to write ahead log */ - void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException; + boolean preBatchMutate(HRegion region, WALEdit walEdit) throws IOException; /** * The hook to be executed after the process() and applying the Mutations to region. The diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index da033c6..a25da64 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.client; +import java.io.IOException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -28,14 +29,24 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -52,18 +63,17 @@ import org.junit.experimental.categories.Category; @Category({LargeTests.class, ClientTests.class}) public class TestFromClientSide3 { private static final Log LOG = LogFactory.getLog(TestFromClientSide3.class); - private final static HBaseTestingUtility TEST_UTIL - = new HBaseTestingUtility(); - private static byte[] FAMILY = Bytes.toBytes("testFamily"); - private static Random random = new Random(); - private static int SLAVES = 3; - private static byte [] ROW = Bytes.toBytes("testRow"); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final Random random = new Random(); + private static final int SLAVES = 3; + private static final byte[] ROW = Bytes.toBytes("testRow"); private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow"); - private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); - private static byte [] VALUE = Bytes.toBytes("testValue"); - private final static byte[] COL_QUAL = Bytes.toBytes("f1"); - private final static byte[] VAL_BYTES = Bytes.toBytes("v1"); - private final static byte[] ROW_BYTES = Bytes.toBytes("r1"); + private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte[] VALUE = Bytes.toBytes("testValue"); + private static final byte[] COL_QUAL = Bytes.toBytes("f1"); + private static final byte[] VAL_BYTES = Bytes.toBytes("v1"); + private static final byte[] ROW_BYTES = Bytes.toBytes("r1"); /** * @throws java.lang.Exception @@ -259,6 +269,60 @@ public class TestFromClientSide3 { } @Test + public void testHTableRowMutationsWithObserver ()throws Exception { + TableName tableName = TableName.valueOf("testHTableRowMutationsWithObserver"); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(FilterObserver.EXCLUDED_FAMILY)); + desc.addFamily(new HColumnDescriptor(FilterObserver.INCLUDED_FAMILY)); + desc.addCoprocessor(FilterObserver.class.getName()); + Table t = TEST_UTIL.createTable(desc, new byte[0][0]); + byte[] includedRow = Bytes.toBytes("row1"); + Put includedPut = new Put(includedRow); + includedPut.addColumn(FilterObserver.INCLUDED_FAMILY, includedRow, includedRow); + t.put(includedPut); + byte[] excludedRow = Bytes.toBytes("row2"); + Put excludedPut_v0 = new Put(excludedRow); + excludedPut_v0.addColumn(FilterObserver.EXCLUDED_FAMILY, excludedRow, excludedRow); + Put excludedPut_v1 = new Put(excludedRow); + excludedPut_v1.addColumn(FilterObserver.INCLUDED_FAMILY, excludedRow, excludedRow); + RowMutations muts = new RowMutations(excludedRow); + muts.add(excludedPut_v0); + muts.add(excludedPut_v1); + t.mutateRow(muts); + Get includedGet = new Get(includedRow); + Result includedResult = t.get(includedGet); + assertEquals(1, includedResult.rawCells().length); + Get excludedGet = new Get(excludedRow); + Result excludedResult = t.get(excludedGet); + assertEquals(0, excludedResult.rawCells().length); + + FilterObserver cp = find(tableName); + // includedPut: prePut + // excludedPut_v0: prePut + // excludedPut_v1: none + assertEquals(2, cp.preDeleteAndPutCount.get()); + // includedPut: preBatchMutate + // excludedPut_v0: none + // excludedPut_v1: none + assertEquals(1, cp.preBatchMutateCount.get()); // includedPut + // includedPut: postPut + postBatchMutateIndispensably + postBatchMutate + // excludedPut_v0: none + // excludedPut_v1: none + assertEquals(3, cp.postCount.get()); // includedPut + } + + private static FilterObserver find(final TableName tableName) throws IOException, InterruptedException { + HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); + List regions = rs.getOnlineRegions(tableName); + assertEquals(1, regions.size()); + Region region = regions.get(0); + Coprocessor cp = region.getCoprocessorHost().findCoprocessor(FilterObserver.class.getName()); + assertTrue("The cp instance should be " + FilterObserver.class.getName() + + ", current instance is " + cp.getClass().getName(), cp instanceof FilterObserver); + return (FilterObserver) cp; + } + + @Test public void testHTableBatchWithEmptyPut ()throws Exception { Table table = TEST_UTIL.createTable(TableName.valueOf("testHTableBatchWithEmptyPut"), new byte[][] { FAMILY }); @@ -479,4 +543,61 @@ public class TestFromClientSide3 { ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection(); assertTrue(con.hasCellBlockSupport()); } + + public static class FilterObserver extends BaseRegionObserver { + + static final byte[] INCLUDED_FAMILY = FAMILY; + static final byte[] EXCLUDED_FAMILY = Bytes.toBytes("exclude"); + final AtomicInteger preBatchMutateCount = new AtomicInteger(); + final AtomicInteger postCount = new AtomicInteger(); + final AtomicInteger preDeleteAndPutCount = new AtomicInteger(); + @Override + public void prePut(final ObserverContext e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + preDeleteAndPutCount.incrementAndGet(); + for (byte[] f : put.getFamilyCellMap().keySet()) { + if (Bytes.equals(f, EXCLUDED_FAMILY)) { + e.bypass(); + return; + } + } + } + + @Override + public void preDelete(final ObserverContext e, final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { + preDeleteAndPutCount.incrementAndGet(); + } + + @Override + public void preBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress miniBatchOp) throws IOException { + preBatchMutateCount.incrementAndGet(); + } + + @Override + public void postBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress miniBatchOp) throws IOException { + postCount.incrementAndGet(); + } + + @Override + public void postBatchMutateIndispensably(final ObserverContext ctx, + MiniBatchOperationInProgress miniBatchOp, final boolean success) throws IOException { + postCount.incrementAndGet(); + } + + @Override + public void postPut(final ObserverContext e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + postCount.incrementAndGet(); + } + + @Override + public void postDelete(final ObserverContext e, + final Delete delete, final WALEdit edit, final Durability durability) + throws IOException { + postCount.incrementAndGet(); + } + } }