From afb0b11323f669c6d88b2acd085f7c3513b31796 Mon Sep 17 00:00:00 2001 From: Umesh Agashe Date: Wed, 27 Sep 2017 15:17:35 -0700 Subject: [PATCH] HBASE-18703 WIP refactor doMiniBatchMutate() for mutateRows() methods to use it. --- .../org/apache/hadoop/hbase/util/NonceKey.java | 14 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 625 +++++++++++---------- .../regionserver/MultiRowMutationProcessor.java | 205 ------- .../hadoop/hbase/regionserver/RSRpcServices.java | 60 +- .../apache/hadoop/hbase/regionserver/Region.java | 30 + .../hadoop/hbase/regionserver/TestHRegion.java | 12 +- .../hadoop/hbase/regionserver/TestParallelPut.java | 2 +- .../TestWALMonotonicallyIncreasingSeqId.java | 2 +- 8 files changed, 394 insertions(+), 556 deletions(-) delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java index 6da808ed639f3ae2268097ed3d1fb92b2be790d7..87bc65cef122658b18494ce1092b971dae63ee59 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java @@ -26,12 +26,11 @@ import org.apache.yetus.audience.InterfaceAudience; */ // TODO: we could use pure byte arrays, but then we wouldn't be able to use hash map. @InterfaceAudience.Private -public class NonceKey { +public class NonceKey implements Comparable { private long group; private long nonce; public NonceKey(long group, long nonce) { - assert nonce != HConstants.NO_NONCE; this.group = group; this.nonce = nonce; } @@ -62,4 +61,13 @@ public class NonceKey { public long getNonce() { return nonce; } -} + + @Override + public int compareTo(NonceKey o) { + int rc = Long.compare(group, o.getNonceGroup()); + if (rc == 0) { + return Long.compare(nonce, o.getNonce()); + } + return rc; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 4fa2c704ebc74dfe30c4943295193f7866000010..06cfc7f7491ed0ab71ae412e325b7ba2aa32ed09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -70,6 +70,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; +import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -101,6 +103,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Delete; @@ -805,11 +808,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi configurationManager = Optional.empty(); // disable stats tracking system tables, but check the config for everything else - this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals( - NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ? 
- false : - conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, - HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + this.regionStatsEnabled = !htd.getTableName().getNamespaceAsString().equals( + NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) && conf.getBoolean( + HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); this.maxCellSize = conf.getLong(HBASE_MAX_CELL_SIZE_KEY, DEFAULT_MAX_CELL_SIZE); } @@ -1924,13 +1925,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // upkeep. ////////////////////////////////////////////////////////////////////////////// /** - * @return returns size of largest HStore. - */ - public long getLargestHStoreSize() { - return stores.values().stream().mapToLong(HStore::getSize).max().orElse(0L); - } - - /** * Do preparation for pending compaction. * @throws IOException */ @@ -3045,17 +3039,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce) - throws IOException { + public OperationStatus[] batchMutate(Mutation[] mutations, byte[][] rowsToLock, long nonceGroup, + long nonce, boolean atomic) throws IOException { // As it stands, this is used for 3 things // * batchMutate with single mutation - put/delete, separate or from checkAndMutate. // * coprocessor calls (see ex. BulkDeleteEndpoint). // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... - return batchMutate(new MutationBatch(mutations, nonceGroup, nonce)); + return batchMutate(new MutationBatch(mutations, nonceGroup, nonce), rowsToLock, atomic); + } + + @Override + public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce, + boolean atomic) throws IOException { + return batchMutate(mutations, null, nonceGroup, nonce, atomic); + } + + @Override + public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce) + throws IOException { + return batchMutate(mutations, null, nonceGroup, nonce, false); } - public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { - return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); + public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic) throws IOException { + return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE, atomic); } @Override @@ -3080,7 +3086,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } return statuses; } - return batchMutate(new ReplayBatch(mutations, replaySeqId)); + return batchMutate(new ReplayBatch(mutations, replaySeqId), null, false); } /** @@ -3091,7 +3097,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * OperationStatusCode and the exceptionMessage if any. * @throws IOException */ - OperationStatus[] batchMutate(BatchOperation batchOp) throws IOException { + OperationStatus[] batchMutate(BatchOperation batchOp, byte[][] rowsToLock, boolean atomic) + throws IOException { boolean initialized = false; Operation op = batchOp.isInReplay() ? 
Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE; startRegionOperation(op); @@ -3109,7 +3116,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } initialized = true; } - doMiniBatchMutate(batchOp); + doMiniBatchMutate(batchOp, rowsToLock, atomic); long newSize = this.getMemstoreSize(); requestFlushIfNeeded(newSize); } @@ -3159,15 +3166,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } /** - * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)} - * In here we also handle replay of edits on region recover. + * Called to do a piece of the batch that came in to + * {@link #batchMutate(Mutation[], byte[][], long, long, boolean)} In here we also handle replay + * of edits on region recover. * @return Change in size brought about by applying batchOp */ // TODO: This needs a rewrite. Doesn't have to be this long. St.Ack 20160120 - private void doMiniBatchMutate(BatchOperation batchOp) throws IOException { + private void doMiniBatchMutate(BatchOperation batchOp, byte[][] rowsToLock, boolean atomic) + throws IOException { + Preconditions.checkArgument(atomic || rowsToLock == null, + "Locking custom rows for non-atomic batch mutation not supported!"); boolean replay = batchOp.isInReplay(); - long currentNonceGroup = HConstants.NO_NONCE; - long currentNonce = HConstants.NO_NONCE; WALEdit walEdit = null; boolean locked = false; // reference family maps directly so coprocessors can mutate them if desired @@ -3184,260 +3193,149 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); MemstoreSize memstoreSize = new MemstoreSize(); final ObservedExceptionsInBatch observedExceptions = new ObservedExceptionsInBatch(); + try { - // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. - int numReadyToWrite = 0; - long now = EnvironmentEdgeManager.currentTime(); - while (lastIndexExclusive < batchOp.operations.length) { - if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, observedExceptions)) { - lastIndexExclusive++; - continue; - } - Mutation mutation = batchOp.getMutation(lastIndexExclusive); - // If we haven't got any rows in our batch, we should block to get the next one. - RowLock rowLock = null; - try { - rowLock = getRowLockInternal(mutation.getRow(), true); - } catch (TimeoutIOException e) { - // We will retry when other exceptions, but we should stop if we timeout . - throw e; - } catch (IOException ioe) { - LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); - } - if (rowLock == null) { - // We failed to grab another lock - break; // Stop acquiring more rows for this batch - } else { - acquiredRowLocks.add(rowLock); - } + // STEP 1. Validate operations + int numRowsReadyToWrite = validateBatchOp(batchOp, EnvironmentEdgeManager.currentTime(), + familyMaps, observedExceptions); - lastIndexExclusive++; - numReadyToWrite++; - if (replay) { - for (List cells : mutation.getFamilyCellMap().values()) { - cellCount += cells.size(); - } - } + if (numRowsReadyToWrite == 0) { + // Nothing to do here + return; + } else if (atomic && numRowsReadyToWrite != batchOp.operations.length) { + // not all operations can be applied + throw new IOException("Can't apply all operations atomically!"); + } + + // STEP 2. Try to acquire adequate number of locks + int startLocksIndex = rowsToLock != null ? 
0 : firstIndex; + int locksLength = rowsToLock != null ? rowsToLock.length : batchOp.operations.length; + IntStream rowIndices = IntStream.range(startLocksIndex, locksLength); + if (rowsToLock == null) { + rowIndices = rowIndices.filter(i -> + batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN); + } + Stream rowStream = rowIndices.mapToObj(i -> rowsToLock != null ? rowsToLock[i] : + batchOp.getMutation(i).getRow()); + lockBatchOfRows(rowStream.iterator(), acquiredRowLocks, atomic); + + if (atomic) { + if (acquiredRowLocks.size() != locksLength) { + // failed to get all locks + throw new IOException("Can't apply all operations atomically!"); + } + } else if (acquiredRowLocks.isEmpty()) { + // Nothing to put/delete - an exception or no lock + return; + } else { + numRowsReadyToWrite = acquiredRowLocks.size(); } - // We've now grabbed as many mutations off the list as we can - - // STEP 2. Update any LATEST_TIMESTAMP timestamps + // STEP 3. Update any LATEST_TIMESTAMP timestamps // We should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp - now = EnvironmentEdgeManager.currentTime(); + long now = EnvironmentEdgeManager.currentTime(); byte[] byteNow = Bytes.toBytes(now); - - // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? - if (numReadyToWrite <= 0) { - return; - } - - for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) { - // skip invalid - if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - // lastIndexExclusive was incremented above. + Durability durability = Durability.USE_DEFAULT; + for (int mutationCount = 0; + lastIndexExclusive < batchOp.operations.length && mutationCount < numRowsReadyToWrite; + lastIndexExclusive++) { + if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() != + OperationStatusCode.NOT_RUN) { continue; } + mutationCount++; - Mutation mutation = batchOp.getMutation(i); - if (mutation instanceof Put) { - updateCellTimestamps(familyMaps[i].values(), byteNow); - noOfPuts++; - } else { - prepareDeleteTimestamps(mutation, familyMaps[i], byteNow); - noOfDeletes++; - } - rewriteCellTags(familyMaps[i], mutation); - WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; - if (fromCP != null) { - cellCount += fromCP.size(); + Mutation mutation = batchOp.getMutation(lastIndexExclusive); + Durability tmpDurability = mutation.getDurability(); + if (tmpDurability.ordinal() > durability.ordinal()) { + durability = tmpDurability; } - if (getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) { - for (List cells : familyMaps[i].values()) { - cellCount += cells.size(); + + if (!replay) { + if (mutation instanceof Put) { + updateCellTimestamps(familyMaps[lastIndexExclusive].values(), byteNow); + noOfPuts++; + } else { + prepareDeleteTimestamps(mutation, familyMaps[lastIndexExclusive], byteNow); + noOfDeletes++; + } + rewriteCellTags(familyMaps[lastIndexExclusive], mutation); + + WALEdit fromCP = batchOp.walEditsFromCoprocessors[lastIndexExclusive]; + if (fromCP != null) { + cellCount += fromCP.size(); + } + if (getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) { + cellCount += mutation.getFamilyCellMap().values().stream().mapToInt(List::size).sum(); } } } - lock(this.updatesLock.readLock(), numReadyToWrite); + + lock(this.updatesLock.readLock(), numRowsReadyToWrite); locked = true; // calling the pre CP hook for batch 
mutation if (!replay && coprocessorHost != null) { - MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), - batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); - if (coprocessorHost.preBatchMutate(miniBatchOp)) { - return; - } else { - for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { - // lastIndexExclusive was incremented above. - continue; - } - // we pass (i - firstIndex) below since the call expects a relative index - Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex); - if (cpMutations == null) { - continue; - } - Mutation mutation = batchOp.getMutation(i); - boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL; - // Else Coprocessor added more Mutations corresponding to the Mutation at this index. - for (int j = 0; j < cpMutations.length; j++) { - Mutation cpMutation = cpMutations[j]; - Map> cpFamilyMap = cpMutation.getFamilyCellMap(); - checkAndPrepareMutation(cpMutation, replay, cpFamilyMap, now); - - // Acquire row locks. If not, the whole batch will fail. - acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); - - // Returned mutations from coprocessor correspond to the Mutation at index i. We can - // directly add the cells from those mutations to the familyMaps of this mutation. - mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later - - // The durability of returned mutation is replaced by the corresponding mutation. - // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the - // cells of returned mutation. - if (!skipWal) { - for (List cells : cpFamilyMap.values()) { - cellCount += cells.size(); - } - } - } - } - } + MiniBatchOperationInProgress miniBatchOp = new MiniBatchOperationInProgress<>( + batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, + batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); + cellCount += validateAndMergeCPMutations(batchOp, miniBatchOp, familyMaps, + acquiredRowLocks, now, replay); } // STEP 3. Build WAL edit - walEdit = new WALEdit(cellCount, replay); - Durability durability = Durability.USE_DEFAULT; - for (int i = firstIndex; i < lastIndexExclusive; i++) { - // Skip puts that were determined to be invalid during preprocessing - if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { - continue; - } - - Mutation m = batchOp.getMutation(i); - Durability tmpDur = getEffectiveDurability(m.getDurability()); - if (tmpDur.ordinal() > durability.ordinal()) { - durability = tmpDur; - } - // we use durability of the original mutation for the mutation passed by CP. - if (tmpDur == Durability.SKIP_WAL) { - recordMutationWithoutWal(m.getFamilyCellMap()); - continue; - } - - long nonceGroup = batchOp.getNonceGroup(i); - long nonce = batchOp.getNonce(i); - // In replay, the batch may contain multiple nonces. If so, write WALEdit for each. - // Given how nonces are originally written, these should be contiguous. - // They don't have to be, it will still work, just write more WALEdits than needed. 
- if (nonceGroup != currentNonceGroup || nonce != currentNonce) { - // Write what we have so far for nonces out to WAL - appendCurrentNonces(m, replay, walEdit, now, currentNonceGroup, currentNonce); - walEdit = new WALEdit(cellCount, replay); - currentNonceGroup = nonceGroup; - currentNonce = nonce; - } - - // Add WAL edits by CP - WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; - if (fromCP != null) { - for (Cell cell : fromCP.getCells()) { - walEdit.add(cell); - } - } - addFamilyMapToWALEdit(familyMaps[i], walEdit); + Map walEditMap = buildWALEdits(batchOp, firstIndex, lastIndexExclusive, + familyMaps, cellCount, replay); + if (!replay && walEditMap.size() > 1) { + throw new IOException("Multiple nonces per batch and not in replay"); } // STEP 4. Append the final edit to WAL and sync. Mutation mutation = batchOp.getMutation(firstIndex); - WALKey walKey = null; - long txid; - if (replay) { - // use wal key from the original - walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); - walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId()); - if (!walEdit.isEmpty()) { - txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - if (txid != 0) { - sync(txid, durability); - } - } - } else { - try { - if (!walEdit.isEmpty()) { - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc, - this.getReplicationScope()); - // TODO: Use the doAppend methods below... complicated by the replay stuff above. - txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - if (txid != 0) { - sync(txid, durability); - } - if (writeEntry == null) { - // if MVCC not preassigned, wait here until assigned - writeEntry = walKey.getWriteEntry(); - } - } - } catch (IOException ioe) { - if (walKey != null && writeEntry == null) { - // the writeEntry is not preassigned and error occurred during append or sync - mvcc.complete(walKey.getWriteEntry()); - } - throw ioe; + for (Map.Entry walEditEntry : walEditMap.entrySet()) { + walEdit = walEditEntry.getValue(); + NonceKey nonceKey = walEditEntry.getKey(); + writeEntry = doWALAppend(walEdit, getEffectiveDurability(durability), + mutation.getClusterIds(), now, nonceKey.getNonceGroup(), nonceKey.getNonce(), + replay ? batchOp.getReplaySequenceId() : WALKey.NO_SEQUENCE_ID); + if (replay && writeEntry != null) { + mvcc.complete(writeEntry); } } - if (walKey == null) { + + if (!replay && writeEntry == null) { // If no walKey, then not in replay and skipping WAL or some such. Begin an MVCC transaction // to get sequence id. writeEntry = mvcc.begin(); } // STEP 5. Write back to memstore - for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { - continue; - } - // We need to update the sequence id for following reasons. - // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id. - // 2) If no WAL, FSWALEntry won't be used - // we use durability of the original mutation for the mutation passed by CP. 
- boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL; - if (updateSeqId) { - this.updateSequenceId(familyMaps[i].values(), - replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); - } - applyFamilyMapToMemstore(familyMaps[i], memstoreSize); - } - - // update memstore size - this.addAndGetMemstoreSize(memstoreSize); + writeBatchToMemstore(batchOp, firstIndex, lastIndexExclusive, familyMaps, memstoreSize, + writeEntry, replay); // calling the post CP hook for batch mutation if (!replay && coprocessorHost != null) { - MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), - batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); + MiniBatchOperationInProgress miniBatchOp = new MiniBatchOperationInProgress<>( + batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, + batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutate(miniBatchOp); } // STEP 6. Complete mvcc. if (replay) { this.mvcc.advanceTo(batchOp.getReplaySequenceId()); - } else { + } else if (writeEntry != null) { // writeEntry won't be empty if not in replay mode mvcc.completeAndWait(writeEntry); writeEntry = null; } - // STEP 7. Release row locks, etc. + success = true; + } finally { + // Call complete rather than completeAndWait because we probably had error if walKey != null + if (writeEntry != null) mvcc.complete(writeEntry); + if (locked) { this.updatesLock.readLock().unlock(); locked = false; @@ -3446,7 +3344,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi for (int i = firstIndex; i < lastIndexExclusive; i ++) { if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { - batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; + batchOp.retCodeDetails[i] = success ? OperationStatus.SUCCESS : OperationStatus.FAILURE; } } @@ -3468,15 +3366,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - success = true; - } finally { - // Call complete rather than completeAndWait because we probably had error if walKey != null - if (writeEntry != null) mvcc.complete(writeEntry); - if (locked) { - this.updatesLock.readLock().unlock(); - } - releaseRowLocks(acquiredRowLocks); - // See if the column families were consistent through the whole thing. // if they were then keep them. If they were not then pass a null. // null will be treated as unknown. 
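
The loop in STEP 4 above appends one WAL entry per distinct nonce: buildWALEdits() (added further below in this patch) groups the batch's mutations into a TreeMap of WALEdits keyed by NonceKey, which is why NonceKey now implements Comparable. A minimal, self-contained sketch of that grouping idea — the class name and the sample (group, nonce) values are hypothetical, not part of the patch:

import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.wal.WALEdit;

public class NonceGroupingSketch {
  public static void main(String[] args) {
    // One WALEdit per distinct (nonceGroup, nonce) pair, as in buildWALEdits();
    // a TreeMap works because NonceKey is now Comparable.
    Map<NonceKey, WALEdit> walEditMap = new TreeMap<>();
    long[][] batch = { { 1L, 100L }, { 1L, 100L }, { 2L, 200L } }; // sample (group, nonce) pairs
    for (long[] m : batch) {
      NonceKey key = new NonceKey(m[0], m[1]);
      // duplicate nonces fold into the WALEdit already registered for this key
      walEditMap.computeIfAbsent(key, k -> new WALEdit(0, false));
    }
    System.out.println("WAL appends needed: " + walEditMap.size()); // prints 2
  }
}

In replay a batch can legitimately carry several nonces, producing one append per map entry; outside replay, doMiniBatchMutate() now throws "Multiple nonces per batch and not in replay" as soon as walEditMap.size() > 1.
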
@@ -3495,13 +3384,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           this.metricsRegion.updateDelete();
         }
       }
-      if (!success) {
-        for (int i = firstIndex; i < lastIndexExclusive; i++) {
-          if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
-            batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
-          }
-        }
-      }
+
       if (coprocessorHost != null && !batchOp.isInReplay()) {
         // call the coprocessor hook to do any finalization steps
         // after the put is done
@@ -3515,6 +3398,168 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  /**
+   * Checks if each operation in the batch is valid and stores the familyMap of every valid
+   * operation into the input {@code familyMaps}
+   * @param batchOp batch of operations
+   * @param familyMaps map of row to family map
+   *
+   * @return the number of rows ready to be written
+   * @throws IOException
+   */
+  private int validateBatchOp(final BatchOperation batchOp, long timestamp,
+      final Map<byte[], List<Cell>>[] familyMaps,
+      final ObservedExceptionsInBatch observedExceptions) throws IOException {
+    int numRowsReadyToWrite = 0;
+    for (int i = 0; i < batchOp.operations.length; i++) {
+      if (!checkBatchOp(batchOp, i, familyMaps, timestamp, observedExceptions)) {
+        numRowsReadyToWrite++;
+      }
+    }
+    return numRowsReadyToWrite;
+  }
+
+  /**
+   * Tries to get as many locks as required on the input rows
+   * @param rowsToLock rows to lock
+   * @param acquiredRowLocks list of acquired row locks
+   * @param atomic if true, all rows need to be locked; an exception is thrown otherwise
+   * @throws IOException
+   */
+  private void lockBatchOfRows(final Iterator<byte[]> rowsToLock,
+      final List<RowLock> acquiredRowLocks, boolean atomic) throws IOException {
+    while (rowsToLock.hasNext()) {
+      byte[] row = rowsToLock.next();
+      RowLock rowLock = null;
+      try {
+        rowLock = getRowLockInternal(row, !atomic);
+      } catch (TimeoutIOException e) {
+        // We will retry on other exceptions, but we should stop if we time out.
+        throw e;
+      } catch (IOException ioe) {
+        LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(row), ioe);
+        if (atomic) {
+          throw ioe;
+        }
+      }
+      if (rowLock != null) {
+        acquiredRowLocks.add(rowLock);
+      } else {
+        break; // We failed to grab another lock, stop acquiring more rows for this batch
+      }
+    }
+  }
+
+  private int validateAndMergeCPMutations(final BatchOperation batchOp,
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp,
+      final Map<byte[], List<Cell>>[] familyMaps, final List<RowLock> acquiredRowLocks,
+      long timestamp, boolean replay) throws IOException {
+    int cellCount = 0;
+    int firstIndex = batchOp.nextIndexToProcess;
+    int lastIndexExclusive = firstIndex + miniBatchOp.size();
+    if (!coprocessorHost.preBatchMutate(miniBatchOp)) {
+      for (int i = batchOp.nextIndexToProcess; i < lastIndexExclusive; i++) {
+        if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
+          // skip invalid or already-processed operations
+          continue;
+        }
+        // we pass (i - firstIndex) below since the call expects a relative index
+        Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex);
+        if (cpMutations == null) {
+          continue;
+        }
+        Mutation mutation = batchOp.getMutation(i);
+        boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL;
+        // Else Coprocessor added more Mutations corresponding to the Mutation at this index.
+ for (int j = 0; j < cpMutations.length; j++) { + Mutation cpMutation = cpMutations[j]; + Map> cpFamilyMap = cpMutation.getFamilyCellMap(); + checkAndPrepareMutation(cpMutation, replay, cpFamilyMap, timestamp); + + // Acquire row locks. If not, the whole batch will fail. + acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); + + // Returned mutations from coprocessor correspond to the Mutation at index i. We can + // directly add the cells from those mutations to the familyMaps of this mutation. + mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later + + // The durability of returned mutation is replaced by the corresponding mutation. + // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the + // cells of returned mutation. + if (!skipWal) { + cellCount += cpFamilyMap.values().stream().mapToInt(List::size).sum(); + } + } + } + } + return cellCount; + } + + private Map buildWALEdits(final BatchOperation batchOp, int firstIndex, + int lastIndexExclusive, final Map>[] familyMaps, int cellCount, + boolean replay) { + Map walEditMap = new TreeMap<>(); + Durability durability = Durability.USE_DEFAULT; + for (int i = firstIndex; i < lastIndexExclusive; i++) { + // Skip puts that were determined to be invalid during pre-processing + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { + continue; + } + + Mutation m = batchOp.getMutation(i); + Durability tmpDur = getEffectiveDurability(m.getDurability()); + if (tmpDur.ordinal() > durability.ordinal()) { + durability = tmpDur; + } + // we use durability of the original mutation for the mutation passed by CP. + if (tmpDur == Durability.SKIP_WAL) { + recordMutationWithoutWal(m.getFamilyCellMap()); + continue; + } + + NonceKey nonceKey = new NonceKey(batchOp.getNonceGroup(i), batchOp.getNonce(i)); + WALEdit walEdit = walEditMap.get(nonceKey); + if (walEdit == null) { + walEdit = new WALEdit(cellCount, replay); + walEditMap.put(nonceKey, walEdit); + } + + // Add WAL edits by CP + WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; + if (fromCP != null) { + for (Cell cell : fromCP.getCells()) { + walEdit.add(cell); + } + } + addFamilyMapToWALEdit(familyMaps[i], walEdit); + } + return walEditMap; + } + + private void writeBatchToMemstore(final BatchOperation batchOp, int firstIndex, + int lastIndexExclusive, final Map>[] familyMaps, + final MemstoreSize memstoreSize, final WriteEntry writeEntry, boolean replay) + throws IOException { + for (int i = firstIndex; i < lastIndexExclusive; i++) { + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { + continue; + } + // We need to update the sequence id for following reasons. + // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id. + // 2) If no WAL, FSWALEntry won't be used + // we use durability of the original mutation for the mutation passed by CP. + boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL; + if (updateSeqId) { + this.updateSequenceId(familyMaps[i].values(), + replay? 
batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); + } + applyFamilyMapToMemstore(familyMaps[i], memstoreSize); + } + + // update memstore size + this.addAndGetMemstoreSize(memstoreSize); + } + private void mergeFamilyMaps(Map> familyMap, Map> toBeMerged) { for (Map.Entry> entry : toBeMerged.entrySet()) { @@ -3527,19 +3572,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private void appendCurrentNonces(final Mutation mutation, final boolean replay, - final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce) - throws IOException { - if (walEdit.isEmpty()) return; - if (!replay) throw new IOException("Multiple nonces per batch and not in replay"); - WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), now, mutation.getClusterIds(), - currentNonceGroup, currentNonce, mvcc, this.getReplicationScope()); - this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - // Complete the mvcc transaction started down in append else it will block others - this.mvcc.complete(walKey.getWriteEntry()); - } - private boolean checkBatchOp(BatchOperation batchOp, final int lastIndexExclusive, final Map>[] familyMaps, final long now, final ObservedExceptionsInBatch observedExceptions) @@ -3745,6 +3777,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (mutation != null) { doBatchMutate(mutation); } else { + batchMutate(rowMutations.getMutations().toArray(new Mutation[0]), true); mutateRow(rowMutations); } this.checkAndMutateChecksPassed.increment(); @@ -3801,7 +3834,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private void doBatchMutate(Mutation mutation) throws IOException { // Currently this is only called for puts and deletes, so no nonces. - OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation}); + OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation}, false); if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) { throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg()); } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) { @@ -6855,7 +6888,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return Result.create(results, get.isCheckExistenceOnly() ? 
!results.isEmpty() : null, stale); } - void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException { + void prepareGet(final Get get) throws IOException { checkRow(get.getRow(), "Get"); // Verify families are all valid if (get.hasFamilies()) { @@ -6917,8 +6950,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void mutateRow(RowMutations rm) throws IOException { - // Don't need nonces here - RowMutations only supports puts and deletes - mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); + batchMutate(rm.getMutations().toArray(new Mutation[0]), true); } /** @@ -6945,9 +6977,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void mutateRowsWithLocks(Collection mutations, Collection rowsToLock, long nonceGroup, long nonce) throws IOException { - writeRequestsCount.add(mutations.size()); - MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock); - processRowsWithLocks(proc, -1, nonceGroup, nonce); + batchMutate(mutations.toArray(new Mutation[0]), rowsToLock.toArray(new byte[0][0]), nonceGroup, + nonce, true); } /** @@ -6977,8 +7008,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void processRowsWithLocks(RowProcessor processor) throws IOException { - processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE, - HConstants.NO_NONCE); + processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE, HConstants.NO_NONCE); } @Override @@ -7045,7 +7075,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // STEP 6. Append and sync if walEdit has data to write out. if (!walEdit.isEmpty()) { writeEntry = doWALAppend(walEdit, getEffectiveDurability(processor.useDurability()), - processor.getClusterIds(), now, nonceGroup, nonce); + processor.getClusterIds(), now, nonceGroup, nonce, WALKey.NO_SEQUENCE_ID); } else { // We are here if WAL is being skipped. writeEntry = this.mvcc.begin(); @@ -7278,32 +7308,40 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long nonce) throws IOException { return doWALAppend(walEdit, durability, WALKey.EMPTY_UUIDS, System.currentTimeMillis(), - nonceGroup, nonce); + nonceGroup, nonce, WALKey.NO_SEQUENCE_ID); } /** * @return writeEntry associated with this append */ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List clusterIds, - long now, long nonceGroup, long nonce) - throws IOException { + long now, long nonceGroup, long nonce, long replaySeqId) throws IOException { WriteEntry writeEntry = null; - // Using default cluster id, as this can only happen in the originating cluster. - // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey - // here instead of WALKey directly to support legacy coprocessors. - WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, - nonceGroup, nonce, mvcc, this.getReplicationScope()); - try { - long txid = - this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - // Call sync on our edit. 
- if (txid != 0) sync(txid, durability); - writeEntry = walKey.getWriteEntry(); - } catch (IOException ioe) { - if (walKey != null) mvcc.complete(walKey.getWriteEntry()); - throw ioe; + if (!walEdit.isEmpty()) { + // Using default cluster id, as this can only happen in the originating cluster. + // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey + // here instead of WALKey directly to support legacy coprocessors. + WALKey walKey = walEdit.isReplay() ? new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, + nonce, mvcc) : + new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, + clusterIds, nonceGroup, nonce, mvcc, this.getReplicationScope()); + if (walEdit.isReplay()) { + walKey.setOrigLogSeqNum(replaySeqId); + } + + try { + long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); + // Call sync on our edit. + if (txid != 0) sync(txid, durability); + writeEntry = walKey.getWriteEntry(); + } catch (IOException ioe) { + if (walKey != null) mvcc.complete(walKey.getWriteEntry()); + throw ioe; + } } + return writeEntry; } @@ -7671,13 +7709,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } /** - * Give the region a chance to prepare before it is split. - */ - protected void prepareToSplit() { - // nothing - } - - /** * Return the splitpoint. null indicates the region isn't splittable * If the splitpoint isn't explicitly specified, it will go over the stores * to find the best splitpoint. Currently the criteria of best splitpoint diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java deleted file mode 100644 index 09ac73d3293d3231ff6c4bd959dcabcc7b869c92..0000000000000000000000000000000000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorRequest; -import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorResponse; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * A MultiRowProcessor that performs multiple puts and deletes. - */ -@InterfaceAudience.Private -class MultiRowMutationProcessor extends BaseRowProcessor { - Collection rowsToLock; - Collection mutations; - MiniBatchOperationInProgress miniBatch; - - MultiRowMutationProcessor(Collection mutations, - Collection rowsToLock) { - this.rowsToLock = rowsToLock; - this.mutations = mutations; - } - - @Override - public Collection getRowsToLock() { - return rowsToLock; - } - - @Override - public boolean readOnly() { - return false; - } - - @Override - public MultiRowMutationProcessorResponse getResult() { - return MultiRowMutationProcessorResponse.getDefaultInstance(); - } - - @Override - public void process(long now, - HRegion region, - List mutationsToApply, - WALEdit walEdit) throws IOException { - byte[] byteNow = Bytes.toBytes(now); - // Check mutations - for (Mutation m : this.mutations) { - if (m instanceof Put) { - Map> familyMap = m.getFamilyCellMap(); - region.checkFamilies(familyMap.keySet()); - region.checkTimestamps(familyMap, now); - region.updateCellTimestamps(familyMap.values(), byteNow); - } else if (m instanceof Delete) { - Delete d = (Delete) m; - region.prepareDelete(d); - region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow); - } else { - throw new DoNotRetryIOException("Action must be Put or Delete. But was: " - + m.getClass().getName()); - } - mutationsToApply.add(m); - } - // Apply edits to a single WALEdit - for (Mutation m : mutations) { - for (List cells : m.getFamilyCellMap().values()) { - boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL; - for (Cell cell : cells) { - if (writeToWAL) walEdit.add(cell); - } - } - } - } - - @Override - public void preProcess(HRegion region, WALEdit walEdit) throws IOException { - RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); - if (coprocessorHost != null) { - for (Mutation m : mutations) { - if (m instanceof Put) { - if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { - // by pass everything - return; - } - } else if (m instanceof Delete) { - Delete d = (Delete) m; - region.prepareDelete(d); - if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) { - // by pass everything - return; - } - } - } - } - } - - @Override - public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException { - // TODO we should return back the status of this hook run to HRegion so that those Mutations - // with OperationStatus as SUCCESS or FAILURE should not get applied to memstore. 
- RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); - OperationStatus[] opStatus = new OperationStatus[mutations.size()]; - Arrays.fill(opStatus, OperationStatus.NOT_RUN); - WALEdit[] walEditsFromCP = new WALEdit[mutations.size()]; - if (coprocessorHost != null) { - miniBatch = new MiniBatchOperationInProgress<>( - mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0, - mutations.size()); - coprocessorHost.preBatchMutate(miniBatch); - } - // Apply edits to a single WALEdit - for (int i = 0; i < mutations.size(); i++) { - if (opStatus[i] == OperationStatus.NOT_RUN) { - // Other OperationStatusCode means that Mutation is already succeeded or failed in CP hook - // itself. No need to apply again to region - if (walEditsFromCP[i] != null) { - // Add the WALEdit created by CP hook - for (Cell walCell : walEditsFromCP[i].getCells()) { - walEdit.add(walCell); - } - } - } - } - } - - @Override - public void postBatchMutate(HRegion region) throws IOException { - RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); - if (coprocessorHost != null) { - assert miniBatch != null; - // Use the same miniBatch state used to call the preBatchMutate() - coprocessorHost.postBatchMutate(miniBatch); - } - } - - @Override - public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException { - RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); - if (coprocessorHost != null) { - for (Mutation m : mutations) { - if (m instanceof Put) { - coprocessorHost.postPut((Put) m, walEdit, m.getDurability()); - } else if (m instanceof Delete) { - coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability()); - } - } - // At the end call the CP hook postBatchMutateIndispensably - if (miniBatch != null) { - // Directly calling this hook, with out calling pre/postBatchMutate() when Processor do a - // read only process. Then no need to call this batch based CP hook also. - coprocessorHost.postBatchMutateIndispensably(miniBatch, success); - } - } - } - - @Override - public MultiRowMutationProcessorRequest getRequestData() { - return MultiRowMutationProcessorRequest.getDefaultInstance(); - } - - @Override - public void initialize(MultiRowMutationProcessorRequest msg) { - //nothing - } - - @Override - public Durability useDurability() { - // return true when at least one mutation requested a WAL flush (default) - Durability durability = Durability.USE_DEFAULT; - for (Mutation m : mutations) { - if (m.getDurability().ordinal() > durability.ordinal()) { - durability = m.getDurability(); - } - } - return durability; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index e71c1eb68fe526cef0963734ee5e872962d4d79d..d2e1ac056f7c1d58c5fa6d66e042cfbb0fe89b2d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -547,45 +547,17 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param cellScanner if non-null, the mutation data -- the Cell content. 
* @throws IOException */ - private void mutateRows(final Region region, - final List actions, - final CellScanner cellScanner, RegionActionResult.Builder builder) throws IOException { - if (!region.getRegionInfo().isMetaTable()) { - regionServer.cacheFlusher.reclaimMemStoreMemory(); - } - RowMutations rm = null; - int i = 0; - ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = - ClientProtos.ResultOrException.newBuilder(); + private void mutateRows(final Region region, final OperationQuota quota, + final List actions, final CellScanner cellScanner, + final RegionActionResult.Builder builder, final ActivePolicyEnforcement spaceQuotaEnforcement) + throws IOException { for (ClientProtos.Action action: actions) { if (action.hasGet()) { throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + action.getGet()); } - MutationType type = action.getMutation().getMutateType(); - if (rm == null) { - rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); - } - switch (type) { - case PUT: - Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); - checkCellSizeLimit(region, put); - rm.add(put); - break; - case DELETE: - rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); - break; - default: - throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); - } - // To unify the response format with doNonAtomicRegionMutation and read through client's - // AsyncProcess we have to add an empty result instance per operation - resultOrExceptionOrBuilder.clear(); - resultOrExceptionOrBuilder.setIndex(i++); - builder.addResultOrException( - resultOrExceptionOrBuilder.build()); } - region.mutateRow(rm); + doBatchOp(builder, region, quota, actions, cellScanner, spaceQuotaEnforcement, true); } /** @@ -860,7 +832,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && !mutations.isEmpty()) { // Flush out any Puts or Deletes already collected. 
- doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); + doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); mutations.clear(); } switch (type) { @@ -921,7 +893,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } // Finish up any outstanding mutations if (mutations != null && !mutations.isEmpty()) { - doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); + doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false); } return cellsToReturn; } @@ -955,7 +927,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ private void doBatchOp(final RegionActionResult.Builder builder, final Region region, final OperationQuota quota, final List mutations, - final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) { + final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic) { Mutation[] mArray = new Mutation[mutations.size()]; long before = EnvironmentEdgeManager.currentTime(); boolean batchContainsPuts = false, batchContainsDelete = false; @@ -991,16 +963,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, regionServer.cacheFlusher.reclaimMemStoreMemory(); } - // HBASE-17924 - // sort to improve lock efficiency - Arrays.sort(mArray); + if (!atomic) { + // HBASE-17924 + // sort to improve lock efficiency + Arrays.sort(mArray); + } OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE, - HConstants.NO_NONCE); + HConstants.NO_NONCE, atomic); for (i = 0; i < codes.length; i++) { Mutation currentMutation = mArray[i]; ClientProtos.Action currentAction = mutationActionMap.get(currentMutation); - int index = currentAction.getIndex(); + int index = currentAction.hasIndex() || !atomic ? currentAction.getIndex() : i; Exception e = null; switch (codes[i].getOperationStatusCode()) { case BAD_FAMILY: @@ -2604,8 +2578,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, cellScanner, row, family, qualifier, op, comparator, regionActionResultBuilder, spaceQuotaEnforcement); } else { - mutateRows(region, regionAction.getActionList(), cellScanner, - regionActionResultBuilder); + mutateRows(region, quota, regionAction.getActionList(), cellScanner, + regionActionResultBuilder, spaceQuotaEnforcement); processed = Boolean.TRUE; } } catch (IOException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 4890f0d64baacd119b086aa52707bd1f9fa24f62..3465da02685cbfe5a1823a872a9f905f597788bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -344,6 +344,36 @@ public interface Region extends ConfigurationObserver { *
    * Note this supports only Put and Delete mutations and will ignore other types passed.
    * @param mutations the list of mutations
+   * @param rowsToLock rows to lock
+   * @param nonceGroup
+   * @param nonce
+   * @param atomic if input mutations need to be applied atomically
+   * @return an array of OperationStatus which internally contains the
+   *         OperationStatusCode and the exceptionMessage if any.
+   * @throws IOException
+   */
+  OperationStatus[] batchMutate(Mutation[] mutations, byte[][] rowsToLock, long nonceGroup,
+      long nonce, boolean atomic) throws IOException;
+
+  /**
+   * Perform a batch of mutations.
+   *
+   * Note this supports only Put and Delete mutations and will ignore other types passed.
+   * @param mutations the list of mutations
+   * @param nonceGroup
+   * @param nonce
+   * @param atomic if input mutations need to be applied atomically
+   * @return an array of OperationStatus which internally contains the
+   *         OperationStatusCode and the exceptionMessage if any.
+   * @throws IOException
+   */
+  OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce, boolean atomic)
+      throws IOException;
+
+  /**
+   * Perform a batch of mutations (not necessarily atomic)
+   *
+ * Note this supports only Put and Delete mutations and will ignore other types passed. + * @param mutations the list of mutations * @param nonceGroup * @param nonce * @return an array of OperationStatus which internally contains the diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 0b0d651b85389c91b0fe4299499b637f7791724c..c64ea941f5a5d828b2a922c10025eb312b50d3a4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1534,7 +1534,7 @@ public class TestHRegion { puts[i].addColumn(cf, qual, val); } - OperationStatus[] codes = this.region.batchMutate(puts); + OperationStatus[] codes = this.region.batchMutate(puts, false); assertEquals(10, codes.length); for (int i = 0; i < 10; i++) { assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); @@ -1543,7 +1543,7 @@ public class TestHRegion { LOG.info("Next a batch put with one invalid family"); puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, val); - codes = this.region.batchMutate(puts); + codes = this.region.batchMutate(puts, false); assertEquals(10, codes.length); for (int i = 0; i < 10; i++) { assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS, @@ -1590,7 +1590,7 @@ public class TestHRegion { @Override public void doWork() throws IOException { startingPuts.countDown(); - retFromThread.set(region.batchMutate(puts)); + retFromThread.set(region.batchMutate(puts, false)); } }; LOG.info("...starting put thread while holding locks"); @@ -1680,7 +1680,7 @@ public class TestHRegion { puts[i].addColumn(cf, qual, val); } - OperationStatus[] codes = this.region.batchMutate(puts); + OperationStatus[] codes = this.region.batchMutate(puts, false); assertEquals(10, codes.length); for (int i = 0; i < 10; i++) { assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode()); @@ -6305,7 +6305,7 @@ public class TestHRegion { new Put(b).addImmutable(fam1, null, null), }; - OperationStatus[] status = region.batchMutate(mutations); + OperationStatus[] status = region.batchMutate(mutations, false); assertEquals(status[0].getOperationStatusCode(), OperationStatusCode.SUCCESS); assertEquals(status[1].getOperationStatusCode(), OperationStatusCode.SANITY_CHECK_FAILURE); assertEquals(status[2].getOperationStatusCode(), OperationStatusCode.SUCCESS); @@ -6338,7 +6338,7 @@ public class TestHRegion { }; // this will wait for the row lock, and it will eventually succeed - OperationStatus[] status = region.batchMutate(mutations); + OperationStatus[] status = region.batchMutate(mutations, false); assertEquals(status[0].getOperationStatusCode(), OperationStatusCode.SUCCESS); assertEquals(status[1].getOperationStatusCode(), OperationStatusCode.SUCCESS); return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java index 6bbb81dda0b7259b4f1c53c32cb196778cfc917a..4c562523d3a1dc9c21eb98eb4e63b02570f6535f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java @@ -218,7 +218,7 @@ public class TestParallelPut { put.addColumn(fam1, qual1, value); in[0] = put; try { - 
OperationStatus[] ret = region.batchMutate(in); + OperationStatus[] ret = region.batchMutate(in, false); assertEquals(1, ret.length); assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode()); assertGet(this.region, rowkey, fam1, qual1, value); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java index fabd07554ebc0d4a76e9c54c3a345d7983ca967a..c4018ec0a94811a9c019720e0d73badbcd747a92 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java @@ -118,7 +118,7 @@ public class TestWALMonotonicallyIncreasingSeqId { put.addColumn("cf".getBytes(), Bytes.toBytes(0), Bytes.toBytes("")); //put.setDurability(Durability.ASYNC_WAL); latch.await(); - region.batchMutate(new Mutation[]{put}); + region.batchMutate(new Mutation[]{put}, false); Thread.sleep(10); } -- 2.10.1 (Apple Git-78)
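
For reference, the locking policy this refactor centralizes in lockBatchOfRows(): an atomic batch (mutateRow(), mutateRowsWithLocks(), or the atomic multi path) must lock every row up front and fails the whole batch if it cannot, while a plain batch keeps whatever locks it managed to get and mutates only those rows, leaving the rest for a later mini-batch. Below is a standalone sketch of that policy using JDK locks — HBase's RowLock and getRowLockInternal() are not used, and all names are illustrative:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockBatchSketch {
  static List<Lock> lockBatch(Iterator<ReentrantReadWriteLock> rows, boolean atomic)
      throws IOException {
    List<Lock> acquired = new ArrayList<>();
    while (rows.hasNext()) {
      ReentrantReadWriteLock rowLock = rows.next();
      // getRowLockInternal(row, !atomic) appears to take an exclusive row lock for
      // atomic batches and a shared one otherwise; mirrored here with a RW lock.
      Lock lock = atomic ? rowLock.writeLock() : rowLock.readLock();
      if (lock.tryLock()) {
        acquired.add(lock);
      } else if (atomic) {
        // all-or-nothing: the patch surfaces this as "Can't apply all operations
        // atomically!" (there, the caller's finally block releases the row locks)
        acquired.forEach(Lock::unlock);
        throw new IOException("Can't apply all operations atomically!");
      } else {
        break; // best effort: mutate only the rows that were locked
      }
    }
    return acquired;
  }
}

This matches the call sites in the patch: mutateRow() and mutateRowsWithLocks() pass atomic = true, doBatchMutate() and the non-atomic multi path pass false, and RSRpcServices skips the HBASE-17924 lock-ordering sort for atomic batches so the caller's mutation order is preserved.
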