Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1337133)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1338352)
@@ -430,7 +430,7 @@
putsAndLocks.add(pair);
}
- codes = region.put(putsAndLocks.toArray(new Pair[0]));
+ codes = region.batchMutateWithLocks(putsAndLocks.toArray(new Pair[0]), "multiput_");
LOG.info("...performed put");
for (int i = 0; i < 10; i++) {
assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE :
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1337133)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1338352)
@@ -91,6 +91,7 @@
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HMsg.Type;
import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -764,6 +765,7 @@
* Add to the passed msgs messages to pass to the master.
* @param msgs Current outboundMsgs array; we'll add messages to this List.
*/
+ // Amit: Warning n^2 loop. Can bring it down to O(n) using a hash map.
private void addOutboundMsgs(final List<HMsg> msgs) {
if (msgs.isEmpty()) {
this.outboundMsgs.drainTo(msgs);
@@ -2107,6 +2109,13 @@
@Override
public int put(final byte[] regionName, final List<Put> puts)
throws IOException {
+ return applyMutations(regionName, puts, "multiput_");
+ }
+
+ private int applyMutations(final byte[] regionName,
+ final List<? extends Mutation> mutations,
+ String methodName)
+ throws IOException {
checkOpen();
HRegion region = null;
try {
@@ -2116,16 +2125,17 @@
}
@SuppressWarnings("unchecked")
- Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
+ Pair<Mutation, Integer>[] opWithLocks = new Pair[mutations.size()];
int i = 0;
- for (Put p : puts) {
+ for (Mutation p : mutations) {
Integer lock = getLockFromId(p.getLockId());
- putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
+ opWithLocks[i++] = new Pair<Mutation, Integer>(p, lock);
}
- this.requestCount.addAndGet(puts.size());
- OperationStatusCode[] codes = region.put(putsWithLocks);
+ this.requestCount.addAndGet(mutations.size());
+ OperationStatusCode[] codes = region.batchMutateWithLocks(opWithLocks,
+ methodName);
for (i = 0; i < codes.length; i++) {
if (codes[i] != OperationStatusCode.SUCCESS)
return i;
@@ -2383,33 +2393,7 @@
@Override
public int delete(final byte[] regionName, final List<Delete> deletes)
throws IOException {
- // Count of Deletes processed.
- int i = 0;
- checkOpen();
- HRegion region = null;
- try {
- boolean writeToWAL = true;
- region = getRegion(regionName);
- if (!region.getRegionInfo().isMetaTable()) {
- this.cacheFlusher.reclaimMemStoreMemory();
- }
- int size = deletes.size();
- Integer[] locks = new Integer[size];
- for (Delete delete: deletes) {
- this.requestCount.incrementAndGet();
- locks[i] = getLockFromId(delete.getLockId());
- region.delete(delete, locks[i], writeToWAL);
- i++;
- }
- } catch (WrongRegionException ex) {
- LOG.debug("Batch deletes: " + i, ex);
- return i;
- } catch (NotServingRegionException ex) {
- return i;
- } catch (Throwable t) {
- throw convertThrowableToIOE(cleanup(t));
- }
- return -1;
+ return applyMutations(regionName, deletes, "multidelete_");
}
@Override
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1337133)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1338352)
@@ -1649,7 +1649,7 @@
/*
* @param delete The passed delete is modified by this method. WARNING!
*/
- private void prepareDelete(Delete delete) throws IOException {
+ private void prepareDeleteFamilyMap(Delete delete) throws IOException {
// Check to see if this is a deleteRow insert
if(delete.getFamilyMap().isEmpty()){
for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
@@ -1707,7 +1707,7 @@
lid = getLock(lockid, row, true);
// All edits for the given row (across all column families) must happen atomically.
- prepareDelete(delete);
+ prepareDeleteFamilyMap(delete);
delete(delete.getFamilyMap(), writeToWAL);
} finally {
@@ -1919,23 +1919,25 @@
*/
public OperationStatusCode[] put(Put[] puts) throws IOException {
@SuppressWarnings("unchecked")
- Pair<Put, Integer> putsAndLocks[] = new Pair[puts.length];
+ Pair<Mutation, Integer> putsAndLocks[] = new Pair[puts.length];
for (int i = 0; i < puts.length; i++) {
- putsAndLocks[i] = new Pair<Put, Integer>(puts[i], null);
+ putsAndLocks[i] = new Pair<Mutation, Integer>(puts[i], null);
}
- return put(putsAndLocks);
+ return batchMutateWithLocks(putsAndLocks, "multiput_");
}
/**
* Perform a batch of puts.
* @param putsAndLocks the list of puts paired with their requested lock IDs.
+ * @param methodName "multiput_/multidelete_" to update metrics correctly.
* @throws IOException
*/
- public OperationStatusCode[] put(Pair<Put, Integer>[] putsAndLocks) throws IOException {
+ public OperationStatusCode[] batchMutateWithLocks(Pair<Mutation, Integer>[] putsAndLocks,
+ String methodName) throws IOException {
this.writeRequests.incrTotalRequstCount();
- BatchOperationInProgress<Pair<Put, Integer>> batchOp =
- new BatchOperationInProgress<Pair<Put, Integer>>(putsAndLocks);
+ BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
+ new BatchOperationInProgress<Pair<Mutation, Integer>>(putsAndLocks);
while (!batchOp.isDone()) {
checkReadOnly();
@@ -1944,7 +1946,7 @@
long newSize;
splitsAndClosesLock.readLock().lock();
try {
- long addedSize = doMiniBatchPut(batchOp);
+ long addedSize = doMiniBatchOp(batchOp, methodName);
newSize = this.incMemoryUsage(addedSize);
} finally {
splitsAndClosesLock.readLock().unlock();
@@ -1957,7 +1959,8 @@
return batchOp.retCodes;
}
- private long doMiniBatchPut(BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
+ private long doMiniBatchOp(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
+ String methodNameForMetricsUpdate) throws IOException {
String signature = null;
// variable to note if all Put items are for the same CF -- metrics related
boolean isSignatureClear = true;
@@ -1979,25 +1982,27 @@
// ----------------------------------
int numReadyToWrite = 0;
while (lastIndexExclusive < batchOp.operations.length) {
- Pair<Put, Integer> nextPair = batchOp.operations[lastIndexExclusive];
- Put put = nextPair.getFirst();
+ Pair<Mutation, Integer> nextPair = batchOp.operations[lastIndexExclusive];
+ Mutation op = nextPair.getFirst();
Integer providedLockId = nextPair.getSecond();
// Check the families in the put. If bad, skip this one.
- try {
- checkFamilies(put.getFamilyMap().keySet());
- checkTimestamps(put, now);
- } catch (DoNotRetryIOException dnrioe) {
- LOG.warn("Sanity check error in batch put", dnrioe);
- batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.SANITY_CHECK_FAILURE;
- lastIndexExclusive++;
- continue;
+ if (op instanceof Put) {
+ try {
+ checkFamilies(op.getFamilyMap().keySet());
+ checkTimestamps(op, now);
+ } catch (DoNotRetryIOException dnrioe) {
+ LOG.warn("Sanity check error in batch processing", dnrioe);
+ batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.SANITY_CHECK_FAILURE;
+ lastIndexExclusive++;
+ continue;
+ }
}
// If we haven't got any rows in our batch, we should block to
// get the next one.
boolean shouldBlock = numReadyToWrite == 0;
- Integer acquiredLockId = getLock(providedLockId, put.getRow(), shouldBlock);
+ Integer acquiredLockId = getLock(providedLockId, op.getRow(), shouldBlock);
if (acquiredLockId == null) {
// We failed to grab another lock
assert !shouldBlock : "Should never fail to get lock when blocking";
@@ -2014,13 +2019,13 @@
// all else, designate failure signature and mark as unclear
if (null == signature) {
signature = SchemaMetrics.generateSchemaMetricsPrefix(
- this.getTableDesc().getNameAsString(), put.getFamilyMap()
+ this.getTableDesc().getNameAsString(), op.getFamilyMap()
.keySet());
} else {
if (isSignatureClear) {
if (!signature.equals(SchemaMetrics.generateSchemaMetricsPrefix(
this.getTableDesc().getNameAsString(),
- put.getFamilyMap().keySet()))) {
+ op.getFamilyMap().keySet()))) {
isSignatureClear = false;
signature = SchemaMetrics.CF_UNKNOWN_PREFIX;
}
@@ -2030,7 +2035,6 @@
// We've now grabbed as many puts off the list as we can
assert numReadyToWrite > 0;
-
this.updatesLock.readLock().lock();
locked = true;
@@ -2038,9 +2042,17 @@
// STEP 2. Update any LATEST_TIMESTAMP timestamps
// ----------------------------------
for (int i = firstIndex; i < lastIndexExclusive; i++) {
- updateKVTimestamps(
- batchOp.operations[i].getFirst().getFamilyMap().values(),
- byteNow);
+ Mutation op = batchOp.operations[i].getFirst();
+
+ if (op instanceof Put) {
+ updateKVTimestamps(
+ op.getFamilyMap().values(),
+ byteNow);
+ }
+ else if (op instanceof Delete) {
+ prepareDeleteFamilyMap((Delete)op);
+ prepareDeleteTimestamps(op.getFamilyMap(), byteNow);
+ }
}
// ------------------------------------
@@ -2051,9 +2063,9 @@
// Skip puts that were determined to be invalid during preprocessing
if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
- Put p = batchOp.operations[i].getFirst();
- if (!p.getWriteToWAL()) continue;
- addFamilyMapToWALEdit(p.getFamilyMap(), walEdit);
+ Mutation op = batchOp.operations[i].getFirst();
+ if (!op.getWriteToWAL()) continue;
+ addFamilyMapToWALEdit(op.getFamilyMap(), walEdit);
}
// Append the edit to WAL
@@ -2063,12 +2075,13 @@
// ------------------------------------
// STEP 4. Write back to memstore
// ----------------------------------
+
long addedSize = 0;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
- Put p = batchOp.operations[i].getFirst();
- addedSize += applyFamilyMapToMemstore(p.getFamilyMap());
+ Mutation op = batchOp.operations[i].getFirst();
+ addedSize += applyFamilyMapToMemstore(op.getFamilyMap());
batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
}
success = true;
@@ -2086,7 +2099,7 @@
if (null == signature) {
signature = SchemaMetrics.CF_BAD_FAMILY_PREFIX;
}
- HRegion.incrTimeVaryingMetric(signature + "multiput_", after - now);
+ HRegion.incrTimeVaryingMetric(signature + methodNameForMetricsUpdate, after - now);
if (!success) {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
@@ -2154,7 +2167,7 @@
put(((Put)w).getFamilyMap(), writeToWAL);
} else {
Delete d = (Delete)w;
- prepareDelete(d);
+ prepareDeleteFamilyMap(d);
delete(d.getFamilyMap(), writeToWAL);
}
return true;
@@ -2372,8 +2385,8 @@
checkFamily(family);
}
}
- private void checkTimestamps(Put p, long now) throws DoNotRetryIOException {
- checkTimestamps(p.getFamilyMap(), now);
+ private void checkTimestamps(Mutation op, long now) throws DoNotRetryIOException {
+ checkTimestamps(op.getFamilyMap(), now);
}
private void checkTimestamps(final Map<byte[], List<KeyValue>> familyMap,
@@ -3713,7 +3726,7 @@
updateKVTimestamps(familyMap.values(), byteNow);
} else if (m instanceof Delete) {
Delete d = (Delete) m;
- prepareDelete(d);
+ prepareDeleteFamilyMap(d);
Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
prepareDeleteTimestamps(familyMap, byteNow);
} else {