diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index aaa3b6a..5dfdc8e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -630,7 +630,9 @@ public interface RegionObserver extends Coprocessor { /** * This will be called after applying a batch of Mutations on a region. The Mutations are added to - * memstore and WAL. + * memstore and WAL. The difference between this hook and {@link #postPut(ObserverContext, Put, WALEdit, Durability) } + * and {@link #postDelete(ObserverContext, Delete, WALEdit, Durability) } is + * that this hook will be executed before the mvcc transaction completes. *

* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation. * If need a Cell reference for later use, copy the cell and use that. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 7a9d4e2..b20afd2 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3332,6 +3332,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi applyFamilyMapToMemstore(familyMaps[i], memstoreSize); } + // calling the post CP hook for batch mutation + if (!replay && coprocessorHost != null) { + MiniBatchOperationInProgress miniBatchOp = + new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), + batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); + coprocessorHost.postBatchMutate(miniBatchOp); + } + // STEP 6. Complete mvcc. 
if (replay) { this.mvcc.advanceTo(batchOp.getReplaySequenceId()); @@ -3348,14 +3356,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } releaseRowLocks(acquiredRowLocks); - // calling the post CP hook for batch mutation - if (!replay && coprocessorHost != null) { - MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), - batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); - coprocessorHost.postBatchMutate(miniBatchOp); - } - for (int i = firstIndex; i < lastIndexExclusive; i ++) { if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; @@ -3419,7 +3419,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // call the coprocessor hook to do any finalization steps // after the put is done MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), + new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success); } @@ -7055,7 +7055,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return; } - boolean locked; + boolean locked = false; + boolean success = false; List acquiredRowLocks; List mutations = new ArrayList(); Collection rowsToLock = processor.getRowsToLock(); @@ -7063,86 +7064,80 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // when it assigns the edit a sequencedid (A.K.A the mvcc write number). WriteEntry writeEntry = null; MemstoreSize memstoreSize = new MemstoreSize(); + // STEP 2. 
Acquire the row lock(s) + acquiredRowLocks = new ArrayList(rowsToLock.size()); + for (byte[] row : rowsToLock) { + // Attempt to lock all involved rows, throw if any lock times out + // use a writer lock for mixed reads and writes + acquiredRowLocks.add(getRowLockInternal(row, false)); + } + // STEP 3. Region lock + lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size()); + locked = true; + long now = EnvironmentEdgeManager.currentTime(); try { - // STEP 2. Acquire the row lock(s) - acquiredRowLocks = new ArrayList(rowsToLock.size()); - for (byte[] row : rowsToLock) { - // Attempt to lock all involved rows, throw if any lock times out - // use a writer lock for mixed reads and writes - acquiredRowLocks.add(getRowLockInternal(row, false)); - } - // STEP 3. Region lock - lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size()); - locked = true; - boolean success = false; - long now = EnvironmentEdgeManager.currentTime(); - try { - // STEP 4. Let the processor scan the rows, generate mutations and add waledits - doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout); - if (!mutations.isEmpty()) { - // STEP 5. Call the preBatchMutate hook - processor.preBatchMutate(this, walEdit); - - // STEP 6. Append and sync if walEdit has data to write out. - if (!walEdit.isEmpty()) { - writeEntry = doWALAppend(walEdit, getEffectiveDurability(processor.useDurability()), - processor.getClusterIds(), now, nonceGroup, nonce); - } else { - // We are here if WAL is being skipped. - writeEntry = this.mvcc.begin(); - } + // STEP 4. Let the processor scan the rows, generate mutations and add waledits + doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout); + if (!mutations.isEmpty()) { + // STEP 5. Call the preBatchMutate hook + processor.preBatchMutate(this, walEdit); - // STEP 7. 
Apply to memstore - long sequenceId = writeEntry.getWriteNumber(); - for (Mutation m : mutations) { - // Handle any tag based cell features. - // TODO: Do we need to call rewriteCellTags down in applyToMemstore()? Why not before - // so tags go into WAL? - rewriteCellTags(m.getFamilyCellMap(), m); - for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { - Cell cell = cellScanner.current(); - if (walEdit.isEmpty()) { - // If walEdit is empty, we put nothing in WAL. WAL stamps Cells with sequence id. - // If no WAL, need to stamp it here. - CellUtil.setSequenceId(cell, sequenceId); - } - applyToMemstore(getHStore(cell), cell, memstoreSize); + // STEP 6. Append and sync if walEdit has data to write out. + if (!walEdit.isEmpty()) { + writeEntry = doWALAppend(walEdit, getEffectiveDurability(processor.useDurability()), + processor.getClusterIds(), now, nonceGroup, nonce); + } else { + // We are here if WAL is being skipped. + writeEntry = this.mvcc.begin(); + } + + // STEP 7. Apply to memstore + long sequenceId = writeEntry.getWriteNumber(); + for (Mutation m : mutations) { + // Handle any tag based cell features. + // TODO: Do we need to call rewriteCellTags down in applyToMemstore()? Why not before + // so tags go into WAL? + rewriteCellTags(m.getFamilyCellMap(), m); + for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { + Cell cell = cellScanner.current(); + if (walEdit.isEmpty()) { + // If walEdit is empty, we put nothing in WAL. WAL stamps Cells with sequence id. + // If no WAL, need to stamp it here. + CellUtil.setSequenceId(cell, sequenceId); } + applyToMemstore(getHStore(cell), cell, memstoreSize); } - // STEP 8. Complete mvcc. - mvcc.completeAndWait(writeEntry); - writeEntry = null; - - // STEP 9. Release region lock - if (locked) { - this.updatesLock.readLock().unlock(); - locked = false; - } + } - // STEP 10. Release row lock(s) - releaseRowLocks(acquiredRowLocks); + // STEP 8. 
call postBatchMutate hook + processor.postBatchMutate(this); - // STEP 11. call postBatchMutate hook - processor.postBatchMutate(this); - } - success = true; - } finally { - // Call complete rather than completeAndWait because we probably had error if walKey != null - if (writeEntry != null) mvcc.complete(writeEntry); - if (locked) { - this.updatesLock.readLock().unlock(); - } - // release locks if some were acquired but another timed out - releaseRowLocks(acquiredRowLocks); + // STEP 9. Complete mvcc. + mvcc.completeAndWait(writeEntry); + writeEntry = null; } - - // 12. Run post-process hook - processor.postProcess(this, walEdit, success); + success = true; } finally { - closeRegionOperation(); - if (!mutations.isEmpty()) { - long newSize = this.addAndGetMemstoreSize(memstoreSize); - requestFlushIfNeeded(newSize); + // Call complete rather than completeAndWait because we probably had error if walKey != null + if (writeEntry != null) mvcc.complete(writeEntry); + + // STEP 10. Release region lock + if (locked) { + this.updatesLock.readLock().unlock(); + } + + // STEP 11. Release row lock(s) + releaseRowLocks(acquiredRowLocks); + + try { + // 12. 
Run post-process hook + processor.postProcess(this, walEdit, success); + if (!mutations.isEmpty()) { + long newSize = this.addAndGetMemstoreSize(memstoreSize); + requestFlushIfNeeded(newSize); + } + } finally { + closeRegionOperation(); } } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index da033c6..8b6b2e2 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -19,19 +19,33 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; - +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import 
org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionLocation; @@ -44,6 +58,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.After; import org.junit.AfterClass; +import static org.junit.Assert.assertEquals; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -479,4 +494,101 @@ public class TestFromClientSide3 { ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection(); assertTrue(con.hasCellBlockSupport()); } + + @Test + public void testPutWithPreBatchMutate ()throws Exception { + TableName tableName = TableName.valueOf("testPutWithPreBatchMutate"); + testPreBatchMutate(tableName, () -> { + try { + Table t = TEST_UTIL.getConnection().getTable(tableName); + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, VALUE); + t.put(put); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + } + + @Test + public void testRowMutationsWithPreBatchMutate ()throws Exception { + TableName tableName = TableName.valueOf("testRowMutationsWithPreBatchMutate"); + testPreBatchMutate(tableName, () -> { + try { + RowMutations rm = new RowMutations(ROW, 1); + Table t = TEST_UTIL.getConnection().getTable(tableName); + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, VALUE); + rm.add(put); + t.mutateRow(rm); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + } + + private void testPreBatchMutate (TableName tableName, Runnable rn)throws Exception { + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addCoprocessor(WatiingForScanObserver.class.getName()); + desc.addFamily(new HColumnDescriptor(FAMILY)); + TEST_UTIL.getAdmin().createTable(desc); + ExecutorService service = Executors.newFixedThreadPool(2); + service.execute(rn); + 
final List cells = new ArrayList<>(); + service.execute(() -> { + try { + // waiting for update. + TimeUnit.SECONDS.sleep(3); + Table t = TEST_UTIL.getConnection().getTable(tableName); + Scan scan = new Scan(); + try (ResultScanner scanner = t.getScanner(scan)) { + for (Result r : scanner) { + cells.addAll(Arrays.asList(r.rawCells())); + } + } + } catch (IOException | InterruptedException ex) { + throw new RuntimeException(ex); + } + }); + service.shutdown(); + service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + assertEquals("The write is blocking by RegionObserver#postBatchMutate," + + ", so the data is invisible to reader", 0, cells.size()); + TEST_UTIL.deleteTable(tableName); + } + + private static T find(final TableName tableName, + Class clz) throws IOException, InterruptedException { + HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName); + List regions = rs.getOnlineRegions(tableName); + assertEquals(1, regions.size()); + Region region = regions.get(0); + Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName()); + assertTrue("The cp instance should be " + clz.getName() + + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp)); + return clz.cast(cp); + } + + public static class WatiingForScanObserver extends BaseRegionObserver { + + private final CountDownLatch latch = new CountDownLatch(1); + + @Override + public void postBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress miniBatchOp) throws IOException { + try { + // waiting for scanner + latch.await(); + } catch (InterruptedException ex) { + throw new IOException(ex); + } + } + + @Override + public RegionScanner postScannerOpen(final ObserverContext e, + final Scan scan, final RegionScanner s) throws IOException { + latch.countDown(); + return s; + } + } }