diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 42d5cdb..94ee36a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -618,6 +618,7 @@ public interface RegionObserver extends Coprocessor { * for each Mutation at the server. The batch may contain Put/Delete. By setting OperationStatus * of Mutations ({@link MiniBatchOperationInProgress#setOperationStatus(int, OperationStatus)}), * {@link RegionObserver} can make Region to skip these Mutations. + * Note: The durability from CP will be replaced by the durability of corresponding mutation. * @param c the environment provided by the region server * @param miniBatchOp batch of Mutations getting applied to region. * @throws IOException if an error occurred on the coprocessor diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 43845ef..ea901b2 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3227,10 +3227,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Acquire row locks. If not, the whole batch will fail. acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); - if (cpMutation.getDurability() == Durability.SKIP_WAL) { - recordMutationWithoutWal(cpFamilyMap); - } - // Returned mutations from coprocessor correspond to the Mutation at index i. We can // directly add the cells from those mutations to the familyMaps of this mutation. mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 6ba0351..9fda62c 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -90,6 +90,7 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; @@ -2398,6 +2399,77 @@ public class TestHRegion { } @Test + public void testDataInMemoryWithoutWAL() throws IOException { + FileSystem fs = FileSystem.get(CONF); + Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL"); + FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF); + HRegion region = initHRegion(tableName, null, null, name.getMethodName(), + CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES); + + Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1, + System.currentTimeMillis(), KeyValue.Type.Put.getCode(), value1); + final long originalSize = KeyValueUtil.keyLength(originalCell) + originalCell.getValueLength(); + + Cell addCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1, + System.currentTimeMillis(), KeyValue.Type.Put.getCode(), Bytes.toBytes("xxxxxxxxxx")); + final long addSize = KeyValueUtil.keyLength(addCell) + addCell.getValueLength(); + + LOG.info("originalSize:" + originalSize + + ", addSize:" + addSize); + // start test. We expect that the addPut's durability will be replaced + // by originalPut's durability. + + // case 1: + testDataInMemoryWithoutWAL(region, + new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL), + new Put(row).add(addCell).setDurability(Durability.SKIP_WAL), + originalSize + addSize); + + // case 2: + testDataInMemoryWithoutWAL(region, + new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL), + new Put(row).add(addCell).setDurability(Durability.SYNC_WAL), + originalSize + addSize); + + // case 3: + testDataInMemoryWithoutWAL(region, + new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL), + new Put(row).add(addCell).setDurability(Durability.SKIP_WAL), + 0); + + // case 4: + testDataInMemoryWithoutWAL(region, + new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL), + new Put(row).add(addCell).setDurability(Durability.SYNC_WAL), + 0); + } + + private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut, + final Put addPut, long delta) throws IOException { + final long initSize = region.getDataInMemoryWithoutWAL(); + // save normalCPHost and replaced by mockedCPHost + RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); + RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); + Answer answer = new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + MiniBatchOperationInProgress mb = invocation.getArgumentAt(0, + MiniBatchOperationInProgress.class); + mb.addOperationsFromCP(0, new Mutation[]{addPut}); + return false; + } + }; + when(mockedCPHost.preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class))) + .then(answer); + region.setCoprocessorHost(mockedCPHost); + region.put(originalPut); + region.setCoprocessorHost(normalCPHost); + final long finalSize = region.getDataInMemoryWithoutWAL(); + assertEquals("finalSize:" + finalSize + ", initSize:" + + initSize + ", delta:" + delta,finalSize, initSize + delta); + } + + @Test public void testDeleteColumns_PostInsert() throws IOException, InterruptedException { Delete delete = new Delete(row); delete.deleteColumns(fam1, qual1);