.../apache/hadoop/hbase/regionserver/HRegion.java | 27 +++++++++++++++++----- .../hadoop/hbase/regionserver/wal/WALEdit.java | 11 ++++++++- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f03c205..499dec9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2759,6 +2759,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int nextIndexToProcess = 0; OperationStatus[] retCodeDetails; WALEdit[] walEditsFromCoprocessors; + private int cellCountFromCP; public BatchOperationInProgress(T[] operations) { this.operations = operations; @@ -2775,6 +2776,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public abstract boolean isInReplay(); public abstract long getReplaySequenceId(); + public void addtoCellCountFromCP(int count) { + this.cellCountFromCP += count; + } + + public int getCellCountFromCP() { + return this.cellCountFromCP; + } + public boolean isDone() { return nextIndexToProcess == operations.length; } @@ -2906,17 +2915,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * OperationStatusCode and the exceptionMessage if any. * @throws IOException */ - OperationStatus[] batchMutate(BatchOperationInProgress batchOp) throws IOException { + OperationStatus[] batchMutate(BatchOperationInProgress batchOp) + throws IOException { boolean initialized = false; Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE; startRegionOperation(op); + int cellCountFromCP = 0; try { while (!batchOp.isDone()) { if (!batchOp.isInReplay()) { checkReadOnly(); } checkResources(); - if (!initialized) { this.writeRequestsCount.add(batchOp.operations.length); if (!batchOp.isInReplay()) { @@ -2970,6 +2980,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } if (!walEdit.isEmpty()) { batchOp.walEditsFromCoprocessors[i] = walEdit; + batchOp.addtoCellCountFromCP(walEdit.size()); walEdit = new WALEdit(); } } @@ -2977,7 +2988,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @SuppressWarnings("unchecked") - private long doMiniBatchMutation(BatchOperationInProgress batchOp) throws IOException { + private long doMiniBatchMutation(BatchOperationInProgress batchOp) + throws IOException { boolean isInReplay = batchOp.isInReplay(); // variable to note if all Put items are for the same CF -- metrics related boolean putsCfSetConsistent = true; @@ -2989,7 +3001,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Set deletesCfSet = null; long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE; - WALEdit walEdit = new WALEdit(isInReplay); + WALEdit walEdit = null; MultiVersionConcurrencyControl.WriteEntry writeEntry = null; long txid = 0; boolean doRollBackMemstore = false; @@ -3020,7 +3032,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Map> familyMap = mutation.getFamilyCellMap(); // store the family map reference to allow for mutations familyMaps[lastIndexExclusive] = familyMap; - // skip anything that "ran" already if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { @@ -3113,6 +3124,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // ------------------------------------ // STEP 2. Update any LATEST_TIMESTAMP timestamps // ---------------------------------- + int cellCount = batchOp.getCellCountFromCP(); for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) { // skip invalid if (batchOp.retCodeDetails[i].getOperationStatusCode() @@ -3127,8 +3139,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi noOfDeletes++; } rewriteCellTags(familyMaps[i], mutation); + for (List cells : familyMaps[i].values()) { + cellCount += cells.size(); + } } - + walEdit = new WALEdit(cellCount); lock(this.updatesLock.readLock(), numReadyToWrite); locked = true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index cea2ee7..346a8ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -99,7 +99,7 @@ public class WALEdit implements Writable, HeapSize { private final int VERSION_2 = -1; private final boolean isReplay; - private ArrayList cells = new ArrayList(1); + private ArrayList cells = null; public static final WALEdit EMPTY_WALEDIT = new WALEdit(); @@ -117,7 +117,16 @@ public class WALEdit implements Writable, HeapSize { } public WALEdit(boolean isReplay) { + this(1, isReplay); + } + + public WALEdit(int cellCount) { + this(cellCount, false); + } + + public WALEdit(int cellCount, boolean isReplay) { this.isReplay = isReplay; + cells = new ArrayList(cellCount); } /**