.../apache/hadoop/hbase/regionserver/HRegion.java | 36 +++++++++++++++------- .../hadoop/hbase/regionserver/RSRpcServices.java | 6 ++-- .../hadoop/hbase/regionserver/wal/WALEdit.java | 8 ++++- 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c93123c..8836609 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2854,17 +2854,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce) - throws IOException { + public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce) throws IOException { // As it stands, this is used for 3 things // * batchMutate with single mutation - put/delete, separate or from checkAndMutate. // * coprocessor calls (see ex. BulkDeleteEndpoint). // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... - return batchMutate(new MutationBatch(mutations, nonceGroup, nonce)); + return batchMutate(mutations, nonceGroup, nonce, 0); + } + + public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce, + int cellCount) throws IOException { + // As it stands, this is used for 3 things + // * batchMutate with single mutation - put/delete, separate or from checkAndMutate. + // * coprocessor calls (see ex. BulkDeleteEndpoint). + // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... + return batchMutate(new MutationBatch(mutations, nonceGroup, nonce), cellCount); } public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { - return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); + return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE, 0); } @Override @@ -2889,18 +2897,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } return statuses; } - return batchMutate(new ReplayBatch(mutations, replaySeqId)); + return batchMutate(new ReplayBatch(mutations, replaySeqId), 0); } /** * Perform a batch of mutations. * It supports only Put and Delete mutations and will ignore other types passed. * @param batchOp contains the list of mutations + * @param cellCount the cell count associated with the mutations * @return an array of OperationStatus which internally contains the * OperationStatusCode and the exceptionMessage if any. * @throws IOException */ - OperationStatus[] batchMutate(BatchOperationInProgress batchOp) throws IOException { + OperationStatus[] batchMutate(BatchOperationInProgress batchOp, int cellCount) + throws IOException { boolean initialized = false; Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE; startRegionOperation(op); @@ -2914,11 +2924,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!initialized) { this.writeRequestsCount.add(batchOp.operations.length); if (!batchOp.isInReplay()) { - doPreMutationHook(batchOp); + cellCount += doPreMutationHook(batchOp); } initialized = true; } - long addedSize = doMiniBatchMutation(batchOp); + long addedSize = doMiniBatchMutation(batchOp, cellCount); long newSize = this.addAndGetGlobalMemstoreSize(addedSize); if (isFlushSize(newSize)) { requestFlush(); @@ -2931,10 +2941,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } - private void doPreMutationHook(BatchOperationInProgress batchOp) + private int doPreMutationHook(BatchOperationInProgress batchOp) throws IOException { /* Run coprocessor pre hook outside of locks to avoid deadlock */ WALEdit walEdit = new WALEdit(); + int cellCount = 0; if (coprocessorHost != null) { for (int i = 0 ; i < batchOp.operations.length; i++) { Mutation m = batchOp.getMutation(i); @@ -2964,14 +2975,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } if (!walEdit.isEmpty()) { batchOp.walEditsFromCoprocessors[i] = walEdit; + cellCount += walEdit.size(); walEdit = new WALEdit(); } } } + return cellCount; } @SuppressWarnings("unchecked") - private long doMiniBatchMutation(BatchOperationInProgress batchOp) throws IOException { + private long doMiniBatchMutation(BatchOperationInProgress batchOp, int cellCount) + throws IOException { boolean isInReplay = batchOp.isInReplay(); // variable to note if all Put items are for the same CF -- metrics related boolean putsCfSetConsistent = true; @@ -2983,7 +2997,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Set deletesCfSet = null; long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE; - WALEdit walEdit = new WALEdit(isInReplay); + WALEdit walEdit = new WALEdit(cellCount); MultiVersionConcurrencyControl.WriteEntry writeEntry = null; long txid = 0; boolean doRollBackMemstore = false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 7eaadc2..75a7efa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -810,8 +810,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean batchContainsPuts = false, batchContainsDelete = false; try { int i = 0; + int cellCount = 0; for (ClientProtos.Action action: mutations) { MutationProto m = action.getMutation(); + cellCount += m.hasAssociatedCellCount()? m.getAssociatedCellCount(): 0; Mutation mutation; if (m.getMutateType() == MutationType.PUT) { mutation = ProtobufUtil.toPut(m, cells); @@ -828,8 +830,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, regionServer.cacheFlusher.reclaimMemStoreMemory(); } - OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE, - HConstants.NO_NONCE); + OperationStatus[] codes = ((HRegion) region).batchMutate(mArray, HConstants.NO_NONCE, + HConstants.NO_NONCE, cellCount); for (i = 0; i < codes.length; i++) { int index = mutations.get(i).getIndex(); Exception e = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index cea2ee7..1a87447 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -99,7 +99,7 @@ public class WALEdit implements Writable, HeapSize { private final int VERSION_2 = -1; private final boolean isReplay; - private ArrayList cells = new ArrayList(1); + private ArrayList cells = null; public static final WALEdit EMPTY_WALEDIT = new WALEdit(); @@ -118,6 +118,12 @@ public class WALEdit implements Writable, HeapSize { public WALEdit(boolean isReplay) { this.isReplay = isReplay; + cells = new ArrayList(1); + } + + public WALEdit(int cellCount) { + this.isReplay = false; + cells = new ArrayList(cellCount); } /**