Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1356566)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -1699,15 +1699,14 @@
   }
 
   /**
-   * Setup a Delete object with correct timestamps.
-   * Caller should the row and region locks.
-   * @param delete
+   * Setup correct timestamps in the KVs in Delete object.
+   * Caller should have the row and region locks.
+   * @param familyMap
    * @param now
    * @throws IOException
    */
-  private void prepareDeleteTimestamps(Delete delete, byte[] byteNow)
+  private void prepareDeleteTimestamps(Map<byte[], List<KeyValue>> familyMap, byte[] byteNow)
       throws IOException {
-    Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
     for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
       byte[] family = e.getKey();
@@ -1776,7 +1775,7 @@
 
     updatesLock.readLock().lock();
     try {
-      prepareDeleteTimestamps(delete, byteNow);
+      prepareDeleteTimestamps(delete.getFamilyMap(), byteNow);
 
       if (writeToWAL) {
         // write/sync to WAL should happen before we touch memstore.
@@ -1903,31 +1902,47 @@
 
   /**
    * Perform a batch put with no pre-specified locks
-   * @see HRegion#put(Pair[])
+   * @see HRegion#batchMutate(Pair[])
    */
   public OperationStatus[] put(Put[] puts) throws IOException {
     @SuppressWarnings("unchecked")
-    Pair<Put, Integer> putsAndLocks[] = new Pair[puts.length];
+    Pair<Mutation, Integer> putsAndLocks[] = new Pair[puts.length];
 
     for (int i = 0; i < puts.length; i++) {
-      putsAndLocks[i] = new Pair<Put, Integer>(puts[i], null);
+      putsAndLocks[i] = new Pair<Mutation, Integer>(puts[i], null);
     }
-    return put(putsAndLocks);
+    return batchMutate(putsAndLocks);
   }
 
   /**
    * Perform a batch of puts.
-   *
    * @param putsAndLocks
    *          the list of puts paired with their requested lock IDs.
+   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
+   *         exceptionMessage if any.
+   * @throws IOException
+   * @deprecated Instead use {@link HRegion#batchMutate(Pair[])}
+   */
+  @Deprecated
+  public OperationStatus[] put(Pair<Put, Integer>[] putsAndLocks) throws IOException {
+    Pair<Mutation, Integer>[] mutationsAndLocks = new Pair[putsAndLocks.length];
+    System.arraycopy(putsAndLocks, 0, mutationsAndLocks, 0, putsAndLocks.length);
+    return batchMutate(mutationsAndLocks);
+  }
+
+  /**
+   * Perform a batch of mutations.
+   * It supports only Put and Delete mutations and will ignore other types passed.
+   * @param mutationsAndLocks
+   *          the list of mutations paired with their requested lock IDs.
    * @return an array of OperationStatus which internally contains the
    *         OperationStatusCode and the exceptionMessage if any.
    * @throws IOException
    */
-  public OperationStatus[] put(
-      Pair<Put, Integer>[] putsAndLocks) throws IOException {
-    BatchOperationInProgress<Pair<Put, Integer>> batchOp =
-      new BatchOperationInProgress<Pair<Put, Integer>>(putsAndLocks);
+  public OperationStatus[] batchMutate(
+      Pair<Mutation, Integer>[] mutationsAndLocks) throws IOException {
+    BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
+      new BatchOperationInProgress<Pair<Mutation, Integer>>(mutationsAndLocks);
 
     boolean initialized = false;
 
@@ -1941,10 +1956,10 @@
       try {
         if (!initialized) {
           this.writeRequestsCount.increment();
-          doPrePutHook(batchOp);
+          doPreMutationHook(batchOp);
           initialized = true;
         }
-        long addedSize = doMiniBatchPut(batchOp);
+        long addedSize = doMiniBatchMutation(batchOp);
         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
       } finally {
         closeRegionOperation();
@@ -1956,18 +1971,32 @@
     return batchOp.retCodeDetails;
   }
 
-  private void doPrePutHook(BatchOperationInProgress<Pair<Put, Integer>> batchOp)
+  private void doPreMutationHook(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp)
       throws IOException {
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
     WALEdit walEdit = new WALEdit();
     if (coprocessorHost != null) {
       for (int i = 0; i < batchOp.operations.length; i++) {
-        Pair<Put, Integer> nextPair = batchOp.operations[i];
-        Put put = nextPair.getFirst();
-        if (coprocessorHost.prePut(put, walEdit, put.getWriteToWAL())) {
-          // pre hook says skip this Put
-          // mark as success and skip in doMiniBatchPut
-          batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+        Pair<Mutation, Integer> nextPair = batchOp.operations[i];
+        Mutation m = nextPair.getFirst();
+        if (m instanceof Put) {
+          if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
+            // pre hook says skip this Put
+            // mark as success and skip in doMiniBatchMutation
+            batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+          }
+        } else if (m instanceof Delete) {
+          if (coprocessorHost.preDelete((Delete) m, walEdit, m.getWriteToWAL())) {
+            // pre hook says skip this Delete
+            // mark as success and skip in doMiniBatchMutation
+            batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+          }
+        } else {
+          // In case of passing Append mutations along with the Puts and Deletes in batchMutate
+          // mark the operation return code as failure so that it will not be considered in
+          // the doMiniBatchMutation
+          batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
+              "Put/Delete mutations only supported in batchMutate() now");
         }
         if (!walEdit.isEmpty()) {
           batchOp.walEditsFromCoprocessors[i] = walEdit;
@@ -1977,14 +2006,19 @@
     }
   }
 
+  // The mutation will be either a Put or Delete.
   @SuppressWarnings("unchecked")
-  private long doMiniBatchPut(
-      BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
+  private long doMiniBatchMutation(
+      BatchOperationInProgress<Pair<Mutation, Integer>> batchOp) throws IOException {
 
-    // The set of columnFamilies first seen.
-    Set<byte[]> cfSet = null;
+    // The set of columnFamilies first seen for Put.
+    Set<byte[]> putsCfSet = null;
     // variable to note if all Put items are for the same CF -- metrics related
-    boolean cfSetConsistent = true;
+    boolean putsCfSetConsistent = true;
+    // The set of columnFamilies first seen for Delete.
+    Set<byte[]> deletesCfSet = null;
+    // variable to note if all Delete items are for the same CF -- metrics related
+    boolean deletesCfSetConsistent = true;
     long startTimeMs = EnvironmentEdgeManager.currentTimeMillis();
 
     WALEdit walEdit = new WALEdit();
@@ -2002,6 +2036,7 @@
     int firstIndex = batchOp.nextIndexToProcess;
     int lastIndexExclusive = firstIndex;
     boolean success = false;
+    int noOfPuts = 0, noOfDeletes = 0;
     try {
       // ------------------------------------
       // STEP 1. Try to acquire as many locks as we can, and ensure
@@ -2010,11 +2045,11 @@
       int numReadyToWrite = 0;
       long now = EnvironmentEdgeManager.currentTimeMillis();
       while (lastIndexExclusive < batchOp.operations.length) {
-        Pair<Put, Integer> nextPair = batchOp.operations[lastIndexExclusive];
-        Put put = nextPair.getFirst();
+        Pair<Mutation, Integer> nextPair = batchOp.operations[lastIndexExclusive];
+        Mutation mutation = nextPair.getFirst();
         Integer providedLockId = nextPair.getSecond();
 
-        Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
+        Map<byte[], List<KeyValue>> familyMap = mutation.getFamilyMap();
         // store the family map reference to allow for mutations
         familyMaps[lastIndexExclusive] = familyMap;
@@ -2025,22 +2060,24 @@
           continue;
         }
 
-        // Check the families in the put. If bad, skip this one.
         try {
-          checkFamilies(familyMap.keySet());
-          checkTimestamps(put, now);
+          if (mutation instanceof Put) {
+            checkFamilies(familyMap.keySet());
+            checkTimestamps(mutation.getFamilyMap(), now);
+          } else {
+            prepareDelete((Delete) mutation);
+          }
         } catch (DoNotRetryIOException dnrioe) {
-          LOG.warn("No such column family in batch put", dnrioe);
+          LOG.warn("No such column family in batch mutation", dnrioe);
           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
               OperationStatusCode.SANITY_CHECK_FAILURE, dnrioe.getMessage());
           lastIndexExclusive++;
           continue;
         }
-
         // If we haven't got any rows in our batch, we should block to
         // get the next one.
         boolean shouldBlock = numReadyToWrite == 0;
-        Integer acquiredLockId = getLock(providedLockId, put.getRow(), shouldBlock);
+        Integer acquiredLockId = getLock(providedLockId, mutation.getRow(), shouldBlock);
         if (acquiredLockId == null) {
           // We failed to grab another lock
           assert !shouldBlock : "Should never fail to get lock when blocking";
@@ -2052,25 +2089,35 @@
         lastIndexExclusive++;
         numReadyToWrite++;
 
-        // If Column Families stay consistent through out all of the
-        // individual puts then metrics can be reported as a mutliput across
-        // column families in the first put.
-        if (cfSet == null) {
-          cfSet = put.getFamilyMap().keySet();
+        if (mutation instanceof Put) {
+          // If Column Families stay consistent throughout all of the
+          // individual puts then metrics can be reported as a multiput across
+          // column families in the first put.
+          if (putsCfSet == null) {
+            putsCfSet = mutation.getFamilyMap().keySet();
+          } else {
+            putsCfSetConsistent = putsCfSetConsistent
+                && mutation.getFamilyMap().keySet().equals(putsCfSet);
+          }
         } else {
-          cfSetConsistent = cfSetConsistent && put.getFamilyMap().keySet().equals(cfSet);
+          if (deletesCfSet == null) {
+            deletesCfSet = mutation.getFamilyMap().keySet();
+          } else {
+            deletesCfSetConsistent = deletesCfSetConsistent
+                && mutation.getFamilyMap().keySet().equals(deletesCfSet);
+          }
         }
       }
 
       // we should record the timestamp only after we have acquired the rowLock,
-      // otherwise, newer puts are not guaranteed to have a newer timestamp
+      // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
       now = EnvironmentEdgeManager.currentTimeMillis();
       byte[] byteNow = Bytes.toBytes(now);
 
-      // Nothing to put -- an exception in the above such as NoSuchColumnFamily?
+      // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
       if (numReadyToWrite <= 0) return 0L;
 
-      // We've now grabbed as many puts off the list as we can
+      // We've now grabbed as many mutations off the list as we can
 
       // ------------------------------------
      // STEP 2. Update any LATEST_TIMESTAMP timestamps
@@ -2079,10 +2126,14 @@
         // skip invalid
         if (batchOp.retCodeDetails[i].getOperationStatusCode()
             != OperationStatusCode.NOT_RUN) continue;
-
-        updateKVTimestamps(
-            familyMaps[i].values(),
-            byteNow);
+        Mutation mutation = batchOp.operations[i].getFirst();
+        if (mutation instanceof Put) {
+          updateKVTimestamps(familyMaps[i].values(), byteNow);
+          noOfPuts++;
+        } else {
+          prepareDeleteTimestamps(familyMaps[i], byteNow);
+          noOfDeletes++;
+        }
       }
 
       this.updatesLock.readLock().lock();
@@ -2123,9 +2174,11 @@
         }
         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
 
-        Put p = batchOp.operations[i].getFirst();
-        if (!p.getWriteToWAL()) {
-          recordPutWithoutWal(p.getFamilyMap());
+        Mutation m = batchOp.operations[i].getFirst();
+        if (!m.getWriteToWAL()) {
+          if (m instanceof Put) {
+            recordPutWithoutWal(m.getFamilyMap());
+          }
           continue;
         }
         // Add WAL edits by CP
@@ -2141,7 +2194,7 @@
       // -------------------------
       // STEP 5. Append the edit to WAL. Do not sync wal.
       // -------------------------
-      Put first = batchOp.operations[firstIndex].getFirst();
+      Mutation first = batchOp.operations[firstIndex].getFirst();
       txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
           walEdit, first.getClusterId(), now, this.htableDescriptor);
 
@@ -2177,7 +2230,7 @@
 
       // ------------------------------------
       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
-      // sycned so that the coprocessor contract is adhered to.
+      // synced so that the coprocessor contract is adhered to.
       // ------------------------------------
       if (coprocessorHost != null) {
         for (int i = firstIndex; i < lastIndexExclusive; i++) {
@@ -2186,11 +2239,14 @@
               != OperationStatusCode.SUCCESS) {
             continue;
           }
-          Put p = batchOp.operations[i].getFirst();
-          coprocessorHost.postPut(p, walEdit, p.getWriteToWAL());
+          Mutation m = batchOp.operations[i].getFirst();
+          if (m instanceof Put) {
+            coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
+          } else {
+            coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
+          }
         }
       }
-
       success = true;
       return addedSize;
     } finally {
@@ -2212,14 +2268,26 @@
       }
 
       // do after lock
-      final long endTimeMs = EnvironmentEdgeManager.currentTimeMillis();
+      final long netTimeMs = EnvironmentEdgeManager.currentTimeMillis() - startTimeMs;
 
       // See if the column families were consistent through the whole thing.
       // if they were then keep them. If they were not then pass a null.
       // null will be treated as unknown.
-      final Set<byte[]> keptCfs = cfSetConsistent ? cfSet : null;
-      this.opMetrics.updateMultiPutMetrics(keptCfs, endTimeMs - startTimeMs);
-
+      // Total time taken might be involving Puts and Deletes.
+      // Split the time for puts and deletes based on the total number of Puts and Deletes.
+      long timeTakenForPuts = 0;
+      if (noOfPuts > 0) {
+        // There were some Puts in the batch.
+        double noOfMutations = noOfPuts + noOfDeletes;
+        timeTakenForPuts = (long) (netTimeMs * (noOfPuts / noOfMutations));
+        final Set<byte[]> keptCfs = putsCfSetConsistent ? putsCfSet : null;
+        this.opMetrics.updateMultiPutMetrics(keptCfs, timeTakenForPuts);
+      }
+      if (noOfDeletes > 0) {
+        // There were some Deletes in the batch.
+        final Set<byte[]> keptCfs = deletesCfSetConsistent ? deletesCfSet : null;
+        this.opMetrics.updateMultiDeleteMetrics(keptCfs, netTimeMs - timeTakenForPuts);
+      }
 
       if (!success) {
         for (int i = firstIndex; i < lastIndexExclusive; i++) {
           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
@@ -2527,10 +2595,10 @@
 
   /**
    * Remove all the keys listed in the map from the memstore. This method is
-   * called when a Put has updated memstore but subequently fails to update
+   * called when a Put/Delete has updated memstore but subsequently fails to update
    * the wal. This method is then invoked to rollback the memstore.
    */
-  private void rollbackMemstore(BatchOperationInProgress<Pair<Put, Integer>> batchOp,
+  private void rollbackMemstore(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
                                 Map<byte[], List<KeyValue>>[] familyMaps,
                                 int start, int end) {
     int kvsRolledback = 0;
@@ -2571,9 +2639,6 @@
       checkFamily(family);
     }
   }
-  private void checkTimestamps(Put p, long now) throws DoNotRetryIOException {
-    checkTimestamps(p.getFamilyMap(), now);
-  }
 
   private void checkTimestamps(final Map<byte[], List<KeyValue>> familyMap,
       long now) throws DoNotRetryIOException {
@@ -4208,7 +4273,7 @@
       } else if (m instanceof Delete) {
         Delete d = (Delete) m;
         prepareDelete(d);
-        prepareDeleteTimestamps(d, byteNow);
+        prepareDeleteTimestamps(d.getFamilyMap(), byteNow);
       } else {
         throw new DoNotRetryIOException(
             "Action must be Put or Delete. But was: "
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1356566)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -92,6 +92,7 @@
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.MultiAction;
 import org.apache.hadoop.hbase.client.MultiResponse;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Row;
@@ -2015,15 +2016,15 @@
       }
 
       @SuppressWarnings("unchecked")
-      Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
+      Pair<Mutation, Integer>[] putsWithLocks = new Pair[puts.size()];
 
       for (Put p : puts) {
         Integer lock = getLockFromId(p.getLockId());
-        putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
+        putsWithLocks[i++] = new Pair<Mutation, Integer>(p, lock);
       }
 
       this.requestCount.addAndGet(puts.size());
-      OperationStatus codes[] = region.put(putsWithLocks);
+      OperationStatus codes[] = region.batchMutate(putsWithLocks);
       for (i = 0; i < codes.length; i++) {
         if (codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
           return i;
@@ -3373,20 +3374,17 @@
       // actions in the list.
       Collections.sort(actionsForRegion);
       Row action;
-      List<Action<R>> puts = new ArrayList<Action<R>>();
+      List<Action<R>> mutations = new ArrayList<Action<R>>();
       for (Action<R> a : actionsForRegion) {
         action = a.getAction();
         int originalIndex = a.getOriginalIndex();
 
         try {
-          if (action instanceof Delete) {
-            delete(regionName, (Delete)action);
-            response.add(regionName, originalIndex, new Result());
+          if (action instanceof Delete || action instanceof Put) {
+            mutations.add(a);
           } else if (action instanceof Get) {
             response.add(regionName, originalIndex, get(regionName, (Get)action));
-          } else if (action instanceof Put) {
-            puts.add(a);  // wont throw.
           } else if (action instanceof Exec) {
             ExecResult result = execCoprocessor(regionName, (Exec)action);
             response.add(regionName, new Pair<Integer, Object>(
@@ -3415,7 +3413,7 @@
 
       // We do the puts with result.put so we can get the batching efficiency
      // we so need. All this data munging doesn't seem great, but at least
      // we arent copying bytes or anything.
-      if (!puts.isEmpty()) {
+      if (!mutations.isEmpty()) {
         try {
           HRegion region = getRegion(regionName);
@@ -3423,30 +3421,30 @@
             this.cacheFlusher.reclaimMemStoreMemory();
           }
 
-          List<Pair<Put, Integer>> putsWithLocks =
-              Lists.newArrayListWithCapacity(puts.size());
-          for (Action<R> a : puts) {
-            Put p = (Put) a.getAction();
+          List<Pair<Mutation, Integer>> mutationsWithLocks =
+              Lists.newArrayListWithCapacity(mutations.size());
+          for (Action<R> a : mutations) {
+            Mutation m = (Mutation) a.getAction();
 
             Integer lock;
             try {
-              lock = getLockFromId(p.getLockId());
+              lock = getLockFromId(m.getLockId());
             } catch (UnknownRowLockException ex) {
               response.add(regionName, a.getOriginalIndex(), ex);
               continue;
             }
-            putsWithLocks.add(new Pair<Put, Integer>(p, lock));
+            mutationsWithLocks.add(new Pair<Mutation, Integer>(m, lock));
           }
 
-          this.requestCount.addAndGet(puts.size());
+          this.requestCount.addAndGet(mutations.size());
 
           OperationStatus[] codes =
-              region.put(putsWithLocks.toArray(new Pair[]{}));
+              region.batchMutate(mutationsWithLocks.toArray(new Pair[]{}));
 
           for( int i = 0 ; i < codes.length ; i++) {
             OperationStatus code = codes[i];
 
-            Action<R> theAction = puts.get(i);
+            Action<R> theAction = mutations.get(i);
             Object result = null;
 
             if (code.getOperationStatusCode() == OperationStatusCode.SUCCESS) {
@@ -3461,7 +3459,7 @@
           }
         } catch (IOException ioe) {
           // fail all the puts with the ioe in question.
-          for (Action<R> a: puts) {
+          for (Action<R> a: mutations) {
             response.add(regionName, a.getOriginalIndex(), ioe);
           }
         }
Index: src/main/java/org/apache/hadoop/hbase/regionserver/metrics/OperationMetrics.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/metrics/OperationMetrics.java (revision 1356566)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/metrics/OperationMetrics.java (working copy)
@@ -44,6 +44,7 @@
   private static final String ICV_KEY = "incrementColumnValue_";
   private static final String INCREMENT_KEY = "increment_";
   private static final String MULTIPUT_KEY = "multiput_";
+  private static final String MULTIDELETE_KEY = "multidelete_";
   private static final String APPEND_KEY = "append_";
 
   /** Conf key controlling whether we should expose metrics.*/
@@ -100,6 +101,16 @@
   }
 
   /**
+   * Update the stats associated with {@link HTable#delete(java.util.List)}.
+   *
+   * @param columnFamilies Set of CF's this multidelete is associated with
+   * @param value the time
+   */
+  public void updateMultiDeleteMetrics(Set<byte[]> columnFamilies, long value) {
+    doUpdateTimeVarying(columnFamilies, MULTIDELETE_KEY, value);
+  }
+
+  /**
   * Update the metrics associated with a {@link Get}
   *
   * @param columnFamilies
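For reviewers, a minimal caller-side sketch of the new batchMutate() API follows. The open HRegion handle, the family "cf", and the row keys are illustrative assumptions, not part of the patch; the types and calls (Pair<Mutation, Integer>, batchMutate(), OperationStatus) are exactly the ones introduced above.

import java.io.IOException;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class BatchMutateExample {
  // Assumes `region` is an open HRegion whose table has a family named "cf".
  static void mixedBatch(HRegion region) throws IOException {
    @SuppressWarnings("unchecked")
    Pair<Mutation, Integer>[] batch = new Pair[2];

    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    batch[0] = new Pair<Mutation, Integer>(put, null);    // null lock id: row lock acquired internally

    Delete delete = new Delete(Bytes.toBytes("row2"));
    batch[1] = new Pair<Mutation, Integer>(delete, null);

    OperationStatus[] codes = region.batchMutate(batch);
    for (OperationStatus code : codes) {
      if (code.getOperationStatusCode() != OperationStatusCode.SUCCESS) {
        // Per-operation failure: SANITY_CHECK_FAILURE for an unknown column
        // family, FAILURE for a mutation type other than Put/Delete, etc.
      }
    }
  }
}

On the metrics change in doMiniBatchMutation(): the batch's elapsed time is apportioned by operation count, so a 40 ms batch of 3 Puts and 1 Delete reports 30 ms to multiput_ and the remaining 10 ms to multidelete_.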