From 28ecbe0482019949e0a44381692c9d1b8e45257d Mon Sep 17 00:00:00 2001 From: Umesh Agashe Date: Wed, 27 Sep 2017 15:17:35 -0700 Subject: [PATCH] HBASE-18703 WIP refactor doMiniBatchMutate() for mutateRows() methods to use it. --- .../org/apache/hadoop/hbase/util/NonceKey.java | 1 - .../apache/hadoop/hbase/regionserver/HRegion.java | 868 ++++++++++++--------- .../regionserver/MultiRowMutationProcessor.java | 205 ----- .../hadoop/hbase/regionserver/RSRpcServices.java | 70 +- .../apache/hadoop/hbase/regionserver/Region.java | 30 + .../hadoop/hbase/regionserver/TestHRegion.java | 12 +- .../hadoop/hbase/regionserver/TestParallelPut.java | 2 +- .../TestWALMonotonicallyIncreasingSeqId.java | 2 +- 8 files changed, 544 insertions(+), 646 deletions(-) delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java index 6da808ed639f3ae2268097ed3d1fb92b2be790d7..7e15c6fe6ac6bce23533ef83f53ec38f19a17d55 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java @@ -31,7 +31,6 @@ public class NonceKey { private long nonce; public NonceKey(long group, long nonce) { - assert nonce != HConstants.NO_NONCE; this.group = group; this.nonce = nonce; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 80c043379f0e8288287361270051d44abe2d1700..6d0cea0929e9fd7195669c5d539aef29dec6ff1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -70,6 +70,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; +import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -100,6 +102,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Delete; @@ -646,7 +649,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final MetricsRegion metricsRegion; private final MetricsRegionWrapperImpl metricsRegionWrapper; - private final Durability durability; + private final Durability regionDurability; private final boolean regionStatsEnabled; // Stores the replication scope of the various column families of the table // that has non-default scope @@ -772,7 +775,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ this.rowProcessorTimeout = conf.getLong( "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT); - this.durability = htd.getDurability() == Durability.USE_DEFAULT + this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ? 
DEFAULT_DURABILITY : htd.getDurability(); if (rsServices != null) { @@ -805,11 +808,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi configurationManager = Optional.empty(); // disable stats tracking system tables, but check the config for everything else - this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals( - NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ? - false : - conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, - HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + this.regionStatsEnabled = !htd.getTableName().getNamespaceAsString().equals( + NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) && conf.getBoolean( + HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); this.maxCellSize = conf.getLong(HBASE_MAX_CELL_SIZE_KEY, DEFAULT_MAX_CELL_SIZE); } @@ -1924,13 +1925,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // upkeep. ////////////////////////////////////////////////////////////////////////////// /** - * @return returns size of largest HStore. - */ - public long getLargestHStoreSize() { - return stores.values().stream().mapToLong(HStore::getSize).max().orElse(0L); - } - - /** * Do preparation for pending compaction. * @throws IOException */ @@ -2944,12 +2938,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int nextIndexToProcess = 0; OperationStatus[] retCodeDetails; WALEdit[] walEditsFromCoprocessors; + // reference family maps directly so coprocessors can mutate them if desired + Map>[] familyMaps; + ObservedExceptionsInBatch observedExceptions; + Durability durability; public BatchOperation(T[] operations) { this.operations = operations; this.retCodeDetails = new OperationStatus[operations.length]; this.walEditsFromCoprocessors = new WALEdit[operations.length]; Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN); + familyMaps = new Map[operations.length]; + observedExceptions = new ObservedExceptionsInBatch(); + durability = Durability.USE_DEFAULT; } public abstract Mutation getMutation(int index); @@ -2959,6 +2960,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public abstract Mutation[] getMutationsForCoprocs(); public abstract boolean isInReplay(); public abstract long getReplaySequenceId(); + public abstract boolean isAtomic(); public boolean isDone() { return nextIndexToProcess == operations.length; @@ -2968,10 +2970,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private static class MutationBatch extends BatchOperation { private long nonceGroup; private long nonce; - public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) { + private boolean atomic; + public MutationBatch(Mutation[] operations, long nonceGroup, long nonce, boolean atomic) { super(operations); this.nonceGroup = nonceGroup; this.nonce = nonce; + this.atomic = atomic; } @Override @@ -3003,6 +3007,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public long getReplaySequenceId() { return 0; } + + @Override + public boolean isAtomic() { + return atomic; + } } private static class ReplayBatch extends BatchOperation { @@ -3042,20 +3051,88 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public long getReplaySequenceId() { return this.replaySeqId; } + + @Override + public boolean isAtomic() { + return false; + } } + /** + * Perform a list of mutations. 
It supports only Put and Delete mutations and will ignore other
+   * types passed. If {@code atomic} is true then mutations are applied atomically, on an
+   * all-or-none basis, by locking all rows corresponding to the input mutations when
+   * {@code rowsToLock} is null. If {@code rowsToLock} is provided explicitly (non-null) then
+   * the given rows are locked before applying mutations. No validation is done on
+   * {@code rowsToLock} with respect to which rows are being mutated.
+   * <p>
+   * If {@code atomic} is false, then mutations are applied in sets as and when locks are
+   * acquired on rows from mutations. Specifying {@code rowsToLock} explicitly is not supported
+   * and this argument needs to be null.
+   * <p>
+   * NOTE: It's recommended that input mutations and rows to lock be sorted by row in ascending
+   * order.
+   * @param mutations the list of mutations
+   * @param rowsToLock rows to lock, supported only if {@code atomic} is true.
+   * @param nonceGroup
+   * @param nonce
+   * @param atomic whether the input mutations need to be applied atomically
+   * @return an array of OperationStatus which internally contains the
+   *         OperationStatusCode and the exceptionMessage if any.
+   * @throws IOException
+   */
   @Override
-  public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
-      throws IOException {
+  public OperationStatus[] batchMutate(Mutation[] mutations, byte[][] rowsToLock, long nonceGroup,
+      long nonce, boolean atomic) throws IOException {
     // As it stands, this is used for 3 things
     // * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
     // * coprocessor calls (see ex. BulkDeleteEndpoint).
     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
-    return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
+    return batchMutate(new MutationBatch(mutations, nonceGroup, nonce, atomic), rowsToLock);
   }

-  public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
-    return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  /**
+   * Perform a list of mutations. It supports only Put and Delete mutations and will ignore
+   * other types passed. If {@code atomic} is true then mutations are applied atomically, on an
+   * all-or-none basis, by locking all rows corresponding to the input mutations. If
+   * {@code atomic} is false, then mutations are applied in sets as and when locks are acquired
+   * on rows from mutations.
+   * <p>
+   * NOTE: It's recommended that input mutations be sorted by row in ascending order.
+   * @param mutations the list of mutations
+   * @param nonceGroup
+   * @param nonce
+   * @param atomic whether the input mutations need to be applied atomically
+   * @return an array of OperationStatus which internally contains the
+   *         OperationStatusCode and the exceptionMessage if any.
+   * @throws IOException
+   */
+  @Override
+  public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce,
+      boolean atomic) throws IOException {
+    return batchMutate(mutations, null, nonceGroup, nonce, atomic);
+  }
+
+  /**
+   * Perform a list of mutations. It supports only Put and Delete mutations and will ignore
+   * other types passed. Input mutations are applied in sets as and when locks are acquired on
+   * rows from mutations.
+   * <p>
+   * NOTE: It's recommended that input mutations be sorted by row in ascending order.
+   * @param mutations the list of mutations
+   * @param nonceGroup
+   * @param nonce
+   * @return an array of OperationStatus which internally contains the
+   *         OperationStatusCode and the exceptionMessage if any.
+   * @throws IOException
+   */
+  @Override
+  public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
+      throws IOException {
+    return batchMutate(mutations, null, nonceGroup, nonce, false);
+  }
+
+  public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic) throws IOException {
+    return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE, atomic);
   }

   @Override
@@ -3080,36 +3157,48 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       return statuses;
     }
-    return batchMutate(new ReplayBatch(mutations, replaySeqId));
+    return batchMutate(new ReplayBatch(mutations, replaySeqId), null);
   }

   /**
-   * Perform a batch of mutations.
-   * It supports only Put and Delete mutations and will ignore other types passed.
+   * Perform a batch of mutations. It supports only Put and Delete mutations and will ignore
+   * other types passed. If {@code batchOp.isAtomic()} is true then mutations are applied
+   * atomically, on an all-or-none basis, by locking all rows corresponding to the input
+   * mutations when {@code rowsToLock} is null. If {@code rowsToLock} is provided explicitly
+   * (non-null) then the specified rows are locked before applying mutations. No validation is
+   * done on {@code rowsToLock} with respect to which rows are being mutated.
+   * <p>
+   * If {@code batchOp.isAtomic()} is false, then mutations are applied in sets as and when
+   * locks are acquired on rows from mutations. Specifying {@code rowsToLock} explicitly is not
+   * supported and this argument needs to be null.
+   * <p>
+   * NOTE: It's recommended that input mutations and rows to lock be sorted by row in ascending
+   * order.
   * @param batchOp contains the list of mutations
+   * @param rowsToLock rows to lock, supported only if {@code batchOp.isAtomic()} is true.
   * @return an array of OperationStatus which internally contains the
   *         OperationStatusCode and the exceptionMessage if any.
   * @throws IOException
   */
-  OperationStatus[] batchMutate(BatchOperation<?> batchOp) throws IOException {
-    boolean initialized = false;
+  OperationStatus[] batchMutate(BatchOperation<?> batchOp, byte[][] rowsToLock)
+      throws IOException {
     Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
     startRegionOperation(op);
     try {
+      this.writeRequestsCount.add(batchOp.operations.length);
+      if (!batchOp.isInReplay()) {
+        doPreBatchMutateHook(batchOp);
+      }
+
+      // STEP 1. Validate operations and prepare batch
+      validateAndPrepareBatchOp(batchOp, EnvironmentEdgeManager.currentTime());
+
       while (!batchOp.isDone()) {
         if (!batchOp.isInReplay()) {
           checkReadOnly();
         }
         checkResources();

-        if (!initialized) {
-          this.writeRequestsCount.add(batchOp.operations.length);
-          if (!batchOp.isInReplay()) {
-            doPreBatchMutateHook(batchOp);
-          }
-          initialized = true;
-        }
-        doMiniBatchMutate(batchOp);
+        doMiniBatchMutate(batchOp, rowsToLock);
         long newSize = this.getMemStoreSize();
         requestFlushIfNeeded(newSize);
       }
@@ -3119,8 +3208,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
     return batchOp.retCodeDetails;
   }

-  private void doPreBatchMutateHook(BatchOperation<?> batchOp)
-      throws IOException {
+  private void doPreBatchMutateHook(BatchOperation<?> batchOp) throws IOException {
    /* Run coprocessor pre hook outside of locks to avoid deadlock */
    WALEdit walEdit = new WALEdit();
    if (coprocessorHost != null) {
@@ -3144,11 +3232,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
         }
       } else {
+        final String msg = "Only Put/Delete mutations are supported in a batch";
+        if (batchOp.isAtomic()) {
+          throw new IOException(msg);
+        }
         // In case of passing Append mutations along with the Puts and Deletes in batchMutate
         // mark the operation return code as failure so that it will not be considered in
         // the doMiniBatchMutation
-        batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
-            "Put/Delete mutations only supported in batchMutate() now");
+        batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE, msg);
       }
       if (!walEdit.isEmpty()) {
         batchOp.walEditsFromCoprocessors[i] = walEdit;
@@ -3159,22 +3250,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }

   /**
-   * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)}
-   * In here we also handle replay of edits on region recover.
-   * @return Change in size brought about by applying batchOp
+   * Called to do a full or a piece of the batch that came in to
+   * {@link #batchMutate(Mutation[], byte[][], long, long, boolean)}. In here we also handle
+   * replay of edits on region recovery.
    */
-  // TODO: This needs a rewrite. Doesn't have to be this long.
St.Ack 20160120 - private void doMiniBatchMutate(BatchOperation batchOp) throws IOException { + private void doMiniBatchMutate(BatchOperation batchOp, byte[][] rowsToLock) + throws IOException { + boolean atomic = batchOp.isAtomic(); + Preconditions.checkArgument(atomic || rowsToLock == null, + "Locking custom rows for non-atomic batch mutation not supported!"); boolean replay = batchOp.isInReplay(); - long currentNonceGroup = HConstants.NO_NONCE; - long currentNonce = HConstants.NO_NONCE; WALEdit walEdit = null; boolean locked = false; - // reference family maps directly so coprocessors can mutate them if desired - Map>[] familyMaps = new Map[batchOp.operations.length]; - // We try to set up a batch in the range [firstIndex,lastIndexExclusive) - int firstIndex = batchOp.nextIndexToProcess; - int lastIndexExclusive = firstIndex; + // We try to set up a batch in the range [batchOp.nextIndexToProcess,lastIndexExclusive) + int lastIndexExclusive = batchOp.nextIndexToProcess; boolean success = false; int noOfPuts = 0; int noOfDeletes = 0; @@ -3182,278 +3271,152 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int cellCount = 0; /** Keep track of the locks we hold so we can release them in finally clause */ List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); - MemStoreSize memstoreSize = new MemStoreSize(); - final ObservedExceptionsInBatch observedExceptions = new ObservedExceptionsInBatch(); + MemStoreSize memStoreSize = new MemStoreSize(); try { - // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. - int numReadyToWrite = 0; - long now = EnvironmentEdgeManager.currentTime(); - while (lastIndexExclusive < batchOp.operations.length) { - if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, observedExceptions)) { - lastIndexExclusive++; - continue; - } - Mutation mutation = batchOp.getMutation(lastIndexExclusive); - // If we haven't got any rows in our batch, we should block to get the next one. - RowLock rowLock = null; - try { - rowLock = getRowLockInternal(mutation.getRow(), true); - } catch (TimeoutIOException e) { - // We will retry when other exceptions, but we should stop if we timeout . - throw e; - } catch (IOException ioe) { - LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); - } - if (rowLock == null) { - // We failed to grab another lock - break; // Stop acquiring more rows for this batch - } else { - acquiredRowLocks.add(rowLock); - } - - lastIndexExclusive++; - numReadyToWrite++; - if (replay) { - for (List cells : mutation.getFamilyCellMap().values()) { - cellCount += cells.size(); - } + // STEP 2. Try to acquire adequate number of locks + int startLocksIndex = rowsToLock != null ? 0 : batchOp.nextIndexToProcess; + int locksLength = rowsToLock != null ? rowsToLock.length : batchOp.operations.length; + IntStream rowIndices = IntStream.range(startLocksIndex, locksLength); + if (rowsToLock == null) { + rowIndices = rowIndices.filter(i -> + batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN); + } + Stream rowStream = rowIndices.mapToObj(i -> rowsToLock != null ? 
rowsToLock[i] : + batchOp.getMutation(i).getRow()); + lockBatchOfRows(rowStream.iterator(), acquiredRowLocks, atomic); + + int numRowsReadyToWrite = batchOp.operations.length; + if (!atomic) { + if (acquiredRowLocks.isEmpty()) { + // Nothing to put/delete - an exception or no lock + return; + } else if (acquiredRowLocks.size() != batchOp.operations.length) { + numRowsReadyToWrite = acquiredRowLocks.size(); } } - // We've now grabbed as many mutations off the list as we can - - // STEP 2. Update any LATEST_TIMESTAMP timestamps + // STEP 3. Update any LATEST_TIMESTAMP timestamps // We should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp - now = EnvironmentEdgeManager.currentTime(); + long now = EnvironmentEdgeManager.currentTime(); byte[] byteNow = Bytes.toBytes(now); - - // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? - if (numReadyToWrite <= 0) { - return; - } - - for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) { - // skip invalid - if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - // lastIndexExclusive was incremented above. + for (int mutationCount = 0; + lastIndexExclusive < batchOp.operations.length && mutationCount < numRowsReadyToWrite; + lastIndexExclusive++) { + if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() != + OperationStatusCode.NOT_RUN) { continue; } + mutationCount++; - Mutation mutation = batchOp.getMutation(i); - if (mutation instanceof Put) { - updateCellTimestamps(familyMaps[i].values(), byteNow); - noOfPuts++; - } else { - prepareDeleteTimestamps(mutation, familyMaps[i], byteNow); - noOfDeletes++; - } - rewriteCellTags(familyMaps[i], mutation); - WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; - if (fromCP != null) { - cellCount += fromCP.size(); - } - if (getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) { - for (List cells : familyMaps[i].values()) { - cellCount += cells.size(); + Mutation mutation = batchOp.getMutation(lastIndexExclusive); + if (!replay) { + if (mutation instanceof Put) { + updateCellTimestamps(batchOp.familyMaps[lastIndexExclusive].values(), byteNow); + noOfPuts++; + } else { + prepareDeleteTimestamps(mutation, batchOp.familyMaps[lastIndexExclusive], byteNow); + noOfDeletes++; + } + rewriteCellTags(batchOp.familyMaps[lastIndexExclusive], mutation); + + WALEdit fromCP = batchOp.walEditsFromCoprocessors[lastIndexExclusive]; + if (fromCP != null) { + cellCount += fromCP.size(); + } + if (getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) { + cellCount += mutation.getFamilyCellMap().values().stream().mapToInt(List::size).sum(); } } } - lock(this.updatesLock.readLock(), numReadyToWrite); + + lock(this.updatesLock.readLock(), numRowsReadyToWrite); locked = true; // calling the pre CP hook for batch mutation if (!replay && coprocessorHost != null) { - MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), - batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); + MiniBatchOperationInProgress miniBatchOp = new MiniBatchOperationInProgress<>( + batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, + batchOp.walEditsFromCoprocessors, batchOp.nextIndexToProcess, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) { - return; - } else { - for (int i = firstIndex; i < lastIndexExclusive; 
i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { - // lastIndexExclusive was incremented above. - continue; - } - // we pass (i - firstIndex) below since the call expects a relative index - Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex); - if (cpMutations == null) { - continue; - } - Mutation mutation = batchOp.getMutation(i); - boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL; - // Else Coprocessor added more Mutations corresponding to the Mutation at this index. - for (int j = 0; j < cpMutations.length; j++) { - Mutation cpMutation = cpMutations[j]; - Map> cpFamilyMap = cpMutation.getFamilyCellMap(); - checkAndPrepareMutation(cpMutation, replay, cpFamilyMap, now); - - // Acquire row locks. If not, the whole batch will fail. - acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); - - // Returned mutations from coprocessor correspond to the Mutation at index i. We can - // directly add the cells from those mutations to the familyMaps of this mutation. - mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later - - // The durability of returned mutation is replaced by the corresponding mutation. - // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the - // cells of returned mutation. - if (!skipWal) { - for (List cells : cpFamilyMap.values()) { - cellCount += cells.size(); - } - } - } + for (int i = batchOp.nextIndexToProcess; i < lastIndexExclusive; i++) { + // pre hook says skip this mutation + // mark as success and skip in doMiniBatchMutation + batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; } + return; } + cellCount += + validateAndMergeCPMutations(batchOp, miniBatchOp, acquiredRowLocks, now, replay); } // STEP 3. Build WAL edit - walEdit = new WALEdit(cellCount, replay); - Durability durability = Durability.USE_DEFAULT; - for (int i = firstIndex; i < lastIndexExclusive; i++) { - // Skip puts that were determined to be invalid during preprocessing - if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { - continue; - } - - Mutation m = batchOp.getMutation(i); - Durability tmpDur = getEffectiveDurability(m.getDurability()); - if (tmpDur.ordinal() > durability.ordinal()) { - durability = tmpDur; - } - // we use durability of the original mutation for the mutation passed by CP. - if (tmpDur == Durability.SKIP_WAL) { - recordMutationWithoutWal(m.getFamilyCellMap()); - continue; - } - - long nonceGroup = batchOp.getNonceGroup(i); - long nonce = batchOp.getNonce(i); - // In replay, the batch may contain multiple nonces. If so, write WALEdit for each. - // Given how nonces are originally written, these should be contiguous. - // They don't have to be, it will still work, just write more WALEdits than needed. 
- if (nonceGroup != currentNonceGroup || nonce != currentNonce) { - // Write what we have so far for nonces out to WAL - appendCurrentNonces(m, replay, walEdit, now, currentNonceGroup, currentNonce); - walEdit = new WALEdit(cellCount, replay); - currentNonceGroup = nonceGroup; - currentNonce = nonce; - } - - // Add WAL edits by CP - WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; - if (fromCP != null) { - for (Cell cell : fromCP.getCells()) { - walEdit.add(cell); - } + Map walEditMap = + buildWALEdits(batchOp, lastIndexExclusive, cellCount, replay); + if (!replay && walEditMap.size() > 1) { + throw new IOException("Multiple nonces per batch and not in replay"); + } + + // STEP 4. Append edit to WAL and sync. + Mutation mutation = batchOp.getMutation(batchOp.nextIndexToProcess); + for (Map.Entry walEditEntry : walEditMap.entrySet()) { + walEdit = walEditEntry.getValue(); + NonceKey nonceKey = walEditEntry.getKey(); + writeEntry = doWALAppend(walEdit, getEffectiveDurability(batchOp.durability), + mutation.getClusterIds(), now, nonceKey.getNonceGroup(), nonceKey.getNonce(), + replay ? batchOp.getReplaySequenceId() : WALKey.NO_SEQUENCE_ID); + if (replay && writeEntry != null) { + mvcc.complete(writeEntry); } - addFamilyMapToWALEdit(familyMaps[i], walEdit); } - // STEP 4. Append the final edit to WAL and sync. - Mutation mutation = batchOp.getMutation(firstIndex); - WALKey walKey = null; - long txid; - if (replay) { - // use wal key from the original - walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); - walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId()); - if (!walEdit.isEmpty()) { - txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - if (txid != 0) { - sync(txid, durability); - } - } - } else { - try { - if (!walEdit.isEmpty()) { - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc, - this.getReplicationScope()); - // TODO: Use the doAppend methods below... complicated by the replay stuff above. - txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - if (txid != 0) { - sync(txid, durability); - } - if (writeEntry == null) { - // if MVCC not preassigned, wait here until assigned - writeEntry = walKey.getWriteEntry(); - } - } - } catch (IOException ioe) { - if (walKey != null && writeEntry == null) { - // the writeEntry is not preassigned and error occurred during append or sync - mvcc.complete(walKey.getWriteEntry()); - } - throw ioe; - } - } - if (walKey == null) { + if (!replay && writeEntry == null) { // If no walKey, then not in replay and skipping WAL or some such. Begin an MVCC transaction // to get sequence id. writeEntry = mvcc.begin(); } - // STEP 5. Write back to memstore - for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { - continue; - } - // We need to update the sequence id for following reasons. - // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id. - // 2) If no WAL, FSWALEntry won't be used - // we use durability of the original mutation for the mutation passed by CP. 
- boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL; - if (updateSeqId) { - this.updateSequenceId(familyMaps[i].values(), - replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); - } - applyFamilyMapToMemStore(familyMaps[i], memstoreSize); - } - - // update memstore size - this.addAndGetMemStoreSize(memstoreSize); + // STEP 5. Write back to memStore + writeBatchToMemstore(batchOp, lastIndexExclusive, memStoreSize, writeEntry, replay); // calling the post CP hook for batch mutation if (!replay && coprocessorHost != null) { - MiniBatchOperationInProgress miniBatchOp = - new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), - batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); + MiniBatchOperationInProgress miniBatchOp = new MiniBatchOperationInProgress<>( + batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, + batchOp.walEditsFromCoprocessors, batchOp.nextIndexToProcess, lastIndexExclusive); coprocessorHost.postBatchMutate(miniBatchOp); } // STEP 6. Complete mvcc. if (replay) { this.mvcc.advanceTo(batchOp.getReplaySequenceId()); - } else { + } else if (writeEntry != null) { // writeEntry won't be empty if not in replay mode mvcc.completeAndWait(writeEntry); writeEntry = null; } - // STEP 7. Release row locks, etc. + success = true; + } finally { + // Call complete rather than completeAndWait because we probably had error if walKey != null + if (writeEntry != null) mvcc.complete(writeEntry); + if (locked) { this.updatesLock.readLock().unlock(); locked = false; } releaseRowLocks(acquiredRowLocks); - for (int i = firstIndex; i < lastIndexExclusive; i ++) { + for (int i = batchOp.nextIndexToProcess; i < lastIndexExclusive; i ++) { if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { - batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; + batchOp.retCodeDetails[i] = success ? OperationStatus.SUCCESS : OperationStatus.FAILURE; } } // STEP 8. Run coprocessor post hooks. This should be done after the wal is // synced so that the coprocessor contract is adhered to. if (!replay && coprocessorHost != null) { - for (int i = firstIndex; i < lastIndexExclusive; i++) { + for (int i = batchOp.nextIndexToProcess; i < lastIndexExclusive; i++) { // only for successful puts if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) { @@ -3468,15 +3431,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - success = true; - } finally { - // Call complete rather than completeAndWait because we probably had error if walKey != null - if (writeEntry != null) mvcc.complete(writeEntry); - if (locked) { - this.updatesLock.readLock().unlock(); - } - releaseRowLocks(acquiredRowLocks); - // See if the column families were consistent through the whole thing. // if they were then keep them. If they were not then pass a null. // null will be treated as unknown. 
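For orientation, a minimal usage sketch of the refactored entry points follows; this is not
part of the patch, and the region handle plus the family/qualifier/row names are illustrative:

    // Non-atomic batch: row locks are taken as they become available and every
    // operation gets its own OperationStatus; one failure does not abort the rest.
    byte[] cf = Bytes.toBytes("f"), q = Bytes.toBytes("q"), v = Bytes.toBytes("v");
    Mutation[] batch = new Mutation[] {
        new Put(Bytes.toBytes("row1")).addColumn(cf, q, v),
        new Delete(Bytes.toBytes("row2")) };
    OperationStatus[] codes = region.batchMutate(batch, false);

    // Atomic batch: all row locks are acquired up front by lockBatchOfRows() and any
    // validation or locking failure throws, failing the batch as a whole.
    region.batchMutate(batch, true);

    // Atomic batch with an explicit lock set, as mutateRowsWithLocks() now does internally:
    byte[][] rowsToLock = { Bytes.toBytes("row1"), Bytes.toBytes("row2"), Bytes.toBytes("row3") };
    region.batchMutate(batch, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE, true);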
@@ -3495,26 +3449,230 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           this.metricsRegion.updateDelete();
         }
       }
-      if (!success) {
-        for (int i = firstIndex; i < lastIndexExclusive; i++) {
-          if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
-            batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
-          }
-        }
-      }
+
       if (coprocessorHost != null && !batchOp.isInReplay()) {
         // call the coprocessor hook to do any finalization steps
         // after the put is done
         MiniBatchOperationInProgress<Mutation> miniBatchOp =
-            new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
-                batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
+            new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
+                batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors,
+                batchOp.nextIndexToProcess, lastIndexExclusive);
         coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
       }
+
+      for (; lastIndexExclusive < batchOp.operations.length &&
+          batchOp.retCodeDetails[lastIndexExclusive] != OperationStatus.NOT_RUN;
+          lastIndexExclusive++) {}
       batchOp.nextIndexToProcess = lastIndexExclusive;
     }
   }

+  /**
+   * Checks each operation in the batch for validity and stores the family cell map of each
+   * valid operation into {@code batchOp.familyMaps}.
+   * @param batchOp batch of operations
+   * @param timestamp timestamp used for timestamp and sanity checks
+   * @throws IOException
+   */
+  private void validateAndPrepareBatchOp(final BatchOperation<?> batchOp, long timestamp)
+      throws IOException {
+    int firstValidNotRunMutation = batchOp.operations.length;
+    for (int i = 0; i < batchOp.operations.length; i++) {
+      // Skip anything that "ran" already
+      if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
+        continue;
+      }
+
+      Mutation mutation = batchOp.getMutation(i);
+
+      try {
+        checkAndPrepareMutation(mutation, batchOp.isInReplay(), timestamp);
+      } catch (NoSuchColumnFamilyException nscfe) {
+        final String msg = "No such column family in batch mutation. ";
+        if (batchOp.observedExceptions.hasSeenNoSuchFamily()) {
+          LOG.warn(msg + nscfe.getMessage());
+        } else {
+          LOG.warn(msg, nscfe);
+          batchOp.observedExceptions.sawNoSuchFamily();
+        }
+        batchOp.retCodeDetails[i] =
+            new OperationStatus(OperationStatusCode.BAD_FAMILY, nscfe.getMessage());
+        if (batchOp.isAtomic()) throw nscfe;
+      } catch (FailedSanityCheckException fsce) {
+        final String msg = "Batch Mutation did not pass sanity check. ";
+        if (batchOp.observedExceptions.hasSeenFailedSanityCheck()) {
+          LOG.warn(msg + fsce.getMessage());
+        } else {
+          LOG.warn(msg, fsce);
+          batchOp.observedExceptions.sawFailedSanityCheck();
+        }
+        batchOp.retCodeDetails[i] =
+            new OperationStatus(OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
+        if (batchOp.isAtomic()) throw fsce;
+      } catch (WrongRegionException we) {
+        final String msg = "Batch mutation had a row that does not belong to this region. ";
"; + if (batchOp.observedExceptions.hasSeenWrongRegion()) { + LOG.warn(msg + we.getMessage()); + } else { + LOG.warn(msg, we); + batchOp.observedExceptions.sawWrongRegion(); + } + batchOp.retCodeDetails[i] = + new OperationStatus(OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); + if (batchOp.isAtomic()) throw we; + } + + if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) { + if (firstValidNotRunMutation > i) { + firstValidNotRunMutation = i; + } + // store the family map reference to allow for mutations + batchOp.familyMaps[i] = mutation.getFamilyCellMap(); + + // store highest level of durability seen for the batch + Durability tmpDurability = mutation.getDurability(); + if (tmpDurability.ordinal() > batchOp.durability.ordinal()) { + batchOp.durability = tmpDurability; + } + } + } + batchOp.nextIndexToProcess = firstValidNotRunMutation; + } + + /** + * Tries to get as many locks as required on input rows + * @param rowsToLock rows to lock + * @param acquiredRowLocks list of rows locked + * @param atomic if true, all rows need to be locked. Exception is thrown otherwise + * @throws IOException + */ + private void lockBatchOfRows(final Iterator rowsToLock, + final List acquiredRowLocks, boolean atomic) throws IOException { + while (rowsToLock.hasNext()) { + byte[] row = rowsToLock.next(); + RowLock rowLock = null; + try { + rowLock = getRowLockInternal(row, !atomic); + } catch (TimeoutIOException e) { + // We will retry when other exceptions, but we should stop if we timeout . + throw e; + } catch (IOException ioe) { + LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(row), ioe); + if (atomic) { + throw ioe; + } + } + if (rowLock != null) { + acquiredRowLocks.add(rowLock); + } else { + if (atomic) { + // failed to get all locks + throw new IOException("Can't apply all operations atomically!"); + } else { + break; // We failed to grab another lock, stop acquiring more rows for this batch + } + } + } + } + + private int validateAndMergeCPMutations(final BatchOperation batchOp, + final MiniBatchOperationInProgress miniBatchOp, + final List acquiredRowLocks, long timestamp, boolean replay) throws IOException { + int cellCount = 0; + int firstIndex = batchOp.nextIndexToProcess; + int lastIndexExclusive = firstIndex + miniBatchOp.size(); + for (int i = batchOp.nextIndexToProcess; i < lastIndexExclusive; i++) { + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { + // lastIndexExclusive was incremented above. + continue; + } + // we pass (i - firstIndex) below since the call expects a relative index + Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex); + if (cpMutations == null) { + continue; + } + Mutation mutation = batchOp.getMutation(i); + boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL; + // Else Coprocessor added more Mutations corresponding to the Mutation at this index. + for (Mutation cpMutation : cpMutations) { + checkAndPrepareMutation(cpMutation, replay, timestamp); + + // Acquire row locks. If not, the whole batch will fail. + acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); + + // Returned mutations from coprocessor correspond to the Mutation at index i. We can + // directly add the cells from those mutations to the familyMaps of this mutation. 
+        Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
+        mergeFamilyMaps(batchOp.familyMaps[i], cpFamilyMap); // will get added to the memstore later
+
+        // The durability of a returned mutation is replaced by that of the corresponding
+        // original mutation. If the corresponding mutation uses SKIP_WAL, we shouldn't count
+        // the cells of the returned mutation.
+        if (!skipWal) {
+          cellCount += cpFamilyMap.values().stream().mapToInt(List::size).sum();
+        }
+      }
+    }
+    return cellCount;
+  }
+
+  private Map<NonceKey, WALEdit> buildWALEdits(final BatchOperation<?> batchOp,
+      int lastIndexExclusive, int cellCount, boolean replay) {
+    Map<NonceKey, WALEdit> walEditMap = new HashMap<>();
+    for (int i = batchOp.nextIndexToProcess; i < lastIndexExclusive; i++) {
+      // Skip puts that were determined to be invalid during pre-processing
+      if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
+        continue;
+      }
+
+      Mutation m = batchOp.getMutation(i);
+      // we use durability of the original mutation for the mutation passed by CP.
+      if (getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {
+        recordMutationWithoutWal(m.getFamilyCellMap());
+        continue;
+      }
+
+      NonceKey nonceKey = new NonceKey(batchOp.getNonceGroup(i), batchOp.getNonce(i));
+      WALEdit walEdit = walEditMap.get(nonceKey);
+      if (walEdit == null) {
+        walEdit = new WALEdit(cellCount, replay);
+        walEditMap.put(nonceKey, walEdit);
+      }
+
+      // Add WAL edits by CP
+      WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
+      if (fromCP != null) {
+        for (Cell cell : fromCP.getCells()) {
+          walEdit.add(cell);
+        }
+      }
+      addFamilyMapToWALEdit(batchOp.familyMaps[i], walEdit);
+    }
+    return walEditMap;
+  }
+
+  private void writeBatchToMemstore(final BatchOperation<?> batchOp, int lastIndexExclusive,
+      final MemStoreSize memStoreSize, final WriteEntry writeEntry, boolean replay)
+      throws IOException {
+    for (int i = batchOp.nextIndexToProcess; i < lastIndexExclusive; i++) {
+      if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
+        continue;
+      }
+      // We need to update the sequence id for following reasons.
+      // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
+      // 2) If no WAL, FSWALEntry won't be used
+      // we use durability of the original mutation for the mutation passed by CP.
+      boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
+      if (updateSeqId) {
+        this.updateSequenceId(batchOp.familyMaps[i].values(),
+            replay ?
batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); + } + applyFamilyMapToMemStore(batchOp.familyMaps[i], memStoreSize); + } + + // update memstore size + this.addAndGetMemStoreSize(memStoreSize); + } + private void mergeFamilyMaps(Map> familyMap, Map> toBeMerged) { for (Map.Entry> entry : toBeMerged.entrySet()) { @@ -3527,76 +3685,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private void appendCurrentNonces(final Mutation mutation, final boolean replay, - final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce) - throws IOException { - if (walEdit.isEmpty()) return; - if (!replay) throw new IOException("Multiple nonces per batch and not in replay"); - WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), now, mutation.getClusterIds(), - currentNonceGroup, currentNonce, mvcc, this.getReplicationScope()); - this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - // Complete the mvcc transaction started down in append else it will block others - this.mvcc.complete(walKey.getWriteEntry()); - } - - private boolean checkBatchOp(BatchOperation batchOp, final int lastIndexExclusive, - final Map>[] familyMaps, final long now, - final ObservedExceptionsInBatch observedExceptions) - throws IOException { - boolean skip = false; - // Skip anything that "ran" already - if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - return true; - } - Mutation mutation = batchOp.getMutation(lastIndexExclusive); + private void checkAndPrepareMutation(Mutation mutation, boolean replay, final long now) + throws IOException { Map> familyMap = mutation.getFamilyCellMap(); - // store the family map reference to allow for mutations - familyMaps[lastIndexExclusive] = familyMap; - - try { - checkAndPrepareMutation(mutation, batchOp.isInReplay(), familyMap, now); - } catch (NoSuchColumnFamilyException nscf) { - final String msg = "No such column family in batch mutation. "; - if (observedExceptions.hasSeenNoSuchFamily()) { - LOG.warn(msg + nscf.getMessage()); - } else { - LOG.warn(msg, nscf); - observedExceptions.sawNoSuchFamily(); - } - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.BAD_FAMILY, nscf.getMessage()); - skip = true; - } catch (FailedSanityCheckException fsce) { - final String msg = "Batch Mutation did not pass sanity check. "; - if (observedExceptions.hasSeenFailedSanityCheck()) { - LOG.warn(msg + fsce.getMessage()); - } else { - LOG.warn(msg, fsce); - observedExceptions.sawFailedSanityCheck(); - } - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); - skip = true; - } catch (WrongRegionException we) { - final String msg = "Batch mutation had a row that does not belong to this region. "; - if (observedExceptions.hasSeenWrongRegion()) { - LOG.warn(msg + we.getMessage()); - } else { - LOG.warn(msg, we); - observedExceptions.sawWrongRegion(); - } - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); - skip = true; - } - return skip; - } - - private void checkAndPrepareMutation(Mutation mutation, boolean replay, - final Map> familyMap, final long now) - throws IOException { + checkRow(mutation.getRow(), "doMiniBatchMutation"); if (mutation instanceof Put) { // Check the families in the put. 
If bad, skip this one. if (replay) { @@ -3604,11 +3696,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } else { checkFamilies(familyMap.keySet()); } - checkTimestamps(mutation.getFamilyCellMap(), now); + checkTimestamps(familyMap, now); } else { prepareDelete((Delete)mutation); } - checkRow(mutation.getRow(), "doMiniBatchMutation"); } /** @@ -3639,7 +3730,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * the table descriptor. */ protected Durability getEffectiveDurability(Durability d) { - return d == Durability.USE_DEFAULT ? this.durability : d; + return d == Durability.USE_DEFAULT ? this.regionDurability : d; } @Override @@ -3801,7 +3892,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private void doBatchMutate(Mutation mutation) throws IOException { // Currently this is only called for puts and deletes, so no nonces. - OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation}); + OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation}, false); if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) { throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg()); } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) { @@ -6855,7 +6946,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); } - void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException { + void prepareGet(final Get get) throws IOException { checkRow(get.getRow(), "Get"); // Verify families are all valid if (get.hasFamilies()) { @@ -6917,8 +7008,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void mutateRow(RowMutations rm) throws IOException { - // Don't need nonces here - RowMutations only supports puts and deletes - mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); + batchMutate(rm.getMutations().toArray(new Mutation[0]), true); } /** @@ -6945,9 +7035,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void mutateRowsWithLocks(Collection mutations, Collection rowsToLock, long nonceGroup, long nonce) throws IOException { - writeRequestsCount.add(mutations.size()); - MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock); - processRowsWithLocks(proc, -1, nonceGroup, nonce); + batchMutate(mutations.toArray(new Mutation[0]), rowsToLock.toArray(new byte[0][0]), nonceGroup, + nonce, true); } /** @@ -6977,8 +7066,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void processRowsWithLocks(RowProcessor processor) throws IOException { - processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE, - HConstants.NO_NONCE); + processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE, HConstants.NO_NONCE); } @Override @@ -7045,7 +7133,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // STEP 6. Append and sync if walEdit has data to write out. 
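      // (doWALAppend() now takes an explicit replay sequence id; this non-replay
      // caller passes WALKey.NO_SEQUENCE_ID.)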
if (!walEdit.isEmpty()) { writeEntry = doWALAppend(walEdit, getEffectiveDurability(processor.useDurability()), - processor.getClusterIds(), now, nonceGroup, nonce); + processor.getClusterIds(), now, nonceGroup, nonce, WALKey.NO_SEQUENCE_ID); } else { // We are here if WAL is being skipped. writeEntry = this.mvcc.begin(); @@ -7278,32 +7366,40 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long nonce) throws IOException { return doWALAppend(walEdit, durability, WALKey.EMPTY_UUIDS, System.currentTimeMillis(), - nonceGroup, nonce); + nonceGroup, nonce, WALKey.NO_SEQUENCE_ID); } /** * @return writeEntry associated with this append */ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List clusterIds, - long now, long nonceGroup, long nonce) - throws IOException { + long now, long nonceGroup, long nonce, long replaySeqId) throws IOException { WriteEntry writeEntry = null; - // Using default cluster id, as this can only happen in the originating cluster. - // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey - // here instead of WALKey directly to support legacy coprocessors. - WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, - nonceGroup, nonce, mvcc, this.getReplicationScope()); - try { - long txid = - this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - // Call sync on our edit. - if (txid != 0) sync(txid, durability); - writeEntry = walKey.getWriteEntry(); - } catch (IOException ioe) { - if (walKey != null) mvcc.complete(walKey.getWriteEntry()); - throw ioe; + if (!walEdit.isEmpty()) { + // Using default cluster id, as this can only happen in the originating cluster. + // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey + // here instead of WALKey directly to support legacy coprocessors. + WALKey walKey = walEdit.isReplay() ? new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, + nonce, mvcc) : + new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, + clusterIds, nonceGroup, nonce, mvcc, this.getReplicationScope()); + if (walEdit.isReplay()) { + walKey.setOrigLogSeqNum(replaySeqId); + } + + try { + long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); + // Call sync on our edit. + if (txid != 0) sync(txid, durability); + writeEntry = walKey.getWriteEntry(); + } catch (IOException ioe) { + if (walKey != null) mvcc.complete(walKey.getWriteEntry()); + throw ioe; + } } + return writeEntry; } @@ -7671,13 +7767,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } /** - * Give the region a chance to prepare before it is split. - */ - protected void prepareToSplit() { - // nothing - } - - /** * Return the splitpoint. null indicates the region isn't splittable * If the splitpoint isn't explicitly specified, it will go over the stores * to find the best splitpoint. 
Currently the criteria of best splitpoint @@ -7941,7 +8030,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Check whether we should sync the wal from the table's durability settings */ private boolean shouldSyncWAL() { - return durability.ordinal() > Durability.ASYNC_WAL.ordinal(); + return regionDurability.ordinal() > Durability.ASYNC_WAL + .ordinal(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java deleted file mode 100644 index 09ac73d3293d3231ff6c4bd959dcabcc7b869c92..0000000000000000000000000000000000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorRequest; -import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorResponse; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * A MultiRowProcessor that performs multiple puts and deletes. 
- */ -@InterfaceAudience.Private -class MultiRowMutationProcessor extends BaseRowProcessor { - Collection rowsToLock; - Collection mutations; - MiniBatchOperationInProgress miniBatch; - - MultiRowMutationProcessor(Collection mutations, - Collection rowsToLock) { - this.rowsToLock = rowsToLock; - this.mutations = mutations; - } - - @Override - public Collection getRowsToLock() { - return rowsToLock; - } - - @Override - public boolean readOnly() { - return false; - } - - @Override - public MultiRowMutationProcessorResponse getResult() { - return MultiRowMutationProcessorResponse.getDefaultInstance(); - } - - @Override - public void process(long now, - HRegion region, - List mutationsToApply, - WALEdit walEdit) throws IOException { - byte[] byteNow = Bytes.toBytes(now); - // Check mutations - for (Mutation m : this.mutations) { - if (m instanceof Put) { - Map> familyMap = m.getFamilyCellMap(); - region.checkFamilies(familyMap.keySet()); - region.checkTimestamps(familyMap, now); - region.updateCellTimestamps(familyMap.values(), byteNow); - } else if (m instanceof Delete) { - Delete d = (Delete) m; - region.prepareDelete(d); - region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow); - } else { - throw new DoNotRetryIOException("Action must be Put or Delete. But was: " - + m.getClass().getName()); - } - mutationsToApply.add(m); - } - // Apply edits to a single WALEdit - for (Mutation m : mutations) { - for (List cells : m.getFamilyCellMap().values()) { - boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL; - for (Cell cell : cells) { - if (writeToWAL) walEdit.add(cell); - } - } - } - } - - @Override - public void preProcess(HRegion region, WALEdit walEdit) throws IOException { - RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); - if (coprocessorHost != null) { - for (Mutation m : mutations) { - if (m instanceof Put) { - if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { - // by pass everything - return; - } - } else if (m instanceof Delete) { - Delete d = (Delete) m; - region.prepareDelete(d); - if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) { - // by pass everything - return; - } - } - } - } - } - - @Override - public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException { - // TODO we should return back the status of this hook run to HRegion so that those Mutations - // with OperationStatus as SUCCESS or FAILURE should not get applied to memstore. - RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); - OperationStatus[] opStatus = new OperationStatus[mutations.size()]; - Arrays.fill(opStatus, OperationStatus.NOT_RUN); - WALEdit[] walEditsFromCP = new WALEdit[mutations.size()]; - if (coprocessorHost != null) { - miniBatch = new MiniBatchOperationInProgress<>( - mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0, - mutations.size()); - coprocessorHost.preBatchMutate(miniBatch); - } - // Apply edits to a single WALEdit - for (int i = 0; i < mutations.size(); i++) { - if (opStatus[i] == OperationStatus.NOT_RUN) { - // Other OperationStatusCode means that Mutation is already succeeded or failed in CP hook - // itself. 
No need to apply again to region - if (walEditsFromCP[i] != null) { - // Add the WALEdit created by CP hook - for (Cell walCell : walEditsFromCP[i].getCells()) { - walEdit.add(walCell); - } - } - } - } - } - - @Override - public void postBatchMutate(HRegion region) throws IOException { - RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); - if (coprocessorHost != null) { - assert miniBatch != null; - // Use the same miniBatch state used to call the preBatchMutate() - coprocessorHost.postBatchMutate(miniBatch); - } - } - - @Override - public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException { - RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); - if (coprocessorHost != null) { - for (Mutation m : mutations) { - if (m instanceof Put) { - coprocessorHost.postPut((Put) m, walEdit, m.getDurability()); - } else if (m instanceof Delete) { - coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability()); - } - } - // At the end call the CP hook postBatchMutateIndispensably - if (miniBatch != null) { - // Directly calling this hook, with out calling pre/postBatchMutate() when Processor do a - // read only process. Then no need to call this batch based CP hook also. - coprocessorHost.postBatchMutateIndispensably(miniBatch, success); - } - } - } - - @Override - public MultiRowMutationProcessorRequest getRequestData() { - return MultiRowMutationProcessorRequest.getDefaultInstance(); - } - - @Override - public void initialize(MultiRowMutationProcessorRequest msg) { - //nothing - } - - @Override - public Durability useDurability() { - // return true when at least one mutation requested a WAL flush (default) - Durability durability = Durability.USE_DEFAULT; - for (Mutation m : mutations) { - if (m.getDurability().ordinal() > durability.ordinal()) { - durability = m.getDurability(); - } - } - return durability; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index dba314dc4f9a0adcd8ba38828b4ba1044d9dbf72..cfdfc9ef89b34c8c71ec1479c6697a3488f729a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -547,45 +547,17 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param cellScanner if non-null, the mutation data -- the Cell content. 
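   *   (with this patch the collected Puts/Deletes are applied through doBatchOp() as a single
   *   atomic batch)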
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index dba314dc4f9a0adcd8ba38828b4ba1044d9dbf72..cfdfc9ef89b34c8c71ec1479c6697a3488f729a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -547,45 +547,17 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @param cellScanner if non-null, the mutation data -- the Cell content.
    * @throws IOException
    */
-  private void mutateRows(final Region region,
-      final List<ClientProtos.Action> actions,
-      final CellScanner cellScanner, RegionActionResult.Builder builder) throws IOException {
-    if (!region.getRegionInfo().isMetaTable()) {
-      regionServer.cacheFlusher.reclaimMemStoreMemory();
-    }
-    RowMutations rm = null;
-    int i = 0;
-    ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
-        ClientProtos.ResultOrException.newBuilder();
+  private void mutateRows(final Region region, final OperationQuota quota,
+      final List<ClientProtos.Action> actions, final CellScanner cellScanner,
+      final RegionActionResult.Builder builder, final ActivePolicyEnforcement spaceQuotaEnforcement)
+      throws IOException {
     for (ClientProtos.Action action: actions) {
       if (action.hasGet()) {
         throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
           action.getGet());
       }
-      MutationType type = action.getMutation().getMutateType();
-      if (rm == null) {
-        rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
-      }
-      switch (type) {
-        case PUT:
-          Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
-          checkCellSizeLimit(region, put);
-          rm.add(put);
-          break;
-        case DELETE:
-          rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
-          break;
-        default:
-          throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
-      }
-      // To unify the response format with doNonAtomicRegionMutation and read through client's
-      // AsyncProcess we have to add an empty result instance per operation
-      resultOrExceptionOrBuilder.clear();
-      resultOrExceptionOrBuilder.setIndex(i++);
-      builder.addResultOrException(
-          resultOrExceptionOrBuilder.build());
     }
-    region.mutateRow(rm);
+    doBatchOp(builder, region, quota, actions, cellScanner, spaceQuotaEnforcement, true);
   }
 
   /**
@@ -860,7 +832,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
           !mutations.isEmpty()) {
         // Flush out any Puts or Deletes already collected.
-        doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
+        doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
         mutations.clear();
       }
       switch (type) {
@@ -921,7 +893,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
     // Finish up any outstanding mutations
     if (mutations != null && !mutations.isEmpty()) {
-      doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
+      try {
+        doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
+      } catch (IOException ie) {
+        rpcServer.getMetrics().exception(ie);
+        hasResultOrException = true;
+        NameBytesPair pair = ResponseConverter.buildException(ie);
+        resultOrExceptionBuilder.setException(pair);
+        context.incrementResponseExceptionSize(pair.getSerializedSize());
+      }
     }
     return cellsToReturn;
   }
@@ -955,7 +935,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
       final OperationQuota quota, final List<ClientProtos.Action> mutations,
-      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
+      final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement, boolean atomic)
+      throws IOException {
     Mutation[] mArray = new Mutation[mutations.size()];
     long before = EnvironmentEdgeManager.currentTime();
     boolean batchContainsPuts = false, batchContainsDelete = false;
@@ -991,16 +972,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         regionServer.cacheFlusher.reclaimMemStoreMemory();
       }
 
-      // HBASE-17924
-      // sort to improve lock efficiency
-      Arrays.sort(mArray);
+      if (!atomic) {
+        // HBASE-17924
+        // sort to improve lock efficiency
+        Arrays.sort(mArray);
+      }
 
       OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE,
-        HConstants.NO_NONCE);
+        HConstants.NO_NONCE, atomic);
       for (i = 0; i < codes.length; i++) {
         Mutation currentMutation = mArray[i];
         ClientProtos.Action currentAction = mutationActionMap.get(currentMutation);
-        int index = currentAction.getIndex();
+        int index = currentAction.hasIndex() || !atomic ? currentAction.getIndex() : i;
         Exception e = null;
         switch (codes[i].getOperationStatusCode()) {
           case BAD_FAMILY:
@@ -1025,6 +1008,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
       }
     } catch (IOException ie) {
+      if (atomic) throw ie;
      for (int i = 0; i < mutations.size(); i++) {
        builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
      }
@@ -2603,8 +2587,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             cellScanner, row, family, qualifier, op,
             comparator, regionActionResultBuilder, spaceQuotaEnforcement);
         } else {
-          mutateRows(region, regionAction.getActionList(), cellScanner,
-            regionActionResultBuilder);
+          mutateRows(region, quota, regionAction.getActionList(), cellScanner,
+            regionActionResultBuilder, spaceQuotaEnforcement);
           processed = Boolean.TRUE;
         }
       } catch (IOException e) {
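
Note the client-visible contract is unchanged by this rerouting: a RowMutations submitted
through Table#mutateRow is still applied atomically to its row; only the server-side execution
engine differs. For orientation, a minimal client-side sketch (assumes an open Table handle
named table; column family, qualifiers and values are placeholders):

    byte[] row = Bytes.toBytes("row1");
    RowMutations rm = new RowMutations(row);
    Put put = new Put(row);
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    rm.add(put);
    Delete del = new Delete(row);
    del.addColumns(Bytes.toBytes("cf"), Bytes.toBytes("old"));
    rm.add(del);
    // Server-side this now runs through doBatchOp(..., atomic = true)
    // rather than building a RowMutations and calling Region#mutateRow.
    table.mutateRow(rm);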

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index e3ba2fa8da4092db7d26b4d86bae860e929c680b..96a738a9393f73f5a4a068e5a41cc6d2a5781550 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -344,6 +344,36 @@ public interface Region extends ConfigurationObserver {
    * <p>
    * Note this supports only Put and Delete mutations and will ignore other types passed.
    * @param mutations the list of mutations
+   * @param rowsToLock rows to lock
+   * @param nonceGroup
+   * @param nonce
+   * @param atomic if input mutations need to be applied atomically
+   * @return an array of OperationStatus which internally contains the
+   *         OperationStatusCode and the exceptionMessage if any.
+   * @throws IOException
+   */
+  OperationStatus[] batchMutate(Mutation[] mutations, byte[][] rowsToLock, long nonceGroup,
+      long nonce, boolean atomic) throws IOException;
+
+  /**
+   * Perform a batch of mutations.
+   * <p>
+   * Note this supports only Put and Delete mutations and will ignore other types passed.
+   * @param mutations the list of mutations
+   * @param nonceGroup
+   * @param nonce
+   * @param atomic if input mutations need to be applied atomically
+   * @return an array of OperationStatus which internally contains the
+   *         OperationStatusCode and the exceptionMessage if any.
+   * @throws IOException
+   */
+  OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce, boolean atomic)
+      throws IOException;
+  /**
+   * Perform a batch of mutations (not necessarily atomic).
+   * <p>
+   * Note this supports only Put and Delete mutations and will ignore other types passed.
+   * @param mutations the list of mutations
    * @param nonceGroup
    * @param nonce
    * @return an array of OperationStatus which internally contains the
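
A caller's-eye sketch of the two overloads declared above (hypothetical usage; region, the
column family and the row keys are placeholders, and HConstants.NO_NONCE stands in for real
nonce values):

    Put put = new Put(Bytes.toBytes("row1"));
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    Delete delete = new Delete(Bytes.toBytes("row2"));
    Mutation[] batch = new Mutation[] { put, delete };

    // Without rowsToLock: atomic == true asks for the whole batch to be
    // applied under one set of row locks, or not at all.
    OperationStatus[] codes =
        region.batchMutate(batch, HConstants.NO_NONCE, HConstants.NO_NONCE, true);

    // With rowsToLock: additionally pins "row3" for the duration of the
    // batch, even though no mutation in the batch touches it.
    OperationStatus[] codesWithLocks = region.batchMutate(batch,
        new byte[][] { Bytes.toBytes("row1"), Bytes.toBytes("row2"), Bytes.toBytes("row3") },
        HConstants.NO_NONCE, HConstants.NO_NONCE, true);
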
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a7793f6045ed3a988d5b737191a4cebfed8ee945..36556042f283fdf16bca0c7edc561ea4d73f793e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -1535,7 +1535,7 @@ public class TestHRegion {
       puts[i].addColumn(cf, qual, val);
     }
 
-    OperationStatus[] codes = this.region.batchMutate(puts);
+    OperationStatus[] codes = this.region.batchMutate(puts, false);
     assertEquals(10, codes.length);
     for (int i = 0; i < 10; i++) {
       assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
@@ -1544,7 +1544,7 @@ public class TestHRegion {
 
     LOG.info("Next a batch put with one invalid family");
     puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, val);
-    codes = this.region.batchMutate(puts);
+    codes = this.region.batchMutate(puts, false);
     assertEquals(10, codes.length);
     for (int i = 0; i < 10; i++) {
       assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
@@ -1591,7 +1591,7 @@ public class TestHRegion {
       @Override
       public void doWork() throws IOException {
         startingPuts.countDown();
-        retFromThread.set(region.batchMutate(puts));
+        retFromThread.set(region.batchMutate(puts, false));
       }
     };
     LOG.info("...starting put thread while holding locks");
@@ -1681,7 +1681,7 @@ public class TestHRegion {
       puts[i].addColumn(cf, qual, val);
     }
 
-    OperationStatus[] codes = this.region.batchMutate(puts);
+    OperationStatus[] codes = this.region.batchMutate(puts, false);
     assertEquals(10, codes.length);
     for (int i = 0; i < 10; i++) {
       assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
@@ -6306,7 +6306,7 @@ public class TestHRegion {
         new Put(b).addImmutable(fam1, null, null),
     };
 
-    OperationStatus[] status = region.batchMutate(mutations);
+    OperationStatus[] status = region.batchMutate(mutations, false);
     assertEquals(status[0].getOperationStatusCode(), OperationStatusCode.SUCCESS);
     assertEquals(status[1].getOperationStatusCode(), OperationStatusCode.SANITY_CHECK_FAILURE);
     assertEquals(status[2].getOperationStatusCode(), OperationStatusCode.SUCCESS);
@@ -6339,7 +6339,7 @@ public class TestHRegion {
         };
 
         // this will wait for the row lock, and it will eventually succeed
-        OperationStatus[] status = region.batchMutate(mutations);
+        OperationStatus[] status = region.batchMutate(mutations, false);
         assertEquals(status[0].getOperationStatusCode(), OperationStatusCode.SUCCESS);
         assertEquals(status[1].getOperationStatusCode(), OperationStatusCode.SUCCESS);
         return null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java
index 6bbb81dda0b7259b4f1c53c32cb196778cfc917a..4c562523d3a1dc9c21eb98eb4e63b02570f6535f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java
@@ -218,7 +218,7 @@ public class TestParallelPut {
       put.addColumn(fam1, qual1, value);
       in[0] = put;
       try {
-        OperationStatus[] ret = region.batchMutate(in);
+        OperationStatus[] ret = region.batchMutate(in, false);
         assertEquals(1, ret.length);
         assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
         assertGet(this.region, rowkey, fam1, qual1, value);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
index fabd07554ebc0d4a76e9c54c3a345d7983ca967a..c4018ec0a94811a9c019720e0d73badbcd747a92 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
@@ -118,7 +118,7 @@ public class TestWALMonotonicallyIncreasingSeqId {
           put.addColumn("cf".getBytes(), Bytes.toBytes(0), Bytes.toBytes(""));
           //put.setDurability(Durability.ASYNC_WAL);
           latch.await();
-          region.batchMutate(new Mutation[]{put});
+          region.batchMutate(new Mutation[]{put}, false);
           Thread.sleep(10);
         }
-- 
2.10.1 (Apple Git-78)