Region memstore size accounting when a mini-batch mutation fails part-way.

Summary of the change carried by this patch:
  * HRegion.doMiniBatchMutate: `addedSize` is declared before the try block and
    passed to addAndGetGlobalMemstoreSize() in the finally clause, so whatever
    was accumulated is accounted for even when the method exits by exception.
    The caller no longer uses the return value; it reads getMemstoreSize()
    before calling requestFlushIfNeeded().
  * HRegion.addAndGetGlobalMemstoreSize: logs an error, with a stack trace of
    the caller, when the resulting region memstoreSize is negative.
  * Per-column-family flush log detail reports Store.getFlushableSize() instead
    of Store.getMemStoreSize().
  * TestHRegion: adds testMemstoreSizeAccountingWithFailedPostBatchMutate, which
    makes postBatchMutate throw and asserts the sizes were still incremented;
    adds a missing @Override on getNextSequenceId.

NOTE(review): this patch was received with all line breaks collapsed. Line
structure and blank context lines were rebuilt from the hunk line counts;
leading indentation was rebuilt assuming the 2-space convention. Verify against
the target tree or apply with a whitespace-tolerant option.
NOTE(review): the type arguments in `List<RowLock>` and
`Mockito.<MiniBatchOperationInProgress<Mutation>>any()` were missing from the
received text and have been restored from the surrounding usage -- confirm.

diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index e7a99a9..13efa25 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1107,7 +1107,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     if (this.rsAccounting != null) {
       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
     }
-    return this.memstoreSize.addAndGet(memStoreSize);
+    long size = this.memstoreSize.addAndGet(memStoreSize);
+    // This is extremely bad if we make memstoreSize negative. Log as much info on the offending
+    // caller as possible. (memStoreSize might be a negative value already -- freeing memory)
+    if (size < 0) {
+      LOG.error("Asked to modify this region's (" + this.toString()
+        + ") memstoreSize to a negative value which is incorrect. Current memstoreSize="
+        + (size-memStoreSize) + ", delta=" + memStoreSize, new Exception());
+    }
+    return size;
   }
 
   @Override
@@ -2349,7 +2357,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         perCfExtras = new StringBuilder();
         for (Store store: storesToFlush) {
           perCfExtras.append("; ").append(store.getColumnFamilyName());
-          perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize()));
+          perCfExtras.append("=").append(StringUtils.byteDesc(store.getFlushableSize()));
         }
       }
       LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
@@ -2925,8 +2933,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
           initialized = true;
         }
-        long addedSize = doMiniBatchMutate(batchOp);
-        long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
+        doMiniBatchMutate(batchOp);
+        long newSize = this.getMemstoreSize();
         requestFlushIfNeeded(newSize);
       }
     } finally {
@@ -3007,6 +3015,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     int cellCount = 0;
     /** Keep track of the locks we hold so we can release them in finally clause */
     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
+    long addedSize = 0;
     try {
       // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
       int numReadyToWrite = 0;
@@ -3188,7 +3197,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
 
       // STEP 5. Write back to memstore
-      long addedSize = 0;
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
         if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
           continue;
@@ -3249,6 +3257,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     } finally {
       // Call complete rather than completeAndWait because we probably had error if walKey != null
       if (writeEntry != null) mvcc.complete(writeEntry);
+      this.addAndGetGlobalMemstoreSize(addedSize);
       if (locked) {
         this.updatesLock.readLock().unlock();
       }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 8cc04f7..328b559 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -35,6 +35,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -413,6 +414,44 @@ public class TestHRegion {
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
 
+  @Test
+  public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
+    String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
+    FileSystem fs = FileSystem.get(CONF);
+    Path rootDir = new Path(dir + testName);
+    FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
+    HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
+        CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES);
+    Store store = region.getStore(COLUMN_FAMILY_BYTES);
+    assertEquals(0, region.getMemstoreSize());
+
+    // Put one value
+    byte [] value = Bytes.toBytes(name.getMethodName());
+    Put put = new Put(value);
+    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
+    region.put(put);
+    long onePutSize = region.getMemstoreSize();
+    assertTrue(onePutSize > 0);
+
+    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
+    doThrow(new IOException())
+       .when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any());
+    region.setCoprocessorHost(mockedCPHost);
+
+    put = new Put(value);
+    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value);
+    try {
+      region.put(put);
+      fail("Should have failed with IOException");
+    } catch (IOException expected) {
+    }
+    assertEquals("memstoreSize should be incremented", onePutSize * 2, region.getMemstoreSize());
+    assertEquals("flushable size should be incremented", onePutSize * 2, store.getFlushableSize());
+
+    region.setCoprocessorHost(null);
+    HBaseTestingUtility.closeRegionAndWAL(region);
+  }
+
   /**
    * Test we do not lose data if we fail a flush and then close.
    * Part of HBase-10466.  Tests the following from the issue description:
@@ -6045,6 +6084,7 @@ public class TestHRegion {
         final HTableDescriptor htd, final RegionServerServices rsServices) {
       super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
     }
+    @Override
     protected long getNextSequenceId(WAL wal) throws IOException {
       return 42;
     }