diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index aaa3b6a..9818c77 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -630,7 +630,9 @@ public interface RegionObserver extends Coprocessor {
/**
* This will be called after applying a batch of Mutations on a region. The Mutations are added to
- * memstore and WAL.
+ * memstore and WAL. The difference of this one with {@link #postPut(ObserverContext, Put, WALEdit, Durability)}
+ * and {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)} is
+ * that this hook will be executed before the mvcc transaction completion.
*
* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 7a9d4e2..b20afd2 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3332,6 +3332,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
applyFamilyMapToMemstore(familyMaps[i], memstoreSize);
}
+ // calling the post CP hook for batch mutation
+ if (!replay && coprocessorHost != null) {
+ MiniBatchOperationInProgress miniBatchOp =
+ new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
+ batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
+ coprocessorHost.postBatchMutate(miniBatchOp);
+ }
+
// STEP 6. Complete mvcc.
if (replay) {
this.mvcc.advanceTo(batchOp.getReplaySequenceId());
@@ -3348,14 +3356,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
releaseRowLocks(acquiredRowLocks);
- // calling the post CP hook for batch mutation
- if (!replay && coprocessorHost != null) {
- MiniBatchOperationInProgress miniBatchOp =
- new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(),
- batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
- coprocessorHost.postBatchMutate(miniBatchOp);
- }
-
for (int i = firstIndex; i < lastIndexExclusive; i ++) {
if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
@@ -3419,7 +3419,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// call the coprocessor hook to do any finalization steps
// after the put is done
MiniBatchOperationInProgress miniBatchOp =
- new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(),
+ new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
}
@@ -7055,7 +7055,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return;
}
- boolean locked;
+ boolean locked = false;
+ boolean success = false;
List<RowLock> acquiredRowLocks;
List<Mutation> mutations = new ArrayList<>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();
@@ -7063,86 +7064,80 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// when it assigns the edit a sequencedid (A.K.A the mvcc write number).
WriteEntry writeEntry = null;
MemstoreSize memstoreSize = new MemstoreSize();
+ // STEP 2. Acquire the row lock(s)
+ acquiredRowLocks = new ArrayList(rowsToLock.size());
+ for (byte[] row : rowsToLock) {
+ // Attempt to lock all involved rows, throw if any lock times out
+ // use a writer lock for mixed reads and writes
+ acquiredRowLocks.add(getRowLockInternal(row, false));
+ }
+ // STEP 3. Region lock
+ lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
+ locked = true;
+ long now = EnvironmentEdgeManager.currentTime();
try {
- // STEP 2. Acquire the row lock(s)
- acquiredRowLocks = new ArrayList(rowsToLock.size());
- for (byte[] row : rowsToLock) {
- // Attempt to lock all involved rows, throw if any lock times out
- // use a writer lock for mixed reads and writes
- acquiredRowLocks.add(getRowLockInternal(row, false));
- }
- // STEP 3. Region lock
- lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
- locked = true;
- boolean success = false;
- long now = EnvironmentEdgeManager.currentTime();
- try {
- // STEP 4. Let the processor scan the rows, generate mutations and add waledits
- doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
- if (!mutations.isEmpty()) {
- // STEP 5. Call the preBatchMutate hook
- processor.preBatchMutate(this, walEdit);
-
- // STEP 6. Append and sync if walEdit has data to write out.
- if (!walEdit.isEmpty()) {
- writeEntry = doWALAppend(walEdit, getEffectiveDurability(processor.useDurability()),
- processor.getClusterIds(), now, nonceGroup, nonce);
- } else {
- // We are here if WAL is being skipped.
- writeEntry = this.mvcc.begin();
- }
+ // STEP 4. Let the processor scan the rows, generate mutations and add waledits
+ doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
+ if (!mutations.isEmpty()) {
+ // STEP 5. Call the preBatchMutate hook
+ processor.preBatchMutate(this, walEdit);
- // STEP 7. Apply to memstore
- long sequenceId = writeEntry.getWriteNumber();
- for (Mutation m : mutations) {
- // Handle any tag based cell features.
- // TODO: Do we need to call rewriteCellTags down in applyToMemstore()? Why not before
- // so tags go into WAL?
- rewriteCellTags(m.getFamilyCellMap(), m);
- for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
- Cell cell = cellScanner.current();
- if (walEdit.isEmpty()) {
- // If walEdit is empty, we put nothing in WAL. WAL stamps Cells with sequence id.
- // If no WAL, need to stamp it here.
- CellUtil.setSequenceId(cell, sequenceId);
- }
- applyToMemstore(getHStore(cell), cell, memstoreSize);
+ // STEP 6. Append and sync if walEdit has data to write out.
+ if (!walEdit.isEmpty()) {
+ writeEntry = doWALAppend(walEdit, getEffectiveDurability(processor.useDurability()),
+ processor.getClusterIds(), now, nonceGroup, nonce);
+ } else {
+ // We are here if WAL is being skipped.
+ writeEntry = this.mvcc.begin();
+ }
+
+ // STEP 7. Apply to memstore
+ long sequenceId = writeEntry.getWriteNumber();
+ for (Mutation m : mutations) {
+ // Handle any tag based cell features.
+ // TODO: Do we need to call rewriteCellTags down in applyToMemstore()? Why not before
+ // so tags go into WAL?
+ rewriteCellTags(m.getFamilyCellMap(), m);
+ for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
+ Cell cell = cellScanner.current();
+ if (walEdit.isEmpty()) {
+ // If walEdit is empty, we put nothing in WAL. WAL stamps Cells with sequence id.
+ // If no WAL, need to stamp it here.
+ CellUtil.setSequenceId(cell, sequenceId);
}
+ applyToMemstore(getHStore(cell), cell, memstoreSize);
}
- // STEP 8. Complete mvcc.
- mvcc.completeAndWait(writeEntry);
- writeEntry = null;
-
- // STEP 9. Release region lock
- if (locked) {
- this.updatesLock.readLock().unlock();
- locked = false;
- }
+ }
- // STEP 10. Release row lock(s)
- releaseRowLocks(acquiredRowLocks);
+ // STEP 8. call postBatchMutate hook
+ processor.postBatchMutate(this);
- // STEP 11. call postBatchMutate hook
- processor.postBatchMutate(this);
- }
- success = true;
- } finally {
- // Call complete rather than completeAndWait because we probably had error if walKey != null
- if (writeEntry != null) mvcc.complete(writeEntry);
- if (locked) {
- this.updatesLock.readLock().unlock();
- }
- // release locks if some were acquired but another timed out
- releaseRowLocks(acquiredRowLocks);
+ // STEP 9. Complete mvcc.
+ mvcc.completeAndWait(writeEntry);
+ writeEntry = null;
}
-
- // 12. Run post-process hook
- processor.postProcess(this, walEdit, success);
+ success = true;
} finally {
- closeRegionOperation();
- if (!mutations.isEmpty()) {
- long newSize = this.addAndGetMemstoreSize(memstoreSize);
- requestFlushIfNeeded(newSize);
+ // Call complete rather than completeAndWait because we probably had error if walKey != null
+ if (writeEntry != null) mvcc.complete(writeEntry);
+
+ // STEP 10. Release region lock
+ if (locked) {
+ this.updatesLock.readLock().unlock();
+ }
+
+ // STEP 11. Release row lock(s)
+ releaseRowLocks(acquiredRowLocks);
+
+ try {
+ // 12. Run post-process hook
+ processor.postProcess(this, walEdit, success);
+ if (!mutations.isEmpty()) {
+ long newSize = this.addAndGetMemstoreSize(memstoreSize);
+ requestFlushIfNeeded(newSize);
+ }
+ } finally {
+ closeRegionOperation();
}
}
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index da033c6..8b6b2e2 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -19,19 +19,33 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
-
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -44,6 +58,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
+import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -479,4 +494,101 @@ public class TestFromClientSide3 {
ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection();
assertTrue(con.hasCellBlockSupport());
}
+
+ @Test
+ public void testPutWithPreBatchMutate() throws Exception {
+ TableName tableName = TableName.valueOf("testPutWithPreBatchMutate");
+ testPreBatchMutate(tableName, () -> {
+ try {
+ Table t = TEST_UTIL.getConnection().getTable(tableName);
+ Put put = new Put(ROW);
+ put.addColumn(FAMILY, QUALIFIER, VALUE);
+ t.put(put);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ });
+ }
+
+ @Test
+ public void testRowMutationsWithPreBatchMutate() throws Exception {
+ TableName tableName = TableName.valueOf("testRowMutationsWithPreBatchMutate");
+ testPreBatchMutate(tableName, () -> {
+ try {
+ RowMutations rm = new RowMutations(ROW, 1);
+ Table t = TEST_UTIL.getConnection().getTable(tableName);
+ Put put = new Put(ROW);
+ put.addColumn(FAMILY, QUALIFIER, VALUE);
+ rm.add(put);
+ t.mutateRow(rm);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ });
+ }
+
+ private void testPreBatchMutate(TableName tableName, Runnable rn) throws Exception {
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.addCoprocessor(WatiingForScanObserver.class.getName());
+ desc.addFamily(new HColumnDescriptor(FAMILY));
+ TEST_UTIL.getAdmin().createTable(desc);
+ ExecutorService service = Executors.newFixedThreadPool(2);
+ service.execute(rn);
+ final List<Cell> cells = new ArrayList<>();
+ service.execute(() -> {
+ try {
+ // waiting for update.
+ TimeUnit.SECONDS.sleep(3);
+ Table t = TEST_UTIL.getConnection().getTable(tableName);
+ Scan scan = new Scan();
+ try (ResultScanner scanner = t.getScanner(scan)) {
+ for (Result r : scanner) {
+ cells.addAll(Arrays.asList(r.rawCells()));
+ }
+ }
+ } catch (IOException | InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ });
+ service.shutdown();
+ service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+ assertEquals("The write is blocked by RegionObserver#postBatchMutate,"
+ + " so the data is invisible to reader", 0, cells.size());
+ TEST_UTIL.deleteTable(tableName);
+ }
+
+ private static <T extends RegionObserver> T find(final TableName tableName,
+ Class<T> clz) throws IOException, InterruptedException {
+ HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
+ List<Region> regions = rs.getOnlineRegions(tableName);
+ assertEquals(1, regions.size());
+ Region region = regions.get(0);
+ Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
+ assertTrue("The cp instance should be " + clz.getName()
+ + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp));
+ return clz.cast(cp);
+ }
+
+ public static class WatiingForScanObserver extends BaseRegionObserver {
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ try {
+ // waiting for scanner
+ latch.await();
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
+ }
+ }
+
+ @Override
+ public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Scan scan, final RegionScanner s) throws IOException {
+ latch.countDown();
+ return s;
+ }
+ }
}
|