diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index aaa3b6a..5dfdc8e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -630,7 +630,9 @@ public interface RegionObserver extends Coprocessor { /** * This will be called after applying a batch of Mutations on a region. The Mutations are added to - * memstore and WAL. + * memstore and WAL. The difference of this one with {@link #postPut(ObserverContext, Put, WALEdit, Durability) } + * and {@link #postDelete(ObserverContext, Delete, WALEdit, Durability) } is + * this hook will be executed before the mvcc transaction completion. *

* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation. * If need a Cell reference for later use, copy the cell and use that. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 7a9d4e2..da5e516 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3332,6 +3332,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi applyFamilyMapToMemstore(familyMaps[i], memstoreSize); } + // calling the post CP hook for batch mutation + if (!replay && coprocessorHost != null) { + MiniBatchOperationInProgress miniBatchOp = + new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), + batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); + coprocessorHost.postBatchMutate(miniBatchOp); + } + // STEP 6. Complete mvcc. if (replay) { this.mvcc.advanceTo(batchOp.getReplaySequenceId()); @@ -3348,14 +3356,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } releaseRowLocks(acquiredRowLocks); - // calling the post CP hook for batch mutation - if (!replay && coprocessorHost != null) { - MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), - batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); - coprocessorHost.postBatchMutate(miniBatchOp); - } - for (int i = firstIndex; i < lastIndexExclusive; i ++) { if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; @@ -7109,21 +7109,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi applyToMemstore(getHStore(cell), cell, memstoreSize); } } - // STEP 8. Complete mvcc. + + // STEP 8. call postBatchMutate hook + processor.postBatchMutate(this); + + // STEP 9. Complete mvcc. mvcc.completeAndWait(writeEntry); writeEntry = null; - // STEP 9. Release region lock + // STEP 10. Release region lock if (locked) { this.updatesLock.readLock().unlock(); locked = false; } - // STEP 10. Release row lock(s) + // STEP 11. Release row lock(s) releaseRowLocks(acquiredRowLocks); - - // STEP 11. call postBatchMutate hook - processor.postBatchMutate(this); } success = true; } finally { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index da033c6..9092dd5 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -19,19 +19,33 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; - +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionLocation; @@ -44,6 +58,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.After; import org.junit.AfterClass; +import static org.junit.Assert.assertEquals; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -479,4 +494,101 @@ public class TestFromClientSide3 { ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection(); assertTrue(con.hasCellBlockSupport()); } + + @Test(timeout = 60000) + public void testPutWithPreBatchMutate ()throws Exception { + TableName tableName = TableName.valueOf("testPutWithPreBatchMutate"); + testPreBatchMutate(tableName, () -> { + try { + Table t = TEST_UTIL.getConnection().getTable(tableName); + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, VALUE); + t.put(put); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + } + + @Test(timeout = 60000) + public void testRowMutationsWithPreBatchMutate ()throws Exception { + TableName tableName = TableName.valueOf("testRowMutationsWithPreBatchMutate"); + testPreBatchMutate(tableName, () -> { + try { + RowMutations rm = new RowMutations(ROW, 1); + Table t = TEST_UTIL.getConnection().getTable(tableName); + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, VALUE); + rm.add(put); + t.mutateRow(rm); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + } + + private void testPreBatchMutate (TableName tableName, Runnable rn)throws Exception { + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addCoprocessor(WatiingForScanObserver.class.getName()); + desc.addFamily(new HColumnDescriptor(FAMILY)); + TEST_UTIL.getAdmin().createTable(desc); + ExecutorService service = Executors.newFixedThreadPool(2); + service.execute(rn); + final List cells = new ArrayList<>(); + service.execute(() -> { + try { + // waiting for update. + TimeUnit.SECONDS.sleep(3); + Table t = TEST_UTIL.getConnection().getTable(tableName); + Scan scan = new Scan(); + try (ResultScanner scanner = t.getScanner(scan)) { + for (Result r : scanner) { + cells.addAll(Arrays.asList(r.rawCells())); + } + } + } catch (IOException | InterruptedException ex) { + throw new RuntimeException(ex); + } + }); + service.shutdown(); + service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + assertEquals("The write is blocking by RegionObserver#postBatchMutate" + + ", so the data is invisible to reader", 0, cells.size()); + TEST_UTIL.deleteTable(tableName); + } + + private static T find(final TableName tableName, + Class clz) throws IOException, InterruptedException { + HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); + List regions = rs.getOnlineRegions(tableName); + assertEquals(1, regions.size()); + Region region = regions.get(0); + Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName()); + assertTrue("The cp instance should be " + clz.getName() + + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp)); + return clz.cast(cp); + } + + public static class WatiingForScanObserver extends BaseRegionObserver { + + private final CountDownLatch latch = new CountDownLatch(1); + + @Override + public void postBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress miniBatchOp) throws IOException { + try { + // waiting for scanner + latch.await(); + } catch (InterruptedException ex) { + throw new IOException(ex); + } + } + + @Override + public RegionScanner postScannerOpen(final ObserverContext e, + final Scan scan, final RegionScanner s) throws IOException { + latch.countDown(); + return s; + } + } }