Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1354813) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1778,15 +1778,14 @@ } /** - * Setup a Delete object with correct timestamps. - * Caller should the row and region locks. - * @param delete + * Setup correct timestamps in the KVs in Delete object. + * Caller should have the row and region locks. + * @param familyMap * @param now * @throws IOException */ - void prepareDeleteTimestamps(Delete delete, byte[] byteNow) + void prepareDeleteTimestamps(Map> familyMap, byte[] byteNow) throws IOException { - Map> familyMap = delete.getFamilyMap(); for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); @@ -1855,7 +1854,7 @@ updatesLock.readLock().lock(); try { - prepareDeleteTimestamps(delete, byteNow); + prepareDeleteTimestamps(delete.getFamilyMap(), byteNow); if (writeToWAL) { // write/sync to WAL should happen before we touch memstore. @@ -1986,27 +1985,27 @@ */ public OperationStatus[] put(Put[] puts) throws IOException { @SuppressWarnings("unchecked") - Pair putsAndLocks[] = new Pair[puts.length]; + Pair putsAndLocks[] = new Pair[puts.length]; for (int i = 0; i < puts.length; i++) { - putsAndLocks[i] = new Pair(puts[i], null); + putsAndLocks[i] = new Pair(puts[i], null); } - return put(putsAndLocks); + return batchMutate(putsAndLocks); } /** - * Perform a batch of puts. + * Perform a batch of mutations. It can involve Puts and Deletes * * @param putsAndLocks - * the list of puts paired with their requested lock IDs. + * the list of mutations paired with their requested lock IDs. * @return an array of OperationStatus which internally contains the * OperationStatusCode and the exceptionMessage if any. * @throws IOException */ - public OperationStatus[] put( - Pair[] putsAndLocks) throws IOException { - BatchOperationInProgress> batchOp = - new BatchOperationInProgress>(putsAndLocks); + public OperationStatus[] batchMutate( + Pair[] mutationsAndLocks) throws IOException { + BatchOperationInProgress> batchOp = + new BatchOperationInProgress>(mutationsAndLocks); boolean initialized = false; @@ -2020,10 +2019,10 @@ try { if (!initialized) { this.writeRequestsCount.increment(); - doPrePutHook(batchOp); + doPreMutationHook(batchOp); initialized = true; } - long addedSize = doMiniBatchPut(batchOp); + long addedSize = doMiniBatchMutation(batchOp); newSize = this.addAndGetGlobalMemstoreSize(addedSize); } finally { closeRegionOperation(); @@ -2035,18 +2034,26 @@ return batchOp.retCodeDetails; } - private void doPrePutHook(BatchOperationInProgress> batchOp) + private void doPreMutationHook(BatchOperationInProgress> batchOp) throws IOException { /* Run coprocessor pre hook outside of locks to avoid deadlock */ WALEdit walEdit = new WALEdit(); if (coprocessorHost != null) { for (int i = 0 ; i < batchOp.operations.length; i++) { - Pair nextPair = batchOp.operations[i]; - Put put = nextPair.getFirst(); - if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) { - // pre hook says skip this Put - // mark as success and skip in doMiniBatchPut - batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; + Pair nextPair = batchOp.operations[i]; + Mutation m = nextPair.getFirst(); + if (m instanceof Put) { + if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) { + // pre hook says skip this Put + // mark as success and skip in doMiniBatchMutation + batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; + } + } else { + if (coprocessorHost.preDelete((Delete) m, walEdit, m.getWriteToWAL())) { + // pre hook says skip this Delete + // mark as success and skip in doMiniBatchMutation + batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; + } } if (!walEdit.isEmpty()) { batchOp.walEditsFromCoprocessors[i] = walEdit; @@ -2058,15 +2065,18 @@ @SuppressWarnings("unchecked") - private long doMiniBatchPut( - BatchOperationInProgress> batchOp) throws IOException { + private long doMiniBatchMutation( + BatchOperationInProgress> batchOp) throws IOException { // variable to note if all Put items are for the same CF -- metrics related - boolean cfSetConsistent = true; + boolean putsCfSetConsistent = true; + //The set of columnFamilies first seen for Put. + Set putsCfSet = null; + // variable to note if all Delete items are for the same CF -- metrics related + boolean deletesCfSetConsistent = true; + //The set of columnFamilies first seen for Delete. + Set deletesCfSet = null; - //The set of columnFamilies first seen. - Set cfSet = null; - WALEdit walEdit = new WALEdit(); long startTimeMs = EnvironmentEdgeManager.currentTimeMillis(); @@ -2085,6 +2095,7 @@ int firstIndex = batchOp.nextIndexToProcess; int lastIndexExclusive = firstIndex; boolean success = false; + int noOfPuts = 0, noOfDeletes = 0; try { // ------------------------------------ // STEP 1. Try to acquire as many locks as we can, and ensure @@ -2093,11 +2104,12 @@ int numReadyToWrite = 0; long now = EnvironmentEdgeManager.currentTimeMillis(); while (lastIndexExclusive < batchOp.operations.length) { - Pair nextPair = batchOp.operations[lastIndexExclusive]; - Put put = nextPair.getFirst(); + Pair nextPair = batchOp.operations[lastIndexExclusive]; + Mutation mutation = nextPair.getFirst(); + boolean isPutMutation = mutation instanceof Put; Integer providedLockId = nextPair.getSecond(); - Map> familyMap = put.getFamilyMap(); + Map> familyMap = mutation.getFamilyMap(); // store the family map reference to allow for mutations familyMaps[lastIndexExclusive] = familyMap; @@ -2108,22 +2120,25 @@ continue; } - // Check the families in the put. If bad, skip this one. try { - checkFamilies(familyMap.keySet()); - checkTimestamps(put, now); + if (isPutMutation) { + // Check the families in the put. If bad, skip this one. + checkFamilies(familyMap.keySet()); + checkTimestamps(mutation.getFamilyMap(), now); + } else { + prepareDelete((Delete) mutation); + } } catch (DoNotRetryIOException dnrioe) { - LOG.warn("No such column family in batch put", dnrioe); + LOG.warn("No such column family in batch mutation", dnrioe); batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( OperationStatusCode.SANITY_CHECK_FAILURE, dnrioe.getMessage()); lastIndexExclusive++; continue; } - // If we haven't got any rows in our batch, we should block to // get the next one. boolean shouldBlock = numReadyToWrite == 0; - Integer acquiredLockId = getLock(providedLockId, put.getRow(), shouldBlock); + Integer acquiredLockId = getLock(providedLockId, mutation.getRow(), shouldBlock); if (acquiredLockId == null) { // We failed to grab another lock assert !shouldBlock : "Should never fail to get lock when blocking"; @@ -2135,25 +2150,35 @@ lastIndexExclusive++; numReadyToWrite++; - //If Column Families stay consistent through out all of the - //individual puts then metrics can be reported as a mutliput across - //column families in the first put. - if (cfSet == null) { - cfSet = put.getFamilyMap().keySet(); + if (isPutMutation) { + // If Column Families stay consistent through out all of the + // individual puts then metrics can be reported as a mutliput across + // column families in the first put. + if (putsCfSet == null) { + putsCfSet = mutation.getFamilyMap().keySet(); + } else { + putsCfSetConsistent = putsCfSetConsistent + && mutation.getFamilyMap().keySet().equals(putsCfSet); + } } else { - cfSetConsistent = cfSetConsistent && put.getFamilyMap().keySet().equals(cfSet); + if (deletesCfSet == null) { + deletesCfSet = mutation.getFamilyMap().keySet(); + } else { + deletesCfSetConsistent = deletesCfSetConsistent + && mutation.getFamilyMap().keySet().equals(deletesCfSet); + } } } // we should record the timestamp only after we have acquired the rowLock, - // otherwise, newer puts are not guaranteed to have a newer timestamp + // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp now = EnvironmentEdgeManager.currentTimeMillis(); byte[] byteNow = Bytes.toBytes(now); - // Nothing to put -- an exception in the above such as NoSuchColumnFamily? + // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? if (numReadyToWrite <= 0) return 0L; - // We've now grabbed as many puts off the list as we can + // We've now grabbed as many mutations off the list as we can // ------------------------------------ // STEP 2. Update any LATEST_TIMESTAMP timestamps @@ -2163,9 +2188,14 @@ if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) continue; - updateKVTimestamps( - familyMaps[i].values(), - byteNow); + Mutation mutation = batchOp.operations[i].getFirst(); + if (mutation instanceof Put) { + updateKVTimestamps(familyMaps[i].values(), byteNow); + noOfPuts++; + } else { + prepareDeleteTimestamps(familyMaps[i], byteNow); + noOfDeletes++; + } } this.updatesLock.readLock().lock(); @@ -2206,9 +2236,11 @@ } batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; - Put p = batchOp.operations[i].getFirst(); - if (!p.getWriteToWAL()) { - recordPutWithoutWal(p.getFamilyMap()); + Mutation m = batchOp.operations[i].getFirst(); + if (!m.getWriteToWAL()) { + if (m instanceof Put) { + recordPutWithoutWal(m.getFamilyMap()); + } continue; } // Add WAL edits by CP @@ -2225,7 +2257,7 @@ // ------------------------- // STEP 5. Append the edit to WAL. Do not sync wal. // ------------------------- - Put first = batchOp.operations[firstIndex].getFirst(); + Mutation first = batchOp.operations[firstIndex].getFirst(); txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(), walEdit, first.getClusterId(), now, this.htableDescriptor); @@ -2261,7 +2293,7 @@ // ------------------------------------ // STEP 9. Run coprocessor post hooks. This should be done after the wal is - // sycned so that the coprocessor contract is adhered to. + // synced so that the coprocessor contract is adhered to. // ------------------------------------ if (coprocessorHost != null) { for (int i = firstIndex; i < lastIndexExclusive; i++) { @@ -2270,8 +2302,12 @@ != OperationStatusCode.SUCCESS) { continue; } - Put p = batchOp.operations[i].getFirst(); - coprocessorHost.postPut(p, walEdit, p.getWriteToWAL()); + Mutation m = batchOp.operations[i].getFirst(); + if (m instanceof Put) { + coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL()); + } else { + coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL()); + } } } @@ -2296,14 +2332,26 @@ } // do after lock - final long endTimeMs = EnvironmentEdgeManager.currentTimeMillis(); + final long netTimeMs = EnvironmentEdgeManager.currentTimeMillis() - startTimeMs; - //See if the column families were consistent through the whole thing. - //if they were then keep them. If they were not then pass a null. - //null will be treated as unknown. - final Set keptCfs = cfSetConsistent ? cfSet : null; - this.opMetrics.updateMultiPutMetrics(keptCfs, endTimeMs - startTimeMs); - + // See if the column families were consistent through the whole thing. + // if they were then keep them. If they were not then pass a null. + // null will be treated as unknown. + // Total time taken might be involving Puts and Deletes. + // Split the time for puts and deletes based on the total number of Puts and Deletes. + long timeTakenForPuts = 0; + if (noOfPuts > 0) { + // There were some Puts in the batch. + double noOfMutations = noOfPuts + noOfDeletes; + timeTakenForPuts = (long) (netTimeMs * (noOfPuts / noOfMutations)); + final Set keptCfs = putsCfSetConsistent ? putsCfSet : null; + this.opMetrics.updateMultiPutMetrics(keptCfs, timeTakenForPuts); + } + if (noOfDeletes > 0) { + // There were some Deletes in the batch. + final Set keptCfs = deletesCfSetConsistent ? deletesCfSet : null; + this.opMetrics.updateMultiDeleteMetrics(keptCfs, netTimeMs - timeTakenForPuts); + } if (!success) { for (int i = firstIndex; i < lastIndexExclusive; i++) { if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) { @@ -2612,10 +2660,10 @@ /** * Remove all the keys listed in the map from the memstore. This method is - * called when a Put has updated memstore but subequently fails to update + * called when a Put/Delete has updated memstore but subequently fails to update * the wal. This method is then invoked to rollback the memstore. */ - private void rollbackMemstore(BatchOperationInProgress> batchOp, + private void rollbackMemstore(BatchOperationInProgress> batchOp, Map>[] familyMaps, int start, int end) { int kvsRolledback = 0; @@ -2656,9 +2704,6 @@ checkFamily(family); } } - private void checkTimestamps(Put p, long now) throws DoNotRetryIOException { - checkTimestamps(p.getFamilyMap(), now); - } void checkTimestamps(final Map> familyMap, long now) throws DoNotRetryIOException { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1354813) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.MultiAction; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.RowMutations; @@ -3224,7 +3225,7 @@ mutateRows(region, mutates); } else { ActionResult.Builder resultBuilder = null; - List puts = new ArrayList(); + List mutates = new ArrayList(); for (ClientProtos.MultiAction actionUnion : request.getActionList()) { requestCount.incrementAndGet(); try { @@ -3239,10 +3240,10 @@ } else if (actionUnion.hasMutate()) { Mutate mutate = actionUnion.getMutate(); MutateType type = mutate.getMutateType(); - if (type != MutateType.PUT) { - if (!puts.isEmpty()) { - put(builder, region, puts); - puts.clear(); + if (type != MutateType.PUT && type != MutateType.DELETE) { + if (!mutates.isEmpty()) { + doBatchOp(builder, region, mutates); + mutates.clear(); } else if (!region.getRegionInfo().isMetaTable()) { cacheFlusher.reclaimMemStoreMemory(); } @@ -3256,17 +3257,13 @@ r = increment(region, mutate); break; case PUT: - puts.add(mutate); + mutates.add(mutate); break; case DELETE: - Delete delete = ProtobufUtil.toDelete(mutate); - Integer lock = getLockFromId(delete.getLockId()); - region.delete(delete, lock, delete.getWriteToWAL()); - r = new Result(); + mutates.add(mutate); break; - default: - throw new DoNotRetryIOException( - "Unsupported mutate type: " + type.name()); + default: + throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); } if (r != null) { result = ProtobufUtil.toResult(r); @@ -3294,8 +3291,8 @@ builder.addResult(ResponseConverter.buildActionResult(ie)); } } - if (!puts.isEmpty()) { - put(builder, region, puts); + if (!mutates.isEmpty()) { + doBatchOp(builder, region, mutates); } } return builder.build(); @@ -3755,16 +3752,16 @@ } /** - * Execute a list of put mutations. + * Execute a list of Put/Delete mutations. * * @param builder * @param region - * @param puts + * @param mutates */ - protected void put(final MultiResponse.Builder builder, - final HRegion region, final List puts) { + protected void doBatchOp(final MultiResponse.Builder builder, + final HRegion region, final List mutates) { @SuppressWarnings("unchecked") - Pair[] putsWithLocks = new Pair[puts.size()]; + Pair[] mutationsWithLocks = new Pair[mutates.size()]; try { ActionResult.Builder resultBuilder = ActionResult.newBuilder(); @@ -3773,19 +3770,24 @@ ActionResult result = resultBuilder.build(); int i = 0; - for (Mutate put : puts) { - Put p = ProtobufUtil.toPut(put); - Integer lock = getLockFromId(p.getLockId()); - putsWithLocks[i++] = new Pair(p, lock); + for (Mutate m : mutates) { + Mutation mutation = null; + if (m.getMutateType() == MutateType.PUT) { + mutation = ProtobufUtil.toPut(m); + } else { + mutation = ProtobufUtil.toDelete(m); + } + Integer lock = getLockFromId(mutation.getLockId()); + mutationsWithLocks[i++] = new Pair(mutation, lock); builder.addResult(result); } - requestCount.addAndGet(puts.size()); + requestCount.addAndGet(mutates.size()); if (!region.getRegionInfo().isMetaTable()) { cacheFlusher.reclaimMemStoreMemory(); } - OperationStatus codes[] = region.put(putsWithLocks); + OperationStatus codes[] = region.batchMutate(mutationsWithLocks); for (i = 0; i < codes.length; i++) { if (codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) { result = ResponseConverter.buildActionResult( @@ -3795,7 +3797,7 @@ } } catch (IOException ie) { ActionResult result = ResponseConverter.buildActionResult(ie); - for (int i = 0, n = puts.size(); i < n; i++) { + for (int i = 0, n = mutates.size(); i < n; i++) { builder.setResult(i, result); } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/OperationMetrics.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/OperationMetrics.java (revision 1354813) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/OperationMetrics.java (working copy) @@ -46,6 +46,7 @@ private static final String ICV_KEY = "incrementColumnValue_"; private static final String INCREMENT_KEY = "increment_"; private static final String MULTIPUT_KEY = "multiput_"; + private static final String MULTIDELETE_KEY = "multidelete_"; private static final String APPEND_KEY = "append_"; /** Conf key controlling whether we should expose metrics.*/ @@ -102,6 +103,16 @@ } /** + * Update the stats associated with {@link HTable#delete(java.util.List)}. + * + * @param columnFamilies Set of CF's this multidelete is associated with + * @param value the time + */ + public void updateMultiDeleteMetrics(Set columnFamilies, long value) { + doUpdateTimeVarying(columnFamilies, MULTIDELETE_KEY, value); + } + + /** * Update the metrics associated with a {@link Get} * * @param columnFamilies Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java (revision 1354813) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java (working copy) @@ -69,7 +69,7 @@ } else if (m instanceof Delete) { Delete d = (Delete) m; region.prepareDelete(d); - region.prepareDeleteTimestamps(d, byteNow); + region.prepareDeleteTimestamps(d.getFamilyMap(), byteNow); } else { throw new DoNotRetryIOException( "Action must be Put or Delete. But was: " Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1354813) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -610,14 +611,14 @@ LOG.info("Nexta, a batch put which uses an already-held lock"); lockedRow = region.obtainRowLock(Bytes.toBytes("row_2")); LOG.info("...obtained row lock"); - List> putsAndLocks = Lists.newArrayList(); + List> putsAndLocks = Lists.newArrayList(); for (int i = 0; i < 10; i++) { - Pair pair = new Pair(puts[i], null); + Pair pair = new Pair(puts[i], null); if (i == 2) pair.setSecond(lockedRow); putsAndLocks.add(pair); } - codes = region.put(putsAndLocks.toArray(new Pair[0])); + codes = region.batchMutate(putsAndLocks.toArray(new Pair[0])); LOG.info("...performed put"); for (int i = 0; i < 10; i++) { assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE :