Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1337133) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1338352) @@ -430,7 +430,7 @@ putsAndLocks.add(pair); } - codes = region.put(putsAndLocks.toArray(new Pair[0])); + codes = region.batchMutateWithLocks(putsAndLocks.toArray(new Pair[0]), "multiput_"); LOG.info("...performed put"); for (int i = 0; i < 10; i++) { assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE : Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1337133) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1338352) @@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HMsg.Type; import org.apache.hadoop.hbase.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -764,6 +765,7 @@ * Add to the passed msgs messages to pass to the master. * @param msgs Current outboundMsgs array; we'll add messages to this List. */ + // Amit: Warning n^2 loop. Can bring it down to O(n) using a hash map. private void addOutboundMsgs(final List msgs) { if (msgs.isEmpty()) { this.outboundMsgs.drainTo(msgs); @@ -2107,6 +2109,13 @@ @Override public int put(final byte[] regionName, final List puts) throws IOException { + return applyMutations(regionName, puts, "multiput_"); + } + + private int applyMutations(final byte[] regionName, + final List mutations, + String methodName) + throws IOException { checkOpen(); HRegion region = null; try { @@ -2116,16 +2125,17 @@ } @SuppressWarnings("unchecked") - Pair[] putsWithLocks = new Pair[puts.size()]; + Pair[] opWithLocks = new Pair[mutations.size()]; int i = 0; - for (Put p : puts) { + for (Mutation p : mutations) { Integer lock = getLockFromId(p.getLockId()); - putsWithLocks[i++] = new Pair(p, lock); + opWithLocks[i++] = new Pair(p, lock); } - this.requestCount.addAndGet(puts.size()); - OperationStatusCode[] codes = region.put(putsWithLocks); + this.requestCount.addAndGet(mutations.size()); + OperationStatusCode[] codes = region.batchMutateWithLocks(opWithLocks, + methodName); for (i = 0; i < codes.length; i++) { if (codes[i] != OperationStatusCode.SUCCESS) return i; @@ -2383,33 +2393,7 @@ @Override public int delete(final byte[] regionName, final List deletes) throws IOException { - // Count of Deletes processed. - int i = 0; - checkOpen(); - HRegion region = null; - try { - boolean writeToWAL = true; - region = getRegion(regionName); - if (!region.getRegionInfo().isMetaTable()) { - this.cacheFlusher.reclaimMemStoreMemory(); - } - int size = deletes.size(); - Integer[] locks = new Integer[size]; - for (Delete delete: deletes) { - this.requestCount.incrementAndGet(); - locks[i] = getLockFromId(delete.getLockId()); - region.delete(delete, locks[i], writeToWAL); - i++; - } - } catch (WrongRegionException ex) { - LOG.debug("Batch deletes: " + i, ex); - return i; - } catch (NotServingRegionException ex) { - return i; - } catch (Throwable t) { - throw convertThrowableToIOE(cleanup(t)); - } - return -1; + return applyMutations(regionName, deletes, "multidelete_"); } @Override Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1337133) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1338352) @@ -1649,7 +1649,7 @@ /* * @param delete The passed delete is modified by this method. WARNING! */ - private void prepareDelete(Delete delete) throws IOException { + private void prepareDeleteFamilyMap(Delete delete) throws IOException { // Check to see if this is a deleteRow insert if(delete.getFamilyMap().isEmpty()){ for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){ @@ -1707,7 +1707,7 @@ lid = getLock(lockid, row, true); // All edits for the given row (across all column families) must happen atomically. - prepareDelete(delete); + prepareDeleteFamilyMap(delete); delete(delete.getFamilyMap(), writeToWAL); } finally { @@ -1919,23 +1919,25 @@ */ public OperationStatusCode[] put(Put[] puts) throws IOException { @SuppressWarnings("unchecked") - Pair putsAndLocks[] = new Pair[puts.length]; + Pair putsAndLocks[] = new Pair[puts.length]; for (int i = 0; i < puts.length; i++) { - putsAndLocks[i] = new Pair(puts[i], null); + putsAndLocks[i] = new Pair(puts[i], null); } - return put(putsAndLocks); + return batchMutateWithLocks(putsAndLocks, "multiput_"); } /** * Perform a batch of puts. * @param putsAndLocks the list of puts paired with their requested lock IDs. + * @param methodName "multiput_/multidelete_" to update metrics correctly. * @throws IOException */ - public OperationStatusCode[] put(Pair[] putsAndLocks) throws IOException { + public OperationStatusCode[] batchMutateWithLocks(Pair[] putsAndLocks, + String methodName) throws IOException { this.writeRequests.incrTotalRequstCount(); - BatchOperationInProgress> batchOp = - new BatchOperationInProgress>(putsAndLocks); + BatchOperationInProgress> batchOp = + new BatchOperationInProgress>(putsAndLocks); while (!batchOp.isDone()) { checkReadOnly(); @@ -1944,7 +1946,7 @@ long newSize; splitsAndClosesLock.readLock().lock(); try { - long addedSize = doMiniBatchPut(batchOp); + long addedSize = doMiniBatchOp(batchOp, methodName); newSize = this.incMemoryUsage(addedSize); } finally { splitsAndClosesLock.readLock().unlock(); @@ -1957,7 +1959,8 @@ return batchOp.retCodes; } - private long doMiniBatchPut(BatchOperationInProgress> batchOp) throws IOException { + private long doMiniBatchOp(BatchOperationInProgress> batchOp, + String methodNameForMetricsUpdate) throws IOException { String signature = null; // variable to note if all Put items are for the same CF -- metrics related boolean isSignatureClear = true; @@ -1979,25 +1982,27 @@ // ---------------------------------- int numReadyToWrite = 0; while (lastIndexExclusive < batchOp.operations.length) { - Pair nextPair = batchOp.operations[lastIndexExclusive]; - Put put = nextPair.getFirst(); + Pair nextPair = batchOp.operations[lastIndexExclusive]; + Mutation op = nextPair.getFirst(); Integer providedLockId = nextPair.getSecond(); // Check the families in the put. If bad, skip this one. - try { - checkFamilies(put.getFamilyMap().keySet()); - checkTimestamps(put, now); - } catch (DoNotRetryIOException dnrioe) { - LOG.warn("Sanity check error in batch put", dnrioe); - batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.SANITY_CHECK_FAILURE; - lastIndexExclusive++; - continue; + if (op instanceof Put) { + try { + checkFamilies(op.getFamilyMap().keySet()); + checkTimestamps(op, now); + } catch (DoNotRetryIOException dnrioe) { + LOG.warn("Sanity check error in batch processing", dnrioe); + batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.SANITY_CHECK_FAILURE; + lastIndexExclusive++; + continue; + } } // If we haven't got any rows in our batch, we should block to // get the next one. boolean shouldBlock = numReadyToWrite == 0; - Integer acquiredLockId = getLock(providedLockId, put.getRow(), shouldBlock); + Integer acquiredLockId = getLock(providedLockId, op.getRow(), shouldBlock); if (acquiredLockId == null) { // We failed to grab another lock assert !shouldBlock : "Should never fail to get lock when blocking"; @@ -2014,13 +2019,13 @@ // all else, designate failure signature and mark as unclear if (null == signature) { signature = SchemaMetrics.generateSchemaMetricsPrefix( - this.getTableDesc().getNameAsString(), put.getFamilyMap() + this.getTableDesc().getNameAsString(), op.getFamilyMap() .keySet()); } else { if (isSignatureClear) { if (!signature.equals(SchemaMetrics.generateSchemaMetricsPrefix( this.getTableDesc().getNameAsString(), - put.getFamilyMap().keySet()))) { + op.getFamilyMap().keySet()))) { isSignatureClear = false; signature = SchemaMetrics.CF_UNKNOWN_PREFIX; } @@ -2030,7 +2035,6 @@ // We've now grabbed as many puts off the list as we can assert numReadyToWrite > 0; - this.updatesLock.readLock().lock(); locked = true; @@ -2038,9 +2042,17 @@ // STEP 2. Update any LATEST_TIMESTAMP timestamps // ---------------------------------- for (int i = firstIndex; i < lastIndexExclusive; i++) { - updateKVTimestamps( - batchOp.operations[i].getFirst().getFamilyMap().values(), - byteNow); + Mutation op = batchOp.operations[i].getFirst(); + + if (op instanceof Put) { + updateKVTimestamps( + op.getFamilyMap().values(), + byteNow); + } + else if (op instanceof Delete) { + prepareDeleteFamilyMap((Delete)op); + prepareDeleteTimestamps(op.getFamilyMap(), byteNow); + } } // ------------------------------------ @@ -2051,9 +2063,9 @@ // Skip puts that were determined to be invalid during preprocessing if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue; - Put p = batchOp.operations[i].getFirst(); - if (!p.getWriteToWAL()) continue; - addFamilyMapToWALEdit(p.getFamilyMap(), walEdit); + Mutation op = batchOp.operations[i].getFirst(); + if (!op.getWriteToWAL()) continue; + addFamilyMapToWALEdit(op.getFamilyMap(), walEdit); } // Append the edit to WAL @@ -2063,12 +2075,13 @@ // ------------------------------------ // STEP 4. Write back to memstore // ---------------------------------- + long addedSize = 0; for (int i = firstIndex; i < lastIndexExclusive; i++) { if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue; - Put p = batchOp.operations[i].getFirst(); - addedSize += applyFamilyMapToMemstore(p.getFamilyMap()); + Mutation op = batchOp.operations[i].getFirst(); + addedSize += applyFamilyMapToMemstore(op.getFamilyMap()); batchOp.retCodes[i] = OperationStatusCode.SUCCESS; } success = true; @@ -2086,7 +2099,7 @@ if (null == signature) { signature = SchemaMetrics.CF_BAD_FAMILY_PREFIX; } - HRegion.incrTimeVaryingMetric(signature + "multiput_", after - now); + HRegion.incrTimeVaryingMetric(signature + methodNameForMetricsUpdate, after - now); if (!success) { for (int i = firstIndex; i < lastIndexExclusive; i++) { @@ -2154,7 +2167,7 @@ put(((Put)w).getFamilyMap(), writeToWAL); } else { Delete d = (Delete)w; - prepareDelete(d); + prepareDeleteFamilyMap(d); delete(d.getFamilyMap(), writeToWAL); } return true; @@ -2372,8 +2385,8 @@ checkFamily(family); } } - private void checkTimestamps(Put p, long now) throws DoNotRetryIOException { - checkTimestamps(p.getFamilyMap(), now); + private void checkTimestamps(Mutation op, long now) throws DoNotRetryIOException { + checkTimestamps(op.getFamilyMap(), now); } private void checkTimestamps(final Map> familyMap, @@ -3713,7 +3726,7 @@ updateKVTimestamps(familyMap.values(), byteNow); } else if (m instanceof Delete) { Delete d = (Delete) m; - prepareDelete(d); + prepareDeleteFamilyMap(d); Map> familyMap = m.getFamilyMap(); prepareDeleteTimestamps(familyMap, byteNow); } else {