From 4853963a0c391af965203190c3ba8cbcab7b3564 Mon Sep 17 00:00:00 2001 From: Umesh Agashe Date: Fri, 6 Oct 2017 15:40:05 -0700 Subject: [PATCH] HBASE-18960 A few bug fixes and minor improvements around batchMutate * batch validation and preparation is done before we start iterating over operations for writes * durability, familyCellMaps and observedExceptions are batch wide and are now stored in BatchOperation, as a result durability is consistent across all operations in a batch * for all operations done by preBatchMutate() CP hook, operation status is updated to success * doWALAppend() is modified to handle replay and is used from doMiniBatchMutate() * minor improvements --- .../org/apache/hadoop/hbase/util/NonceKey.java | 1 - .../apache/hadoop/hbase/regionserver/HRegion.java | 391 +++++++++------------ 2 files changed, 170 insertions(+), 222 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java index 6da808ed639f3ae2268097ed3d1fb92b2be790d7..7e15c6fe6ac6bce23533ef83f53ec38f19a17d55 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java @@ -31,7 +31,6 @@ public class NonceKey { private long nonce; public NonceKey(long group, long nonce) { - assert nonce != HConstants.NO_NONCE; this.group = group; this.nonce = nonce; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2d35fb96a72a1dd09b21dd6af07c7b31ed14a679..6dc2667712071500adf4d518b739479aa09be03e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -645,7 +645,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private 
final MetricsRegion metricsRegion; private final MetricsRegionWrapperImpl metricsRegionWrapper; - private final Durability durability; + private final Durability regionDurability; private final boolean regionStatsEnabled; // Stores the replication scope of the various column families of the table // that has non-default scope @@ -771,9 +771,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ this.rowProcessorTimeout = conf.getLong( "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT); - this.durability = htd.getDurability() == Durability.USE_DEFAULT - ? DEFAULT_DURABILITY - : htd.getDurability(); + this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ? + DEFAULT_DURABILITY : htd.getDurability(); if (rsServices != null) { this.rsAccounting = this.rsServices.getRegionServerAccounting(); // don't initialize coprocessors if not running within a regionserver @@ -1923,13 +1922,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // upkeep. ////////////////////////////////////////////////////////////////////////////// /** - * @return returns size of largest HStore. - */ - public long getLargestHStoreSize() { - return stores.values().stream().mapToLong(HStore::getSize).max().orElse(0L); - } - - /** * Do preparation for pending compaction. 
* @throws IOException */ @@ -2943,12 +2935,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int nextIndexToProcess = 0; OperationStatus[] retCodeDetails; WALEdit[] walEditsFromCoprocessors; + // reference family cell maps directly so coprocessors can mutate them if desired + Map>[] familyCellMaps; + ObservedExceptionsInBatch observedExceptions; + Durability durability; //Durability of the batch (highest durability of all operations) public BatchOperation(T[] operations) { this.operations = operations; this.retCodeDetails = new OperationStatus[operations.length]; this.walEditsFromCoprocessors = new WALEdit[operations.length]; Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN); + familyCellMaps = new Map[operations.length]; + observedExceptions = new ObservedExceptionsInBatch(); + durability = Durability.USE_DEFAULT; } public abstract Mutation getMutation(int index); @@ -2964,10 +2963,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private static class MutationBatch extends BatchOperation { + private static class MutationBatchOperation extends BatchOperation { private long nonceGroup; private long nonce; - public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) { + public MutationBatchOperation(Mutation[] operations, long nonceGroup, long nonce) { super(operations); this.nonceGroup = nonceGroup; this.nonce = nonce; @@ -3004,9 +3003,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private static class ReplayBatch extends BatchOperation { + private static class ReplayBatchOperation extends BatchOperation { private long replaySeqId = 0; - public ReplayBatch(MutationReplay[] operations, long seqId) { + public ReplayBatchOperation(MutationReplay[] operations, long seqId) { super(operations); this.replaySeqId = seqId; } @@ -3050,7 +3049,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // * 
batchMutate with single mutation - put/delete, separate or from checkAndMutate. // * coprocessor calls (see ex. BulkDeleteEndpoint). // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... - return batchMutate(new MutationBatch(mutations, nonceGroup, nonce)); + return batchMutate(new MutationBatchOperation(mutations, nonceGroup, nonce)); } public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { @@ -3079,7 +3078,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } return statuses; } - return batchMutate(new ReplayBatch(mutations, replaySeqId)); + return batchMutate(new ReplayBatchOperation(mutations, replaySeqId)); } /** @@ -3103,9 +3102,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!initialized) { this.writeRequestsCount.add(batchOp.operations.length); - if (!batchOp.isInReplay()) { - doPreBatchMutateHook(batchOp); - } + // run coprocessor pre-hooks, validate and prepare batch for write + prepareMutationBatch(batchOp); initialized = true; } doMiniBatchMutate(batchOp); @@ -3118,13 +3116,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return batchOp.retCodeDetails; } - private void doPreBatchMutateHook(BatchOperation batchOp) - throws IOException { - /* Run coprocessor pre hook outside of locks to avoid deadlock */ + /** + * Runs prePut/ preDelete coprocessor hooks for each mutation in a batch. 
Validates each + * mutation and prepares a batch for write + * @param batchOp + * @throws IOException + */ + private void prepareMutationBatch(BatchOperation batchOp) throws IOException { WALEdit walEdit = new WALEdit(); - if (coprocessorHost != null) { - for (int i = 0 ; i < batchOp.operations.length; i++) { - Mutation m = batchOp.getMutation(i); + for (int i = 0 ; i < batchOp.operations.length; i++) { + Mutation m = batchOp.getMutation(i); + /* Run coprocessor pre hook outside of locks to avoid deadlock */ + if (!batchOp.isInReplay() && coprocessorHost != null) { if (m instanceof Put) { if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { // pre hook says skip this Put @@ -3154,6 +3157,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi walEdit = new WALEdit(); } } + checkAndPrepareBatchOp(batchOp, i, EnvironmentEdgeManager.currentTime()); } } @@ -3169,26 +3173,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long currentNonce = HConstants.NO_NONCE; WALEdit walEdit = null; boolean locked = false; - // reference family maps directly so coprocessors can mutate them if desired - Map>[] familyMaps = new Map[batchOp.operations.length]; // We try to set up a batch in the range [firstIndex,lastIndexExclusive) int firstIndex = batchOp.nextIndexToProcess; int lastIndexExclusive = firstIndex; boolean success = false; + boolean doneByCoprocessor = false; int noOfPuts = 0; int noOfDeletes = 0; WriteEntry writeEntry = null; int cellCount = 0; /** Keep track of the locks we hold so we can release them in finally clause */ List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); - MemStoreSize memstoreSize = new MemStoreSize(); - final ObservedExceptionsInBatch observedExceptions = new ObservedExceptionsInBatch(); + MemStoreSize memStoreSize = new MemStoreSize(); try { // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. 
int numReadyToWrite = 0; - long now = EnvironmentEdgeManager.currentTime(); while (lastIndexExclusive < batchOp.operations.length) { - if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, observedExceptions)) { + if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() + != OperationStatusCode.NOT_RUN) { lastIndexExclusive++; continue; } @@ -3212,7 +3214,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi lastIndexExclusive++; numReadyToWrite++; - if (replay) { + if (replay || getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) { for (List cells : mutation.getFamilyCellMap().values()) { cellCount += cells.size(); } @@ -3220,42 +3222,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // We've now grabbed as many mutations off the list as we can - - // STEP 2. Update any LATEST_TIMESTAMP timestamps - // We should record the timestamp only after we have acquired the rowLock, - // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp - now = EnvironmentEdgeManager.currentTime(); - byte[] byteNow = Bytes.toBytes(now); - // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? if (numReadyToWrite <= 0) { return; } - for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) { - // skip invalid - if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - // lastIndexExclusive was incremented above. - continue; - } + // STEP 2. 
Update any LATEST_TIMESTAMP timestamps + // We should record the timestamp only after we have acquired the rowLock, + // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp + long now = EnvironmentEdgeManager.currentTime(); + if (!replay) { + byte[] byteNow = Bytes.toBytes(now); + for (int i = firstIndex; i < lastIndexExclusive; i++) { + // skip invalid + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { + // lastIndexExclusive was incremented above. + continue; + } - Mutation mutation = batchOp.getMutation(i); - if (mutation instanceof Put) { - updateCellTimestamps(familyMaps[i].values(), byteNow); - noOfPuts++; - } else { - prepareDeleteTimestamps(mutation, familyMaps[i], byteNow); - noOfDeletes++; - } - rewriteCellTags(familyMaps[i], mutation); - WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; - if (fromCP != null) { - cellCount += fromCP.size(); - } - if (getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) { - for (List cells : familyMaps[i].values()) { - cellCount += cells.size(); + Mutation mutation = batchOp.getMutation(i); + if (mutation instanceof Put) { + updateCellTimestamps(batchOp.familyCellMaps[i].values(), byteNow); + noOfPuts++; + } else { + prepareDeleteTimestamps(mutation, batchOp.familyCellMaps[i], byteNow); + noOfDeletes++; + } + rewriteCellTags(batchOp.familyCellMaps[i], mutation); + WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; + if (fromCP != null) { + cellCount += fromCP.size(); } } } @@ -3268,6 +3264,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) { + doneByCoprocessor = true; return; } else { for (int i = firstIndex; i < lastIndexExclusive; i++) { @@ -3285,15 +3282,16 @@ public class HRegion implements 
HeapSize, PropagatingConfigurationObserver, Regi // Else Coprocessor added more Mutations corresponding to the Mutation at this index. for (int j = 0; j < cpMutations.length; j++) { Mutation cpMutation = cpMutations[j]; - Map> cpFamilyMap = cpMutation.getFamilyCellMap(); - checkAndPrepareMutation(cpMutation, replay, cpFamilyMap, now); + checkAndPrepareMutation(cpMutation, replay, now); // Acquire row locks. If not, the whole batch will fail. acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); // Returned mutations from coprocessor correspond to the Mutation at index i. We can // directly add the cells from those mutations to the familyMaps of this mutation. - mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later + Map> cpFamilyMap = cpMutation.getFamilyCellMap(); + // will get added to the memStore later + mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap); // The durability of returned mutation is replaced by the corresponding mutation. // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the @@ -3310,7 +3308,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // STEP 3. Build WAL edit walEdit = new WALEdit(cellCount, replay); - Durability durability = Durability.USE_DEFAULT; for (int i = firstIndex; i < lastIndexExclusive; i++) { // Skip puts that were determined to be invalid during preprocessing if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { @@ -3318,12 +3315,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } Mutation m = batchOp.getMutation(i); - Durability tmpDur = getEffectiveDurability(m.getDurability()); - if (tmpDur.ordinal() > durability.ordinal()) { - durability = tmpDur; - } // we use durability of the original mutation for the mutation passed by CP. 
- if (tmpDur == Durability.SKIP_WAL) { + if (getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) { recordMutationWithoutWal(m.getFamilyCellMap()); continue; } @@ -3348,58 +3341,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi walEdit.add(cell); } } - addFamilyMapToWALEdit(familyMaps[i], walEdit); + addFamilyMapToWALEdit(batchOp.familyCellMaps[i], walEdit); } // STEP 4. Append the final edit to WAL and sync. Mutation mutation = batchOp.getMutation(firstIndex); - WALKey walKey = null; - long txid; - if (replay) { - // use wal key from the original - walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); - walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId()); - if (!walEdit.isEmpty()) { - txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - if (txid != 0) { - sync(txid, durability); - } - } - } else { - try { - if (!walEdit.isEmpty()) { - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc, - this.getReplicationScope()); - // TODO: Use the doAppend methods below... complicated by the replay stuff above. 
- txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - if (txid != 0) { - sync(txid, durability); - } - if (writeEntry == null) { - // if MVCC not preassigned, wait here until assigned - writeEntry = walKey.getWriteEntry(); - } - } - } catch (IOException ioe) { - if (walKey != null && writeEntry == null) { - // the writeEntry is not preassigned and error occurred during append or sync - mvcc.complete(walKey.getWriteEntry()); - } - throw ioe; - } - } - if (walKey == null) { - // If no walKey, then not in replay and skipping WAL or some such. Begin an MVCC transaction - // to get sequence id. + writeEntry = doWALAppend(walEdit, batchOp.durability, mutation.getClusterIds(), now, + currentNonceGroup, currentNonce, + replay ? batchOp.getReplaySequenceId() : WALKey.NO_SEQUENCE_ID); + if (!replay && writeEntry == null) { + // If no writeEntry, then not in replay and skipping WAL or some such. Begin an MVCC + // transaction to get sequence id. writeEntry = mvcc.begin(); } - // STEP 5. Write back to memstore + // STEP 5. Write back to memStore for (int i = firstIndex; i < lastIndexExclusive; i++) { if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { continue; @@ -3410,14 +3366,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // we use durability of the original mutation for the mutation passed by CP. boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL; if (updateSeqId) { - this.updateSequenceId(familyMaps[i].values(), + this.updateSequenceId(batchOp.familyCellMaps[i].values(), replay? 
batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); } - applyFamilyMapToMemStore(familyMaps[i], memstoreSize); + applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreSize); } // update memstore size - this.addAndGetMemStoreSize(memstoreSize); + this.addAndGetMemStoreSize(memStoreSize); // calling the post CP hook for batch mutation if (!replay && coprocessorHost != null) { @@ -3436,22 +3392,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi writeEntry = null; } - // STEP 7. Release row locks, etc. + success = true; + } finally { + // Call complete rather than completeAndWait because we probably had error if walKey != null + if (writeEntry != null) mvcc.complete(writeEntry); + if (locked) { this.updatesLock.readLock().unlock(); - locked = false; } releaseRowLocks(acquiredRowLocks); - for (int i = firstIndex; i < lastIndexExclusive; i ++) { + for (int i = firstIndex; i < lastIndexExclusive; i++) { if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { - batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; + batchOp.retCodeDetails[i] = + success || doneByCoprocessor ? OperationStatus.SUCCESS : OperationStatus.FAILURE; } } - // STEP 8. Run coprocessor post hooks. This should be done after the wal is // synced so that the coprocessor contract is adhered to. 
- if (!replay && coprocessorHost != null) { + if (!replay && coprocessorHost != null && !doneByCoprocessor) { for (int i = firstIndex; i < lastIndexExclusive; i++) { // only for successful puts if (batchOp.retCodeDetails[i].getOperationStatusCode() @@ -3467,15 +3426,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - success = true; - } finally { - // Call complete rather than completeAndWait because we probably had error if walKey != null - if (writeEntry != null) mvcc.complete(writeEntry); - if (locked) { - this.updatesLock.readLock().unlock(); - } - releaseRowLocks(acquiredRowLocks); - // See if the column families were consistent through the whole thing. // if they were then keep them. If they were not then pass a null. // null will be treated as unknown. @@ -3494,13 +3444,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.metricsRegion.updateDelete(); } } - if (!success) { - for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) { - batchOp.retCodeDetails[i] = OperationStatus.FAILURE; - } - } - } + if (coprocessorHost != null && !batchOp.isInReplay()) { // call the coprocessor hook to do any finalization steps // after the put is done @@ -3539,75 +3483,73 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.mvcc.complete(walKey.getWriteEntry()); } - private boolean checkBatchOp(BatchOperation batchOp, final int lastIndexExclusive, - final Map>[] familyMaps, final long now, - final ObservedExceptionsInBatch observedExceptions) - throws IOException { - boolean skip = false; + private boolean checkAndPrepareBatchOp(BatchOperation batchOp, final int index, + final long now) throws IOException { + boolean skip = true; // Skip anything that "ran" already - if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - 
return true; - } - Mutation mutation = batchOp.getMutation(lastIndexExclusive); - Map> familyMap = mutation.getFamilyCellMap(); - // store the family map reference to allow for mutations - familyMaps[lastIndexExclusive] = familyMap; + if (batchOp.retCodeDetails[index].getOperationStatusCode() == OperationStatusCode.NOT_RUN) { + Mutation mutation = batchOp.getMutation(index); - try { - checkAndPrepareMutation(mutation, batchOp.isInReplay(), familyMap, now); - } catch (NoSuchColumnFamilyException nscf) { - final String msg = "No such column family in batch mutation. "; - if (observedExceptions.hasSeenNoSuchFamily()) { - LOG.warn(msg + nscf.getMessage()); - } else { - LOG.warn(msg, nscf); - observedExceptions.sawNoSuchFamily(); - } - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.BAD_FAMILY, nscf.getMessage()); - skip = true; - } catch (FailedSanityCheckException fsce) { - final String msg = "Batch Mutation did not pass sanity check. "; - if (observedExceptions.hasSeenFailedSanityCheck()) { - LOG.warn(msg + fsce.getMessage()); - } else { - LOG.warn(msg, fsce); - observedExceptions.sawFailedSanityCheck(); - } - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); - skip = true; - } catch (WrongRegionException we) { - final String msg = "Batch mutation had a row that does not belong to this region. 
"; - if (observedExceptions.hasSeenWrongRegion()) { - LOG.warn(msg + we.getMessage()); - } else { - LOG.warn(msg, we); - observedExceptions.sawWrongRegion(); + try { + checkAndPrepareMutation(mutation, batchOp.isInReplay(), now); + skip = false; + + // store the family map reference to allow for mutations + batchOp.familyCellMaps[index] = mutation.getFamilyCellMap(); + // store durability for the batch (highest durability of all operations in the batch) + Durability tmpDur = getEffectiveDurability(mutation.getDurability()); + if (tmpDur.ordinal() > batchOp.durability.ordinal()) { + batchOp.durability = tmpDur; + } + } catch (NoSuchColumnFamilyException nscf) { + final String msg = "No such column family in batch mutation. "; + if (batchOp.observedExceptions.hasSeenNoSuchFamily()) { + LOG.warn(msg + nscf.getMessage()); + } else { + LOG.warn(msg, nscf); + batchOp.observedExceptions.sawNoSuchFamily(); + } + batchOp.retCodeDetails[index] = new OperationStatus( + OperationStatusCode.BAD_FAMILY, nscf.getMessage()); + } catch (FailedSanityCheckException fsce) { + final String msg = "Batch Mutation did not pass sanity check. "; + if (batchOp.observedExceptions.hasSeenFailedSanityCheck()) { + LOG.warn(msg + fsce.getMessage()); + } else { + LOG.warn(msg, fsce); + batchOp.observedExceptions.sawFailedSanityCheck(); + } + batchOp.retCodeDetails[index] = new OperationStatus( + OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); + } catch (WrongRegionException we) { + final String msg = "Batch mutation had a row that does not belong to this region. 
"; + if (batchOp.observedExceptions.hasSeenWrongRegion()) { + LOG.warn(msg + we.getMessage()); + } else { + LOG.warn(msg, we); + batchOp.observedExceptions.sawWrongRegion(); + } + batchOp.retCodeDetails[index] = new OperationStatus( + OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); } - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); - skip = true; } return skip; } - private void checkAndPrepareMutation(Mutation mutation, boolean replay, - final Map> familyMap, final long now) - throws IOException { + private void checkAndPrepareMutation(Mutation mutation, boolean replay, final long now) + throws IOException { + checkRow(mutation.getRow(), "doMiniBatchMutation"); if (mutation instanceof Put) { // Check the families in the put. If bad, skip this one. if (replay) { - removeNonExistentColumnFamilyForReplay(familyMap); + removeNonExistentColumnFamilyForReplay(mutation.getFamilyCellMap()); } else { - checkFamilies(familyMap.keySet()); + checkFamilies(mutation.getFamilyCellMap().keySet()); } checkTimestamps(mutation.getFamilyCellMap(), now); } else { prepareDelete((Delete)mutation); } - checkRow(mutation.getRow(), "doMiniBatchMutation"); } /** @@ -3638,7 +3580,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * the table descriptor. */ protected Durability getEffectiveDurability(Durability d) { - return d == Durability.USE_DEFAULT ? this.durability : d; + return d == Durability.USE_DEFAULT ? 
this.regionDurability : d; } @Override @@ -7282,28 +7224,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi nonceGroup, nonce); } + private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List clusterIds, + long now, long nonceGroup, long nonce) throws IOException { + return doWALAppend(walEdit, durability, clusterIds, now, nonceGroup, nonce, + WALKey.NO_SEQUENCE_ID); + } + /** * @return writeEntry associated with this append */ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List clusterIds, - long now, long nonceGroup, long nonce) - throws IOException { + long now, long nonceGroup, long nonce, long replaySeqId) throws IOException { + Preconditions.checkArgument(!walEdit.isReplay() || replaySeqId != WALKey.NO_SEQUENCE_ID, + "Invalid replay sequence Id for replay WALEdit!"); WriteEntry writeEntry = null; - // Using default cluster id, as this can only happen in the originating cluster. - // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey - // here instead of WALKey directly to support legacy coprocessors. - WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, - nonceGroup, nonce, mvcc, this.getReplicationScope()); - try { - long txid = - this.wal.append(this.getRegionInfo(), walKey, walEdit, true); - // Call sync on our edit. - if (txid != 0) sync(txid, durability); - writeEntry = walKey.getWriteEntry(); - } catch (IOException ioe) { - if (walKey != null) mvcc.complete(walKey.getWriteEntry()); - throw ioe; + if (!walEdit.isEmpty()) { + // Using default cluster id, as this can only happen in the originating cluster. + // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey + // here instead of WALKey directly to support legacy coprocessors. + WALKey walKey = walEdit.isReplay() ? 
new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, + nonce, mvcc) : + new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, + nonceGroup, nonce, mvcc, this.getReplicationScope()); + if (walEdit.isReplay()) { + walKey.setOrigLogSeqNum(replaySeqId); + } + try { + long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); + // Call sync on our edit. + if (txid != 0) sync(txid, durability); + writeEntry = walKey.getWriteEntry(); + } catch (IOException ioe) { + if (walKey != null) mvcc.complete(walKey.getWriteEntry()); + throw ioe; + } } return writeEntry; } @@ -7672,13 +7628,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } /** - * Give the region a chance to prepare before it is split. - */ - protected void prepareToSplit() { - // nothing - } - - /** * Return the splitpoint. null indicates the region isn't splittable * If the splitpoint isn't explicitly specified, it will go over the stores * to find the best splitpoint. Currently the criteria of best splitpoint @@ -7942,7 +7891,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Check whether we should sync the wal from the table's durability settings */ private boolean shouldSyncWAL() { - return durability.ordinal() > Durability.ASYNC_WAL.ordinal(); + return regionDurability.ordinal() > Durability.ASYNC_WAL.ordinal(); } /** -- 2.10.1 (Apple Git-78)