diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index fe9745e..b910265 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -271,6 +271,7 @@ public class Scan extends Query { this.getScan = true; this.asyncPrefetch = false; this.consistency = get.getConsistency(); + this.setIsolationLevel(get.getIsolationLevel()); for (Map.Entry attr : get.getAttributesMap().entrySet()) { setAttribute(attr.getKey(), attr.getValue()); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java index 4b9f113..c38340d 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java @@ -30,7 +30,7 @@ import org.junit.experimental.categories.Category; @Category({ClientTests.class, SmallTests.class}) public class TestIncrement { @Test - public void test() { + public void testIncrementInstance() { final long expected = 13; Increment inc = new Increment(new byte [] {'r'}); int total = 0; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java index 1d55baa..c6698f5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java @@ -75,4 +75,4 @@ public interface Tag { * @return The {@link java.nio.ByteBuffer} containing the value bytes. */ ByteBuffer getValueByteBuffer(); -} +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagUtil.java index 15ddfc8..d30fe33 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagUtil.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.Tag.TAG_LENGTH_SIZE; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -104,7 +105,7 @@ public final class TagUtil { * @return the serialized tag data as bytes */ public static byte[] fromList(List tags) { - if (tags.isEmpty()) { + if (tags == null || tags.isEmpty()) { return HConstants.EMPTY_BYTE_ARRAY; } int length = 0; @@ -216,4 +217,43 @@ public final class TagUtil { } return StreamUtils.readRawVarint32(tag.getValueByteBuffer(), offset); } + + /** + * @return A List of any Tags found in cell else null. + */ + public static List carryForwardTags(final Cell cell) { + return carryForwardTags(null, cell); + } + + /** + * @return Add to tagsOrNull any Tags cell is carrying or null if + * it is carrying no Tags AND the passed in tagsOrNull is null (else we return new + * List with Tags found). + */ + public static List carryForwardTags(final List tagsOrNull, final Cell cell) { + List tags = tagsOrNull; + if (cell.getTagsLength() <= 0) return tags; + Iterator itr = + CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); + if (tags == null) tags = new ArrayList(); + while (itr.hasNext()) { + tags.add(itr.next()); + } + return tags; + } + + + /** + * @return Carry forward the TTL tag. 
+ */ + public static List carryForwardTTLTag(final List tagsOrNull, final long ttl) { + if (ttl == Long.MAX_VALUE) return tagsOrNull; + List tags = tagsOrNull; + // If we are making the array in here, given we are the last thing checked, we'll be only thing + // in the array so set its size to '1' (I saw this being done in earlier version of + // tag-handling). + if (tags == null) tags = new ArrayList(1); + tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); + return tags; + } } \ No newline at end of file diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java index 420799f..ae07bae 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java @@ -232,6 +232,6 @@ public class ZooKeeperScanPolicyObserver extends BaseRegionObserver { return null; } return new StoreScanner(store, scanInfo, scan, targetCols, - ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); + ((HStore)store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e553fcc..8055d67 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -69,7 +70,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellScanner; @@ -78,7 +78,6 @@ import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DroppedSnapshotException; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; @@ -94,7 +93,6 @@ import org.apache.hadoop.hbase.ShareableMemory; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagRewriteCell; -import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.backup.HFileArchiver; @@ -113,7 +111,7 @@ import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; 
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; @@ -124,8 +122,6 @@ import org.apache.hadoop.hbase.filter.FilterWrapper; import org.apache.hadoop.hbase.filter.IncompatibleFilterException; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.io.hfile.BlockCache; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.ipc.CallerDisconnectedException; import org.apache.hadoop.hbase.ipc.RpcCallContext; @@ -149,6 +145,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger; @@ -157,7 +154,6 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputCont import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; @@ -171,7 +167,6 @@ import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.EncryptionTest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.Pair; @@ -1192,52 +1187,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Reset recovering state of current region */ public void setRecovering(boolean newState) { - boolean wasRecovering = this.recovering; - // before we flip the recovering switch (enabling reads) we should write the region open - // event to WAL if needed - if (wal != null && getRegionServerServices() != null && !writestate.readOnly - && wasRecovering && !newState) { - - // force a flush only if region replication is set up for this region. Otherwise no need. 
- boolean forceFlush = getTableDesc().getRegionReplication() > 1; - - // force a flush first - MonitoredTask status = TaskMonitor.get().createStatus( - "Flushing region " + this + " because recovery is finished"); - try { - if (forceFlush) { - internalFlushcache(status); - } - - status.setStatus("Writing region open event marker to WAL because recovery is finished"); - try { - long seqId = openSeqNum; - // obtain a new seqId because we possibly have writes and flushes on top of openSeqNum - if (wal != null) { - seqId = getNextSequenceId(wal); - } - writeRegionOpenMarker(wal, seqId); - } catch (IOException e) { - // We cannot rethrow this exception since we are being called from the zk thread. The - // region has already opened. In this case we log the error, but continue - LOG.warn(getRegionInfo().getEncodedName() + " : was not able to write region opening " - + "event to WAL, continueing", e); - } - } catch (IOException ioe) { - // Distributed log replay semantics does not necessarily require a flush, since the replayed - // data is already written again in the WAL. So failed flush should be fine. - LOG.warn(getRegionInfo().getEncodedName() + " : was not able to flush " - + "event to WAL, continueing", ioe); - } finally { - status.cleanup(); - } - } - - this.recovering = newState; - if (wasRecovering && !recovering) { - // Call only when wal replay is over. - coprocessorHost.postLogReplay(); - } + // NOOP. DLR is no longer supported. } @Override @@ -1279,28 +1229,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - public MultiVersionConcurrencyControl getMVCC() { - return mvcc; - } + @VisibleForTesting + public MultiVersionConcurrencyControl getMVCC() { + return mvcc; + } - @Override - public long getMaxFlushedSeqId() { - return maxFlushedSeqId; - } + @Override + public long getMaxFlushedSeqId() { + return maxFlushedSeqId; + } - @Override - public long getReadpoint(IsolationLevel isolationLevel) { - if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) { - // This scan can read even uncommitted transactions - return Long.MAX_VALUE; - } - return mvcc.getReadPoint(); - } + @Override + public long getReadPoint(IsolationLevel isolationLevel) { + if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) { + // This scan can read even uncommitted transactions + return Long.MAX_VALUE; + } + return mvcc.getReadPoint(); + } - @Override - public boolean isLoadingCfsOnDemandDefault() { - return this.isLoadingCfsOnDemandDefault; - } + @Override + public boolean isLoadingCfsOnDemandDefault() { + return this.isLoadingCfsOnDemandDefault; + } /** * Close down this HRegion. Flush the cache, shut down each HStore, don't @@ -1819,27 +1770,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * We are trying to remove / relax the region read lock for compaction. * Let's see what are the potential race conditions among the operations (user scan, * region split, region close and region bulk load). - * + * * user scan ---> region read lock * region split --> region close first --> region write lock * region close --> region write lock * region bulk load --> region write lock - * + * * read lock is compatible with read lock. ---> no problem with user scan/read * region bulk load does not cause problem for compaction (no consistency problem, store lock * will help the store file accounting). * They can run almost concurrently at the region level. - * + * * The only remaining race condition is between the region close and compaction. 
* So we will evaluate, below, how region close intervenes with compaction if compaction does * not acquire region read lock. - * + * * Here are the steps for compaction: * 1. obtain list of StoreFile's * 2. create StoreFileScanner's based on list from #1 * 3. perform compaction and save resulting files under tmp dir * 4. swap in compacted files - * + * * #1 is guarded by store lock. This patch does not change this --> no worse or better * For #2, we obtain smallest read point (for region) across all the Scanners (for both default * compactor and stripe compactor). @@ -1851,7 +1802,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * This will not conflict with compaction. * For #3, it can be performed in parallel to other operations. * For #4 bulk load and compaction don't conflict with each other on the region level - * (for multi-family atomicy). + * (for multi-family atomicy). * Region close and compaction are guarded pretty well by the 'writestate'. * In HRegion#doClose(), we have : * synchronized (writestate) { @@ -2192,7 +2143,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (this.memstoreSize.get() <= 0) { // Take an update lock because am about to change the sequence id and we want the sequence id // to be at the border of the empty memstore. - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; + WriteEntry writeEntry = null; this.updatesLock.writeLock().lock(); try { if (this.memstoreSize.get() <= 0) { @@ -2282,7 +2233,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes(); long trxId = 0; - MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin(); + WriteEntry writeEntry = mvcc.begin(); try { try { if (wal != null) { @@ -2362,14 +2313,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - // wait for all in-progress transactions to commit to WAL before - // we can start the flush. This prevents - // uncommitted transactions from being written into HFiles. - // We have to block before we start the flush, otherwise keys that - // were removed via a rollbackMemstore could be written to Hfiles. + // Wait for all in-progress transactions to commit to WAL before we can start the flush. This + // prevents uncommitted transactions from being written into HFiles. mvcc.completeAndWait(writeEntry); - // set writeEntry to null to prevent mvcc.complete from being called again inside finally - // block + // Set writeEntry to null to prevent mvcc.complete from being called again inside finally writeEntry = null; } finally { if (writeEntry != null) { @@ -2970,39 +2917,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @SuppressWarnings("unchecked") private long doMiniBatchMutation(BatchOperationInProgress batchOp) throws IOException { - boolean isInReplay = batchOp.isInReplay(); - // variable to note if all Put items are for the same CF -- metrics related + // Variable to note if all Put items are for the same CF -- metrics related boolean putsCfSetConsistent = true; - //The set of columnFamilies first seen for Put. - Set putsCfSet = null; - // variable to note if all Delete items are for the same CF -- metrics related + // Variable to note if all Delete items are for the same CF -- metrics related boolean deletesCfSetConsistent = true; - //The set of columnFamilies first seen for Delete. 
+ // The set of columnFamilies first seen for Put. + Set putsCfSet = null; + // The set of columnFamilies first seen for Delete. Set deletesCfSet = null; - - long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE; - WALEdit walEdit = new WALEdit(isInReplay); - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; - long txid = 0; - boolean doRollBackMemstore = false; + long currentNonceGroup = HConstants.NO_NONCE; + long currentNonce = HConstants.NO_NONCE; + WALEdit walEdit = new WALEdit(false); boolean locked = false; - - /** Keep track of the locks we hold so we can release them in finally clause */ - List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); // reference family maps directly so coprocessors can mutate them if desired Map>[] familyMaps = new Map[batchOp.operations.length]; // We try to set up a batch in the range [firstIndex,lastIndexExclusive) int firstIndex = batchOp.nextIndexToProcess; int lastIndexExclusive = firstIndex; boolean success = false; - int noOfPuts = 0, noOfDeletes = 0; + int noOfPuts = 0; + int noOfDeletes = 0; WALKey walKey = null; long mvccNum = 0; + /** Keep track of the locks we hold so we can release them in finally clause */ + List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); try { - // ------------------------------------ - // STEP 1. Try to acquire as many locks as we can, and ensure - // we acquire at least one. - // ---------------------------------- + // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. int numReadyToWrite = 0; long now = EnvironmentEdgeManager.currentTime(); while (lastIndexExclusive < batchOp.operations.length) { @@ -3023,14 +2963,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { if (isPutMutation) { // Check the families in the put. If bad, skip this one. - if (isInReplay) { - removeNonExistentColumnFamilyForReplay(familyMap); - } else { - checkFamilies(familyMap.keySet()); - } + checkFamilies(familyMap.keySet()); checkTimestamps(mutation.getFamilyCellMap(), now); } else { - prepareDelete((Delete) mutation); + prepareDelete((Delete)mutation); } checkRow(mutation.getRow(), "doMiniBatchMutation"); } catch (NoSuchColumnFamilyException nscf) { @@ -3053,8 +2989,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi continue; } - // If we haven't got any rows in our batch, we should block to - // get the next one. + // If we haven't got any rows in our batch, we should block to get the next one. RowLock rowLock = null; try { rowLock = getRowLock(mutation.getRow(), true); @@ -3074,7 +3009,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (isPutMutation) { // If Column Families stay consistent through out all of the - // individual puts then metrics can be reported as a mutliput across + // individual puts then metrics can be reported as a multiput across // column families in the first put. 
if (putsCfSet == null) { putsCfSet = mutation.getFamilyCellMap().keySet(); @@ -3092,7 +3027,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - // we should record the timestamp only after we have acquired the rowLock, + // We should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp now = EnvironmentEdgeManager.currentTime(); byte[] byteNow = Bytes.toBytes(now); @@ -3102,10 +3037,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // We've now grabbed as many mutations off the list as we can - // ------------------------------------ // STEP 2. Update any LATEST_TIMESTAMP timestamps - // ---------------------------------- - for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) { + for (int i = firstIndex; i < lastIndexExclusive; i++) { // skip invalid if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) continue; @@ -3125,16 +3058,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi locked = true; // calling the pre CP hook for batch mutation - if (!isInReplay && coprocessorHost != null) { + if (coprocessorHost != null) { MiniBatchOperationInProgress miniBatchOp = new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; } - // ------------------------------------ // STEP 3. Build WAL edit - // ---------------------------------- Durability durability = Durability.USE_DEFAULT; for (int i = firstIndex; i < lastIndexExclusive; i++) { // Skip puts that were determined to be invalid during preprocessing @@ -3153,25 +3084,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i); - // In replay, the batch may contain multiple nonces. If so, write WALEdit for each. - // Given how nonces are originally written, these should be contiguous. - // They don't have to be, it will still work, just write more WALEdits than needed. if (nonceGroup != currentNonceGroup || nonce != currentNonce) { - if (walEdit.size() > 0) { - assert isInReplay; - if (!isInReplay) { - throw new IOException("Multiple nonces per batch and not in replay"); - } - // txid should always increase, so having the one from the last call is ok. - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), now, m.getClusterIds(), - currentNonceGroup, currentNonce, mvcc); - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, - walEdit, true); - walEdit = new WALEdit(isInReplay); - walKey = null; - } + if (walEdit.size() > 0) throw new UnexpectedStateException("Only if DLR?"); currentNonceGroup = nonceGroup; currentNonce = nonce; } @@ -3186,107 +3100,56 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi addFamilyMapToWALEdit(familyMaps[i], walEdit); } - // ------------------------- - // STEP 4. Append the final edit to WAL. Do not sync wal. - // ------------------------- + // STEP 4. Append the final edit to WAL and sync. + // The WALEdit starts the mvcc transaction when it gets assigned its sequenceid. 
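      // In outline, the rewritten steps 4-8 that follow amount to (a sketch only; every call named
      // here is the one used in this hunk, nothing new is assumed):
      //
      //   txid = wal.append(htableDescriptor, regionInfo, walKey, walEdit, true); // starts mvcc txn
      //   if (walKey == null) walKey = appendEmptyEdit(wal); // SKIP_WAL still needs a sequence id
      //   if (txid != 0) sync(txid, durability);             // make durable before memstore write
      //   applyFamilyMapToMemstore(familyMaps[i], mvccNum);  // stamp cells and add to memstore
      //   WALKey.completeMvcc(walKey);                       // finish mvcc; edits become visible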
Mutation mutation = batchOp.getMutation(firstIndex); - if (isInReplay) { - // use wal key from the original - walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); - long replaySeqId = batchOp.getReplaySequenceId(); - walKey.setOrigLogSeqNum(replaySeqId); - } - if (walEdit.size() > 0) { - if (!isInReplay) { + long txid = 0; + if (!walEdit.isEmpty()) { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); - } txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); } - // ------------------------------------ - // Acquire the latest mvcc number - // ---------------------------------- - if (walKey == null) { - // If this is a skip wal operation just get the read point from mvcc - walKey = this.appendEmptyEdit(this.wal); - } - if (!isInReplay) { - writeEntry = walKey.getWriteEntry(); - mvccNum = writeEntry.getWriteNumber(); - } else { - mvccNum = batchOp.getReplaySequenceId(); - } + if (walKey == null) walKey = this.appendEmptyEdit(this.wal); + if (txid != 0) sync(txid, durability); - // ------------------------------------ // STEP 5. Write back to memstore - // Write to memstore. It is ok to write to memstore - // first without syncing the WAL because we do not roll - // forward the memstore MVCC. The MVCC will be moved up when - // the complete operation is done. These changes are not yet - // visible to scanners till we update the MVCC. The MVCC is - // moved only when the sync is complete. - // ---------------------------------- long addedSize = 0; for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { continue; } - doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote - addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay); + addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum); } - // ------------------------------- - // STEP 6. Release row locks, etc. - // ------------------------------- + // STEP 6. Complete mvcc. + WALKey.completeMvcc(walKey); + + // STEP 7. Release row locks, etc. if (locked) { this.updatesLock.readLock().unlock(); locked = false; } releaseRowLocks(acquiredRowLocks); - // ------------------------- - // STEP 7. Sync wal. - // ------------------------- - if (txid != 0) { - syncOrDefer(txid, durability); - } - - doRollBackMemstore = false; // calling the post CP hook for batch mutation - if (!isInReplay && coprocessorHost != null) { + if (coprocessorHost != null) { MiniBatchOperationInProgress miniBatchOp = new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutate(miniBatchOp); } - // ------------------------------------------------------------------ - // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. 
- // ------------------------------------------------------------------ - if (writeEntry != null) { - mvcc.completeAndWait(writeEntry); - writeEntry = null; - } else if (isInReplay) { - // ensure that the sequence id of the region is at least as big as orig log seq id - mvcc.advanceTo(mvccNum); - } - for (int i = firstIndex; i < lastIndexExclusive; i ++) { if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; } } - // ------------------------------------ - // STEP 9. Run coprocessor post hooks. This should be done after the wal is + // STEP 8. Run coprocessor post hooks. This should be done after the wal is // synced so that the coprocessor contract is adhered to. - // ------------------------------------ - if (!isInReplay && coprocessorHost != null) { + if (coprocessorHost != null) { for (int i = firstIndex; i < lastIndexExclusive; i++) { // only for successful puts if (batchOp.retCodeDetails[i].getOperationStatusCode() @@ -3305,18 +3168,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi success = true; return addedSize; } finally { - // if the wal sync was unsuccessful, remove keys from memstore - if (doRollBackMemstore) { - for (int j = 0; j < familyMaps.length; j++) { - for(List cells:familyMaps[j].values()) { - rollbackMemstore(cells); - } - } - if (writeEntry != null) mvcc.complete(writeEntry); - } else if (writeEntry != null) { - mvcc.completeAndWait(writeEntry); - } - + WALKey.completeMvcc(walKey); if (locked) { this.updatesLock.readLock().unlock(); } @@ -3369,93 +3221,80 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return d == Durability.USE_DEFAULT ? this.durability : d; } - //TODO, Think that gets/puts and deletes should be refactored a bit so that - //the getting of the lock happens before, so that you would just pass it into - //the methods. So in the case of checkAndMutate you could just do lockRow, - //get, put, unlockRow or something - @Override public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, - CompareOp compareOp, ByteArrayComparable comparator, Mutation w, + CompareOp compareOp, ByteArrayComparable comparator, Mutation mutation, boolean writeToWAL) throws IOException{ + checkMutationType(mutation, row); + return doCheckAndRowMutate(row, family, qualifier, compareOp, comparator, null, + mutation, writeToWAL); + } + + @Override + public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, + CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm, + boolean writeToWAL) + throws IOException { + return doCheckAndRowMutate(row, family, qualifier, compareOp, comparator, rm, null, + writeToWAL); + } + + /** + * checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has + * switches in the few places where there is deviation. + */ + private boolean doCheckAndRowMutate(byte [] row, byte [] family, byte [] qualifier, + CompareOp compareOp, ByteArrayComparable comparator, RowMutations rowMutations, + Mutation mutation, boolean writeToWAL) + throws IOException { + // Could do the below checks but seems wacky with two callers only. Just comment out for now. + // One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't + // need these commented out checks. 
+ // if (rowMutations == null && mutation == null) throw new DoNotRetryIOException("Both null"); + // if (rowMutations != null && mutation != null) throw new DoNotRetryIOException("Both set"); checkReadOnly(); - //TODO, add check for value length or maybe even better move this to the - //client if this becomes a global setting + // TODO, add check for value length also move this check to the client checkResources(); - boolean isPut = w instanceof Put; - if (!isPut && !(w instanceof Delete)) - throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " + - "be Put or Delete"); - if (!Bytes.equals(row, w.getRow())) { - throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " + - "getRow must match the passed row"); - } - startRegionOperation(); try { Get get = new Get(row); checkFamily(family); get.addColumn(family, qualifier); - // Lock row - note that doBatchMutate will relock this row if called RowLock rowLock = getRowLock(get.getRow()); - // wait for all previous transactions to complete (with lock held) - mvcc.await(); try { - if (this.getCoprocessorHost() != null) { + if (mutation != null && this.getCoprocessorHost() != null) { + // Call coprocessor. Boolean processed = null; - if (w instanceof Put) { + if (mutation instanceof Put) { processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family, - qualifier, compareOp, comparator, (Put) w); - } else if (w instanceof Delete) { + qualifier, compareOp, comparator, (Put)mutation); + } else if (mutation instanceof Delete) { processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family, - qualifier, compareOp, comparator, (Delete) w); - } - if (processed != null) { - return processed; + qualifier, compareOp, comparator, (Delete)mutation); } + if (processed != null) return processed; } + // NOTE: We used to wait here until mvcc caught up: mvcc.await(); + // Supposition is that now all changes are done under row locks, then when we go to read, + // we'll get the latest on this row. List result = get(get, false); - - boolean valueIsNull = comparator.getValue() == null || - comparator.getValue().length == 0; + boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; boolean matches = false; long cellTs = 0; if (result.size() == 0 && valueIsNull) { matches = true; - } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && - valueIsNull) { + } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { matches = true; cellTs = result.get(0).getTimestamp(); } else if (result.size() == 1 && !valueIsNull) { Cell kv = result.get(0); cellTs = kv.getTimestamp(); int compareResult = CellComparator.compareValue(kv, comparator); - switch (compareOp) { - case LESS: - matches = compareResult < 0; - break; - case LESS_OR_EQUAL: - matches = compareResult <= 0; - break; - case EQUAL: - matches = compareResult == 0; - break; - case NOT_EQUAL: - matches = compareResult != 0; - break; - case GREATER_OR_EQUAL: - matches = compareResult >= 0; - break; - case GREATER: - matches = compareResult > 0; - break; - default: - throw new RuntimeException("Unknown Compare op " + compareOp.name()); - } + matches = matches(compareOp, compareResult); } - //If matches put the new put or delete the new delete + // If matches put the new put or delete the new delete if (matches) { // We have acquired the row lock already. 
If the system clock is NOT monotonically // non-decreasing (see HBASE-14070) we should make sure that the mutation has a @@ -3464,16 +3303,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long now = EnvironmentEdgeManager.currentTime(); long ts = Math.max(now, cellTs); // ensure write is not eclipsed byte[] byteTs = Bytes.toBytes(ts); - - if (w instanceof Put) { - updateCellTimestamps(w.getFamilyCellMap().values(), byteTs); + if (mutation != null) { + if (mutation instanceof Put) { + updateCellTimestamps(mutation.getFamilyCellMap().values(), byteTs); + } + // And else 'delete' is not needed since it already does a second get, and sets the + // timestamp from get (see prepareDeleteTimestamps). + } else { + for (Mutation m: rowMutations.getMutations()) { + if (m instanceof Put) updateCellTimestamps(m.getFamilyCellMap().values(), byteTs); + } + // And else 'delete' is not needed since it already does a second get, and sets the + // timestamp from get (see prepareDeleteTimestamps). } - // else delete is not needed since it already does a second get, and sets the timestamp - // from get (see prepareDeleteTimestamps). - - // All edits for the given row (across all column families) must - // happen atomically. - doBatchMutate(w); + // All edits for the given row (across all column families) must happen atomically. + if (mutation != null) doBatchMutate(mutation); + else mutateRow(rowMutations); this.checkAndMutateChecksPassed.increment(); return true; } @@ -3487,104 +3332,45 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - //TODO, Think that gets/puts and deletes should be refactored a bit so that - //the getting of the lock happens before, so that you would just pass it into - //the methods. 
So in the case of checkAndMutate you could just do lockRow, - //get, put, unlockRow or something - - @Override - public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, - CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm, - boolean writeToWAL) throws IOException { - checkReadOnly(); - //TODO, add check for value length or maybe even better move this to the - //client if this becomes a global setting - checkResources(); - - startRegionOperation(); - try { - Get get = new Get(row); - checkFamily(family); - get.addColumn(family, qualifier); - - // Lock row - note that doBatchMutate will relock this row if called - RowLock rowLock = getRowLock(get.getRow()); - // wait for all previous transactions to complete (with lock held) - mvcc.await(); - try { - List result = get(get, false); - - boolean valueIsNull = comparator.getValue() == null || - comparator.getValue().length == 0; - boolean matches = false; - long cellTs = 0; - if (result.size() == 0 && valueIsNull) { - matches = true; - } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && - valueIsNull) { - matches = true; - cellTs = result.get(0).getTimestamp(); - } else if (result.size() == 1 && !valueIsNull) { - Cell kv = result.get(0); - cellTs = kv.getTimestamp(); - int compareResult = CellComparator.compareValue(kv, comparator); - switch (compareOp) { - case LESS: - matches = compareResult < 0; - break; - case LESS_OR_EQUAL: - matches = compareResult <= 0; - break; - case EQUAL: - matches = compareResult == 0; - break; - case NOT_EQUAL: - matches = compareResult != 0; - break; - case GREATER_OR_EQUAL: - matches = compareResult >= 0; - break; - case GREATER: - matches = compareResult > 0; - break; - default: - throw new RuntimeException("Unknown Compare op " + compareOp.name()); - } - } - //If matches put the new put or delete the new delete - if (matches) { - // We have acquired the row lock already. If the system clock is NOT monotonically - // non-decreasing (see HBASE-14070) we should make sure that the mutation has a - // larger timestamp than what was observed via Get. doBatchMutate already does this, but - // there is no way to pass the cellTs. See HBASE-14054. - long now = EnvironmentEdgeManager.currentTime(); - long ts = Math.max(now, cellTs); // ensure write is not eclipsed - byte[] byteTs = Bytes.toBytes(ts); - - for (Mutation w : rm.getMutations()) { - if (w instanceof Put) { - updateCellTimestamps(w.getFamilyCellMap().values(), byteTs); - } - // else delete is not needed since it already does a second get, and sets the timestamp - // from get (see prepareDeleteTimestamps). - } + private void checkMutationType(final Mutation mutation, final byte [] row) + throws DoNotRetryIOException { + boolean isPut = mutation instanceof Put; + if (!isPut && !(mutation instanceof Delete)) { + throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must be Put or Delete"); + } + if (!Bytes.equals(row, mutation.getRow())) { + throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's getRow must match"); + } + } - // All edits for the given row (across all column families) must - // happen atomically. 
- mutateRow(rm); - this.checkAndMutateChecksPassed.increment(); - return true; - } - this.checkAndMutateChecksFailed.increment(); - return false; - } finally { - rowLock.release(); - } - } finally { - closeRegionOperation(); + private boolean matches(final CompareOp compareOp, final int compareResult) { + boolean matches = false; + switch (compareOp) { + case LESS: + matches = compareResult < 0; + break; + case LESS_OR_EQUAL: + matches = compareResult <= 0; + break; + case EQUAL: + matches = compareResult == 0; + break; + case NOT_EQUAL: + matches = compareResult != 0; + break; + case GREATER_OR_EQUAL: + matches = compareResult >= 0; + break; + case GREATER: + matches = compareResult > 0; + break; + default: + throw new RuntimeException("Unknown Compare op " + compareOp.name()); } + return matches; } + private void doBatchMutate(Mutation mutation) throws IOException { // Currently this is only called for puts and deletes, so no nonces. OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation}); @@ -3655,40 +3441,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi void rewriteCellTags(Map> familyMap, final Mutation m) { // Check if we have any work to do and early out otherwise // Update these checks as more logic is added here - if (m.getTTL() == Long.MAX_VALUE) { return; } // From this point we know we have some work to do - for (Map.Entry> e: familyMap.entrySet()) { List cells = e.getValue(); assert cells instanceof RandomAccess; int listSize = cells.size(); for (int i = 0; i < listSize; i++) { Cell cell = cells.get(i); - List newTags = new ArrayList(); - Iterator tagIterator = CellUtil.tagsIterator(cell); - - // Carry forward existing tags - - while (tagIterator.hasNext()) { - - // Add any filters or tag specific rewrites here - - newTags.add(tagIterator.next()); - } - - // Cell TTL handling - - // Check again if we need to add a cell TTL because early out logic - // above may change when there are more tag based features in core. - if (m.getTTL() != Long.MAX_VALUE) { - // Add a cell TTL tag - newTags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL()))); - } - + List newTags = TagUtil.carryForwardTags(null, cell); + newTags = TagUtil.carryForwardTTLTag(newTags, m.getTTL()); // Rewrite the cell with the updated set of tags cells.set(i, new TagRewriteCell(cell, TagUtil.fromList(newTags))); } @@ -3770,8 +3535,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return the additional memory usage of the memstore caused by the * new entries. */ - private long applyFamilyMapToMemstore(Map> familyMap, - long mvccNum, boolean isInReplay) throws IOException { + private long applyFamilyMapToMemstore(Map> familyMap, long mvccNum) + throws IOException { long size = 0; for (Map.Entry> e : familyMap.entrySet()) { @@ -3782,7 +3547,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi int listSize = cells.size(); for (int i=0; i < listSize; i++) { Cell cell = cells.get(i); - if (cell.getSequenceId() == 0 || isInReplay) { + if (cell.getSequenceId() == 0) { CellUtil.setSequenceId(cell, mvccNum); } size += store.add(cell); @@ -5578,7 +5343,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // getSmallestReadPoint, before scannerReadPoints is updated. 
IsolationLevel isolationLevel = scan.getIsolationLevel(); synchronized(scannerReadPoints) { - this.readPt = getReadpoint(isolationLevel); + this.readPt = getReadPoint(isolationLevel); scannerReadPoints.put(this, this.readPt); } @@ -5742,7 +5507,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // As the data is obtained from two independent heaps, we need to // ensure that result list is sorted, because Result relies on that. - Collections.sort(results, comparator); + sort(results, comparator); return moreValues; } @@ -6855,7 +6620,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void processRowsWithLocks(RowProcessor processor, long timeout, long nonceGroup, long nonce) throws IOException { - for (byte[] row : processor.getRowsToLock()) { checkRow(row, "processRowsWithLocks"); } @@ -6863,23 +6627,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi checkReadOnly(); } checkResources(); - startRegionOperation(); WALEdit walEdit = new WALEdit(); - // 1. Run pre-process hook - try { - processor.preProcess(this, walEdit); - } catch (IOException e) { - closeRegionOperation(); - throw e; - } + // STEP 1. Run pre-process hook + preProcess(processor, walEdit); // Short circuit the read only case if (processor.readOnly()) { try { long now = EnvironmentEdgeManager.currentTime(); - doProcessRowWithTimeout( - processor, now, this, null, null, timeout); + doProcessRowWithTimeout(processor, now, this, null, null, timeout); processor.postProcess(this, walEdit, true); } finally { closeRegionOperation(); @@ -6887,7 +6644,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return; } - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; + WriteEntry writeEntry = null; boolean locked; boolean walSyncSuccessful = false; List acquiredRowLocks; @@ -6897,31 +6654,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long mvccNum = 0; WALKey walKey = null; try { - // 2. Acquire the row lock(s) + // STEP 2. Acquire the row lock(s) acquiredRowLocks = new ArrayList(rowsToLock.size()); for (byte[] row : rowsToLock) { // Attempt to lock all involved rows, throw if any lock times out // use a writer lock for mixed reads and writes acquiredRowLocks.add(getRowLock(row)); } - // 3. Region lock + // STEP 3. Region lock lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size()); locked = true; long now = EnvironmentEdgeManager.currentTime(); try { - // 4. Let the processor scan the rows, generate mutations and add - // waledits - doProcessRowWithTimeout( - processor, now, this, mutations, walEdit, timeout); - + // STEP 4. Let the processor scan the rows, generate mutations and add waledits + doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout); if (!mutations.isEmpty()) { - // 5. Call the preBatchMutate hook + // STEP 5. Call the preBatchMutate hook processor.preBatchMutate(this, walEdit); + // STEP 6. Append no sync long txid = 0; - // 6. Append no sync if (!walEdit.isEmpty()) { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), @@ -6930,23 +6684,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, false); } - if(walKey == null){ - // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit - // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId - walKey = this.appendEmptyEdit(this.wal); - } - - // 7. Start mvcc transaction - writeEntry = walKey.getWriteEntry(); - mvccNum = walKey.getSequenceId(); - - + if (walKey == null) walKey = this.appendEmptyEdit(this.wal); + if (txid != 0) sync(txid, getEffectiveDurability(processor.useDurability())); + walSyncSuccessful = true; - // 8. Apply to memstore + // STEP 7. Apply to memstore for (Mutation m : mutations) { // Handle any tag based cell features rewriteCellTags(m.getFamilyCellMap(), m); - for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { Cell cell = cellScanner.current(); CellUtil.setSequenceId(cell, mvccNum); @@ -6958,22 +6703,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi addedSize += store.add(cell); } } + // STEP 8. Complete mvcc. + WALKey.completeMvcc(walKey); - // 9. Release region lock + // STEP 9. Release region lock if (locked) { this.updatesLock.readLock().unlock(); locked = false; } - // 10. Release row lock(s) + // STEP 10. Release row lock(s) releaseRowLocks(acquiredRowLocks); - // 11. Sync edit log - if (txid != 0) { - syncOrDefer(txid, getEffectiveDurability(processor.useDurability())); - } - walSyncSuccessful = true; - // 12. call postBatchMutate hook + // STEP 11. call postBatchMutate hook processor.postBatchMutate(this); } } finally { @@ -6990,12 +6732,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi getStore(cell).rollback(cell); } } - if (writeEntry != null) { - mvcc.complete(writeEntry); - writeEntry = null; - } + WALKey.completeMvcc(walKey); } - // 13. Roll mvcc forward + // 12. Roll mvcc forward if (writeEntry != null) { mvcc.completeAndWait(writeEntry); } @@ -7006,9 +6745,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi releaseRowLocks(acquiredRowLocks); } - // 14. Run post-process hook + // 13. Run post-process hook processor.postProcess(this, walEdit, walSyncSuccessful); - } finally { closeRegionOperation(); if (!mutations.isEmpty() && @@ -7018,6 +6756,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + private void preProcess(final RowProcessor processor, final WALEdit walEdit) + throws IOException { + try { + processor.preProcess(this, walEdit); + } catch (IOException e) { + closeRegionOperation(); + throw e; + } + } + private void doProcessRowWithTimeout(final RowProcessor processor, final long now, final HRegion region, @@ -7068,500 +6816,357 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - /** - * @return The passed-in {@code tags} but with the tags from {@code cell} added. - */ - private static List carryForwardTags(final Cell cell, final List tags) { - if (cell.getTagsLength() <= 0) return tags; - List newTags = tags == null? 
new ArrayList(): /*Append Tags*/tags; - Iterator i = CellUtil.tagsIterator(cell); - while (i.hasNext()) newTags.add(i.next()); - return newTags; + public Result append(Append append) throws IOException { + return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE); } - /** - * Run a Get against passed in store on passed row, etc. - * @return Get result. - */ - private List doGet(final Store store, final byte [] row, - final Map.Entry> family, final TimeRange tr) - throws IOException { - // Sort the cells so that they match the order that they - // appear in the Get results. Otherwise, we won't be able to - // find the existing values if the cells are not specified - // in order by the client since cells are in an array list. - Collections.sort(family.getValue(), store.getComparator()); - // Get previous values for all columns in this family - Get get = new Get(row); - for (Cell cell : family.getValue()) { - get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell)); - } - if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax()); - return get(get, false); + @Override + public Result append(Append mutation, long nonceGroup, long nonce) throws IOException { + return doDelta(Operation.APPEND, mutation, nonceGroup, nonce); } - public Result append(Append append) throws IOException { - return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE); + public Result increment(Increment increment) throws IOException { + return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE); } - // TODO: There's a lot of boiler plate code identical to increment. - // We should refactor append and increment as local get-mutate-put - // transactions, so all stores only go through one code path for puts. - @Override - public Result append(Append mutate, long nonceGroup, long nonce) throws IOException { - Operation op = Operation.APPEND; - byte[] row = mutate.getRow(); - checkRow(row, op.toString()); - checkFamilies(mutate.getFamilyCellMap().keySet()); - boolean flush = false; - Durability durability = getEffectiveDurability(mutate.getDurability()); - boolean writeToWAL = durability != Durability.SKIP_WAL; - WALEdit walEdits = null; - List allKVs = new ArrayList(mutate.size()); - Map> tempMemstore = new HashMap>(); - long size = 0; - long txid = 0; + public Result increment(Increment mutation, long nonceGroup, long nonce) + throws IOException { + return doDelta(Operation.INCREMENT, mutation, nonceGroup, nonce); + } + + /** + * Add "deltas" to Cells. Deltas are increments or appends. Switch on op. + * + *
If increment, add deltas to current values or if an append, then + * append the deltas to the current Cell values. + * + *
Append and Increment code paths are mostly the same. They differ in just a few places. + * This method does the code path for increment and append and then in key spots, switches + * on the passed in op to do increment or append specific paths. + */ + private Result doDelta(Operation op, Mutation mutation, long nonceGroup, long nonce) + throws IOException { checkReadOnly(); checkResources(); - // Lock row - startRegionOperation(op); + checkRow(mutation.getRow(), op.toString()); + checkFamilies(mutation.getFamilyCellMap().keySet()); this.writeRequestsCount.increment(); - RowLock rowLock = null; WALKey walKey = null; - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; - boolean doRollBackMemstore = false; + startRegionOperation(op); + long accumulatedResultSize = 0; + List results = new ArrayList(mutation.size()); + RowLock rowLock = getRowLock(mutation.getRow()); try { - rowLock = getRowLock(row); - assert rowLock != null; + lock(this.updatesLock.readLock()); try { - lock(this.updatesLock.readLock()); - try { - // Wait for all prior MVCC transactions to finish - while we hold the row lock - // (so that we are guaranteed to see the latest state when we do our Get) - mvcc.await(); - if (this.coprocessorHost != null) { - Result r = this.coprocessorHost.preAppendAfterRowLock(mutate); - if (r!= null) { - return r; - } - } - long now = EnvironmentEdgeManager.currentTime(); - // Process each family - for (Map.Entry> family : mutate.getFamilyCellMap().entrySet()) { - Store store = stores.get(family.getKey()); - List kvs = new ArrayList(family.getValue().size()); - - List results = doGet(store, row, family, null); - - // Iterate the input columns and update existing values if they were - // found, otherwise add new column initialized to the append value - - // Avoid as much copying as possible. We may need to rewrite and - // consolidate tags. Bytes are only copied once. 
- // Would be nice if KeyValue had scatter/gather logic - int idx = 0; - for (Cell cell : family.getValue()) { - Cell newCell; - Cell oldCell = null; - if (idx < results.size() - && CellUtil.matchingQualifier(results.get(idx), cell)) { - oldCell = results.get(idx); - long ts = Math.max(now, oldCell.getTimestamp()); - - // Process cell tags - // Make a union of the set of tags in the old and new KVs - List newTags = carryForwardTags(oldCell, new ArrayList()); - newTags = carryForwardTags(cell, newTags); - - // Cell TTL handling - - if (mutate.getTTL() != Long.MAX_VALUE) { - // Add the new TTL tag - newTags.add( - new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL()))); - } - - // Rebuild tags - byte[] tagBytes = TagUtil.fromList(newTags); - - // allocate an empty cell once - newCell = new KeyValue(row.length, cell.getFamilyLength(), - cell.getQualifierLength(), ts, KeyValue.Type.Put, - oldCell.getValueLength() + cell.getValueLength(), - tagBytes.length); - // copy in row, family, and qualifier - System.arraycopy(cell.getRowArray(), cell.getRowOffset(), - newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength()); - System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(), - newCell.getFamilyArray(), newCell.getFamilyOffset(), - cell.getFamilyLength()); - System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(), - newCell.getQualifierArray(), newCell.getQualifierOffset(), - cell.getQualifierLength()); - // copy in the value - CellUtil.copyValueTo(oldCell, newCell.getValueArray(), newCell.getValueOffset()); - System.arraycopy(cell.getValueArray(), cell.getValueOffset(), - newCell.getValueArray(), - newCell.getValueOffset() + oldCell.getValueLength(), - cell.getValueLength()); - // Copy in tag data - System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(), - tagBytes.length); - idx++; - } else { - // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP - CellUtil.updateLatestStamp(cell, now); - - // Cell TTL handling - - if (mutate.getTTL() != Long.MAX_VALUE) { - List newTags = new ArrayList(1); - newTags.add( - new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL()))); - // Add the new TTL tag - newCell = new TagRewriteCell(cell, TagUtil.fromList(newTags)); - } else { - newCell = cell; - } - } - - // Give coprocessors a chance to update the new cell - if (coprocessorHost != null) { - newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND, - mutate, oldCell, newCell); - } - kvs.add(newCell); - - // Append update to WAL - if (writeToWAL) { - if (walEdits == null) { - walEdits = new WALEdit(); - } - walEdits.add(newCell); - } - } - - //store the kvs to the temporary memstore before writing WAL - tempMemstore.put(store, kvs); - } - - // Actually write to WAL now - if (walEdits != null && !walEdits.isEmpty()) { - if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
- walKey = new HLogKey( - getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), - WALKey.NO_SEQUENCE_ID, - nonceGroup, - nonce, - mvcc); - txid = - this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true); - } else { - recordMutationWithoutWal(mutate.getFamilyCellMap()); - } - } - if (walKey == null) { - // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned - walKey = this.appendEmptyEdit(this.wal); - } - - // now start my own transaction - writeEntry = walKey.getWriteEntry(); - - - // Actually write to Memstore now - if (!tempMemstore.isEmpty()) { - for (Map.Entry> entry : tempMemstore.entrySet()) { - Store store = entry.getKey(); - if (store.getFamily().getMaxVersions() == 1) { - // upsert if VERSIONS for this CF == 1 - // Is this right? It immediately becomes visible? St.Ack 20150907 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); - } else { - // otherwise keep older versions around - for (Cell cell: entry.getValue()) { - CellUtil.setSequenceId(cell, writeEntry.getWriteNumber()); - size += store.add(cell); - doRollBackMemstore = true; - } - } - // We add to all KVs here whereas when doing increment, we do it - // earlier... why? - allKVs.addAll(entry.getValue()); + Result cpResult = doCoprocessorPreCall(op, mutation); + if (cpResult != null) return cpResult; + Durability effectiveDurability = getEffectiveDurability(mutation.getDurability()); + Map> forMemStore = + new HashMap>(mutation.getFamilyCellMap().size()); + // Reckon Cells to apply to WAL -- in returned walEdit -- and what to add to memstore and + // what to return back to the client (in 'forMemStore' and 'results' respectively). + WALEdit walEdit = reckonDeltas(op, mutation, effectiveDurability, forMemStore, results); + long txid = 0; + // Actually write to WAL now if a walEdit to apply. + if (walEdit != null && !walEdit.isEmpty()) { + // Using default cluster id, as this can only happen in the originating cluster. + // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey + // here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce, mvcc); + txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), + walKey, walEdit, true); + } else { + // If walEdits is empty, it means we skipped the WAL; update counters. + recordMutationWithoutWal(mutation.getFamilyCellMap()); + } + // If walKey == null, append faked WALEdit in order for SKIP_WAL updates to get an mvccNum + if (walKey == null) walKey = this.appendEmptyEdit(this.wal); + // Call sync on our edit. + if (txid != 0) sync(txid, effectiveDurability); + // Now write to MemStore; do it a column family at a time.. + for (Map.Entry> entry: forMemStore.entrySet()) { + Store store = entry.getKey(); + List cells = entry.getValue(); + if (store.getFamily().getMaxVersions() == 1) { + // Upsert if VERSIONS for this CF == 1. 
+ accumulatedResultSize += store.upsert(cells, walKey.getSequenceId()); + } else { + // Otherwise keep older versions around + for (Cell cell: cells) { + accumulatedResultSize += store.add(cell); } - - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); } - } finally { - this.updatesLock.readLock().unlock(); } - + WALKey.completeMvcc(walKey); + // Clear the write entry to signify success (else it gets completed again in finally below) + walKey.setWriteEntry(null); } finally { - rowLock.release(); - rowLock = null; - } - // sync the transaction log outside the rowlock - if(txid != 0){ - syncOrDefer(txid, durability); + this.updatesLock.readLock().unlock(); } - doRollBackMemstore = false; + return Result.create(results); } finally { - if (rowLock != null) { - rowLock.release(); - } - // if the wal sync was unsuccessful, remove keys from memstore - if (doRollBackMemstore) { - rollbackMemstore(allKVs); - if (writeEntry != null) mvcc.complete(writeEntry); - } else if (writeEntry != null) { - mvcc.completeAndWait(writeEntry); - } - + rowLock.release(); + WALKey.completeMvcc(walKey); + // Request a cache flush if over the limit. Do it outside update lock. + if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush(); closeRegionOperation(op); + if (this.metricsRegion != null) this.metricsRegion.updateIncrement(); } + } - if (this.metricsRegion != null) { - this.metricsRegion.updateAppend(); - } - - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); + /** + * Do coprocessor pre-increment or pre-append call. + * @return Result returned out of the coprocessor, which means bypass all further processing and + * return the proffered Result instead, or null which means proceed. + */ + private Result doCoprocessorPreCall(final Operation op, final Mutation mutation) + throws IOException { + Result result = null; + if (this.coprocessorHost != null) { + switch(op) { + case INCREMENT: + result = this.coprocessorHost.preIncrementAfterRowLock((Increment)mutation); + break; + case APPEND: + result = this.coprocessorHost.preAppendAfterRowLock((Append)mutation); + break; + default: throw new UnsupportedOperationException(op.toString()); + } } - - return mutate.isReturnResults() ? Result.create(allKVs) : null; + return result; } - public Result increment(Increment increment) throws IOException { - return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE); + /** + * Reckon the Cells to apply to WAL, memstore, and to return to the Client; these Sets are not + * always the same dependent on whether to write WAL or if the amount to increment is zero (in + * this case we write back nothing, just return latest Cell value to the client). + * + * @param results Fill in here what goes back to the Client + * @param forMemStore Fill in here what to apply to the MemStore (by Store). + * @return A WALEdit to apply to WAL or null if we are to skip the WAL. + * @throws IOException + */ + private WALEdit reckonDeltas(final Operation op, final Mutation mutation, + final Durability effectiveDurability, final Map> forMemStore, + final List results) + throws IOException { + WALEdit walEdit = null; + long now = EnvironmentEdgeManager.currentTime(); + final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL; + // Process a Store/family at a time. 
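The zero-delta case called out in the javadoc above is easy to miss: an Increment whose amount is 0 writes nothing to the WAL or MemStore, yet the latest cell value still comes back to the caller. A minimal client-side sketch of that behavior, assuming a reachable cluster and an existing table 't' with family 'f' (placeholder names, not from this patch):

  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Increment;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.util.Bytes;

  public class ZeroDeltaIncrement {
    public static void main(String[] args) throws Exception {
      byte[] row = Bytes.toBytes("r");
      byte[] f = Bytes.toBytes("f");
      byte[] q = Bytes.toBytes("q");
      try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
           Table table = conn.getTable(TableName.valueOf("t"))) {
        table.incrementColumnValue(row, f, q, 7L);   // counter now holds 7
        Increment probe = new Increment(row);
        probe.addColumn(f, q, 0L);                   // amount 0: no WAL/MemStore write on the server
        Result current = table.increment(probe);     // but the latest value is still returned
        System.out.println(Bytes.toLong(current.getValue(f, q)));
      }
    }
  }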
+ for (Map.Entry> entry: mutation.getFamilyCellMap().entrySet()) { + final byte [] columnFamilyName = entry.getKey(); + List deltas = entry.getValue(); + Store store = this.stores.get(columnFamilyName); + // Reckon for the Store what to apply to WAL and MemStore. + List toApply = + reckonDeltasByStore(store, op, mutation, effectiveDurability, now, deltas, results); + if (!toApply.isEmpty()) { + forMemStore.put(store, toApply); + if (writeToWAL) { + if (walEdit == null) walEdit = new WALEdit(); + walEdit.getCells().addAll(toApply); + } + } + } + return walEdit; } - // TODO: There's a lot of boiler plate code identical to append. - // We should refactor append and increment as local get-mutate-put - // transactions, so all stores only go through one code path for puts. + /** + * Reckon the Cells to apply to WAL, memstore, and to return to the Client in passed + * column family/Store. + * + * Does Get of current value and then adds passed in deltas for this Store returning the result. + * + * @param op Whether Increment or Append + * @param mutation The encompassing Mutation object + * @param deltas Changes to apply to this Store; either increment amount or data to append + * @param results In here we accumulate all the Cells we are to return to the client; this List + * can be larger than what we return in case where delta is zero; i.e. don't write + * out new values, just return current value. + * @return Resulting Cells after deltas have been applied to current + * values. Side effect is our filling out of the results List. + * @throws IOException + */ + private List reckonDeltasByStore(final Store store, final Operation op, + final Mutation mutation, final Durability effectiveDurability, final long now, + final List deltas, final List results) + throws IOException { + byte [] columnFamily = store.getFamily().getName(); + List toApply = new ArrayList(deltas.size()); + // Get previous values for all columns in this family + List currentValues = get(mutation, store, deltas, null, + op == Operation.INCREMENT? ((Increment)mutation).getTimeRange(): null); + // Iterate the input columns and update existing values if they were found, otherwise + // add new column initialized to the delta amount + int currentValuesIndex = 0; + for (int i = 0; i < deltas.size(); i++) { + Cell delta = deltas.get(i); + Cell currentValue = null; + if (currentValuesIndex < currentValues.size() && + CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) { + currentValue = currentValues.get(currentValuesIndex); + if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) { + currentValuesIndex++; + } + } + // Switch on whether this an increment or an append building the new Cell to apply. + Cell newCell = null; + MutationType mutationType = null; + boolean apply = true; + switch (op) { + case INCREMENT: + mutationType = MutationType.INCREMENT; + // If delta amount to apply is 0, don't write WAL or MemStore. + long deltaAmount = getLongValue(delta); + apply = deltaAmount != 0; + newCell = reckonIncrement(delta, deltaAmount, currentValue, columnFamily, now, + (Increment)mutation); + break; + case APPEND: + mutationType = MutationType.APPEND; + // Always apply Append. TODO: Does empty delta value mean reset Cell? It seems to. + newCell = reckonAppend(delta, currentValue, now, (Append)mutation); + break; + default: throw new UnsupportedOperationException(op.toString()); + } - // They are subtley different in quiet a few ways. This came out only - // after study. 
I am not sure that many of the differences are intentional. - // TODO: St.Ack 20150907 + // Give coprocessors a chance to update the new cell + if (coprocessorHost != null) { + newCell = + coprocessorHost.postMutationBeforeWAL(mutationType, mutation, currentValue, newCell); + } + // If apply, we need to update memstore/WAL with new value; add it toApply. + if (apply) toApply.add(newCell); + // Add to results to get returned to the Client. + results.add(newCell); + } + return toApply; + } - @Override - public Result increment(Increment mutation, long nonceGroup, long nonce) + /** + * Calculate new Increment Cell. + * @return New Increment Cell with delta applied to currentValue if currentValue is not null; + * otherwise, a new Cell with the delta set as its value. + */ + private Cell reckonIncrement(final Cell delta, final long deltaAmount, final Cell currentValue, + byte [] columnFamily, final long now, Mutation mutation) throws IOException { - Operation op = Operation.INCREMENT; + // Forward any tags found on the delta. + List tags = TagUtil.carryForwardTags(delta); + long newValue = deltaAmount; + long ts = now; + if (currentValue != null) { + tags = TagUtil.carryForwardTags(tags, currentValue); + ts = Math.max(now, currentValue.getTimestamp()); + newValue += getLongValue(currentValue); + } + // Now make up the new Cell. TODO: FIX. This is carnel knowledge of how KeyValues are made... + // doesn't work well with offheaping or if we are doing a different Cell type. + byte [] incrementAmountInBytes = Bytes.toBytes(newValue); + tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL()); byte [] row = mutation.getRow(); - checkRow(row, op.toString()); - checkFamilies(mutation.getFamilyCellMap().keySet()); - boolean flush = false; - Durability durability = getEffectiveDurability(mutation.getDurability()); - boolean writeToWAL = durability != Durability.SKIP_WAL; - WALEdit walEdits = null; - List allKVs = new ArrayList(mutation.size()); - - Map> tempMemstore = new HashMap>(); - long size = 0; - long txid = 0; - checkReadOnly(); - checkResources(); - // Lock row - startRegionOperation(op); - this.writeRequestsCount.increment(); - RowLock rowLock = null; - WALKey walKey = null; - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; - boolean doRollBackMemstore = false; - TimeRange tr = mutation.getTimeRange(); - try { - rowLock = getRowLock(row); - assert rowLock != null; - try { - lock(this.updatesLock.readLock()); - try { - // wait for all prior MVCC transactions to finish - while we hold the row lock - // (so that we are guaranteed to see the latest state) - mvcc.await(); - if (this.coprocessorHost != null) { - Result r = this.coprocessorHost.preIncrementAfterRowLock(mutation); - if (r != null) { - return r; - } - } - long now = EnvironmentEdgeManager.currentTime(); - // Process each family - for (Map.Entry> family: mutation.getFamilyCellMap().entrySet()) { - Store store = stores.get(family.getKey()); - List kvs = new ArrayList(family.getValue().size()); - - List results = doGet(store, row, family, tr); - - // Iterate the input columns and update existing values if they were - // found, otherwise add new column initialized to the increment amount - - // Avoid as much copying as possible. We may need to rewrite and - // consolidate tags. Bytes are only copied once. 
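The carryForwardTTLTag call above is what keeps a per-mutation TTL alive across the rewrite of the cell: if the client set one, it is re-attached to the rebuilt KeyValue as a TTL_TAG_TYPE tag. On the client side that TTL is just Mutation#setTTL, in milliseconds; a small sketch with placeholder table, family and qualifier names (not part of this patch):

  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Increment;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.util.Bytes;

  public class IncrementWithTtl {
    public static void main(String[] args) throws Exception {
      try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
           Table table = conn.getTable(TableName.valueOf("t"))) {     // assumed table
        Increment inc = new Increment(Bytes.toBytes("r"));
        inc.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), 1L);    // assumed family/qualifier
        inc.setTTL(60000L); // milliseconds; carried forward server-side as the TTL tag
        table.increment(inc);
      }
    }
  }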
- // Would be nice if KeyValue had scatter/gather logic - int idx = 0; - // HERE WE DIVERGE FROM APPEND - List edits = family.getValue(); - for (int i = 0; i < edits.size(); i++) { - Cell cell = edits.get(i); - long amount = Bytes.toLong(CellUtil.cloneValue(cell)); - boolean noWriteBack = (amount == 0); - - List newTags = carryForwardTags(cell, new ArrayList()); - - Cell c = null; - long ts = now; - if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) { - c = results.get(idx); - ts = Math.max(now, c.getTimestamp()); - if(c.getValueLength() == Bytes.SIZEOF_LONG) { - amount += CellUtil.getValueAsLong(c); - } else { - // throw DoNotRetryIOException instead of IllegalArgumentException - throw new org.apache.hadoop.hbase.DoNotRetryIOException( - "Attempted to increment field that isn't 64 bits wide"); - } - // Carry tags forward from previous version - newTags = carryForwardTags(c, newTags); - if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) { - idx++; - } - } - - // Append new incremented KeyValue to list - byte[] q = CellUtil.cloneQualifier(cell); - byte[] val = Bytes.toBytes(amount); - - // Add the TTL tag if the mutation carried one - if (mutation.getTTL() != Long.MAX_VALUE) { - newTags.add( - new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutation.getTTL()))); - } - - Cell newKV = new KeyValue(row, 0, row.length, - family.getKey(), 0, family.getKey().length, - q, 0, q.length, - ts, - KeyValue.Type.Put, - val, 0, val.length, - newTags); - - // Give coprocessors a chance to update the new cell - if (coprocessorHost != null) { - newKV = coprocessorHost.postMutationBeforeWAL( - RegionObserver.MutationType.INCREMENT, mutation, c, newKV); - } - allKVs.add(newKV); - - if (!noWriteBack) { - kvs.add(newKV); - - // Prepare WAL updates - if (writeToWAL) { - if (walEdits == null) { - walEdits = new WALEdit(); - } - walEdits.add(newKV); - } - } - } - - //store the kvs to the temporary memstore before writing WAL - if (!kvs.isEmpty()) { - tempMemstore.put(store, kvs); - } - } - - // Actually write to WAL now - if (walEdits != null && !walEdits.isEmpty()) { - if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), - WALKey.NO_SEQUENCE_ID, - nonceGroup, - nonce, - mvcc); - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), - walKey, walEdits, true); - } else { - recordMutationWithoutWal(mutation.getFamilyCellMap()); - } - } - if (walKey == null) { - // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned - walKey = this.appendEmptyEdit(this.wal); - } + return new KeyValue(row, 0, row.length, + columnFamily, 0, columnFamily.length, + delta.getQualifierArray(), delta.getQualifierOffset(), delta.getQualifierLength(), + ts, KeyValue.Type.Put, + incrementAmountInBytes, 0, incrementAmountInBytes.length, + tags); + } - // now start my own transaction - writeEntry = walKey.getWriteEntry(); - - // Actually write to Memstore now - if (!tempMemstore.isEmpty()) { - for (Map.Entry> entry : tempMemstore.entrySet()) { - Store store = entry.getKey(); - if (store.getFamily().getMaxVersions() == 1) { - // upsert if VERSIONS for this CF == 1 - // Is this right? It immediately becomes visible? 
St.Ack 20150907 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); - } else { - // otherwise keep older versions around - for (Cell cell : entry.getValue()) { - CellUtil.setSequenceId(cell, writeEntry.getWriteNumber()); - size += store.add(cell); - doRollBackMemstore = true; - } - } - } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); - } - } finally { - this.updatesLock.readLock().unlock(); - } - } finally { - rowLock.release(); - rowLock = null; - } - // sync the transaction log outside the rowlock - if(txid != 0){ - syncOrDefer(txid, durability); - } - doRollBackMemstore = false; - } finally { - if (rowLock != null) { - rowLock.release(); - } - // if the wal sync was unsuccessful, remove keys from memstore - if (doRollBackMemstore) { - for(List cells: tempMemstore.values()) { - rollbackMemstore(cells); - } - if (writeEntry != null) mvcc.complete(writeEntry); - } else if (writeEntry != null) { - mvcc.completeAndWait(writeEntry); - } - closeRegionOperation(Operation.INCREMENT); - if (this.metricsRegion != null) { - this.metricsRegion.updateIncrement(); + private Cell reckonAppend(final Cell delta, final Cell currentValue, final long now, + Append mutation) + throws IOException { + // Forward any tags found on the delta. + List tags = TagUtil.carryForwardTags(delta); + long ts = now; + Cell newCell = null; + byte [] row = mutation.getRow(); + if (currentValue != null) { + tags = TagUtil.carryForwardTags(tags, currentValue); + ts = Math.max(now, currentValue.getTimestamp()); + tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL()); + byte[] tagBytes = TagUtil.fromList(tags); + // Allocate an empty cell and copy in all parts. + // TODO: This is intimate knowledge of how a KeyValue is made. Undo!!! Prevents our doing + // other Cell types. Copying on-heap too if an off-heap Cell. + newCell = new KeyValue(row.length, delta.getFamilyLength(), + delta.getQualifierLength(), ts, KeyValue.Type.Put, + delta.getValueLength() + currentValue.getValueLength(), + tagBytes == null? 0: tagBytes.length); + // Copy in row, family, and qualifier + System.arraycopy(row, 0, newCell.getRowArray(), newCell.getRowOffset(), row.length); + System.arraycopy(delta.getFamilyArray(), delta.getFamilyOffset(), + newCell.getFamilyArray(), newCell.getFamilyOffset(), delta.getFamilyLength()); + System.arraycopy(delta.getQualifierArray(), delta.getQualifierOffset(), + newCell.getQualifierArray(), newCell.getQualifierOffset(), delta.getQualifierLength()); + // Copy in the value + CellUtil.copyValueTo(currentValue, newCell.getValueArray(), newCell.getValueOffset()); + System.arraycopy(delta.getValueArray(), delta.getValueOffset(), + newCell.getValueArray(), newCell.getValueOffset() + currentValue.getValueLength(), + delta.getValueLength()); + // Copy in tag data + if (tagBytes != null) { + System.arraycopy(tagBytes, 0, + newCell.getTagsArray(), newCell.getTagsOffset(), tagBytes.length); } + } else { + // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP + CellUtil.updateLatestStamp(delta, now); + newCell = delta; + tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL()); + if (tags != null) newCell = new TagRewriteCell(delta, TagUtil.fromList(tags)); } + return newCell; + } - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); - } - return mutation.isReturnResults() ? 
Result.create(allKVs) : null; + /** + * @return Get the long out of the passed in Cell + * @throws DoNotRetryIOException + */ + private static long getLongValue(final Cell cell) throws DoNotRetryIOException { + int len = cell.getValueLength(); + if (len != Bytes.SIZEOF_LONG) { + // throw DoNotRetryIOException instead of IllegalArgumentException + throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide"); + } + return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len); + } + + /** + * Do a specific Get on passed columnFamily and column qualifiers. + * @param mutation Mutation we are doing this Get for. + * @param columnFamily Which column family on row (TODO: Go all Gets in one go) + * @param coordinates Cells from mutation used as coordinates applied to Get. + * @return Return list of Cells found. + */ + private List get(final Mutation mutation, final Store store, + final List coordinates, final IsolationLevel isolation, final TimeRange tr) + throws IOException { + // Sort the cells so that they match the order that they appear in the Get results. Otherwise, + // we won't be able to find the existing values if the cells are not specified in order by the + // client since cells are in an array list. + // TODO: I don't get why we are sorting. St.Ack 20150107 + sort(coordinates, store.getComparator()); + Get get = new Get(mutation.getRow()); + if (isolation != null) get.setIsolationLevel(isolation); + for (Cell cell: coordinates) { + get.addColumn(store.getFamily().getName(), CellUtil.cloneQualifier(cell)); + } + // Increments carry time range. If an Increment instance, put it on the Get. + if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax()); + return get(get, false); + } + + /** + * @return Sorted list of cells using comparator + */ + private static List sort(List cells, final Comparator comparator) { + Collections.sort(cells, comparator); + return cells; } // @@ -7704,53 +7309,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return responseBuilder.build(); } - /* - * Process table. - * Do major compaction or list content. - * @throws IOException - */ - private static void processTable(final FileSystem fs, final Path p, - final WALFactory walFactory, final Configuration c, - final boolean majorCompact) - throws IOException { - HRegion region; - FSTableDescriptors fst = new FSTableDescriptors(c); - // Currently expects tables have one region only. 
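Two client-visible consequences of the helpers above are worth spelling out: getLongValue insists the existing cell hold exactly eight bytes, so counters must be seeded with Bytes.toBytes(long) and never Bytes.toBytes(int), while an append concatenates the stored value with the delta bytes. A hedged sketch, again with placeholder table, family and row names and a running cluster assumed:

  import org.apache.hadoop.hbase.DoNotRetryIOException;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Append;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.util.Bytes;

  public class DeltaSemantics {
    public static void main(String[] args) throws Exception {
      byte[] f = Bytes.toBytes("f");
      byte[] q = Bytes.toBytes("q");
      try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
           Table table = conn.getTable(TableName.valueOf("t"))) {
        // 8-byte value: incrementable.
        table.put(new Put(Bytes.toBytes("r1")).addColumn(f, q, Bytes.toBytes(5L)));
        table.incrementColumnValue(Bytes.toBytes("r1"), f, q, 1L);
        // 4-byte value: the increment is rejected.
        table.put(new Put(Bytes.toBytes("r2")).addColumn(f, q, Bytes.toBytes(5)));
        try {
          table.incrementColumnValue(Bytes.toBytes("r2"), f, q, 1L);
        } catch (DoNotRetryIOException expected) {
          System.out.println("not a long: " + expected.getMessage());
        }
        // Append concatenates whatever is stored with the delta bytes.
        Append app = new Append(Bytes.toBytes("r3"));
        app.add(f, q, Bytes.toBytes("world"));
        table.append(app);
      }
    }
  }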
- if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) { - final WAL wal = walFactory.getMetaWAL( - HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()); - region = HRegion.newHRegion(p, wal, fs, c, - HRegionInfo.FIRST_META_REGIONINFO, - fst.get(TableName.META_TABLE_NAME), null); - } else { - throw new IOException("Not a known catalog table: " + p.toString()); - } - try { - region.mvcc.advanceTo(region.initialize(null)); - if (majorCompact) { - region.compact(true); - } else { - // Default behavior - Scan scan = new Scan(); - // scan.addFamily(HConstants.CATALOG_FAMILY); - RegionScanner scanner = region.getScanner(scan); - try { - List kvs = new ArrayList(); - boolean done; - do { - kvs.clear(); - done = scanner.next(kvs); - if (kvs.size() > 0) LOG.info(kvs); - } while (done); - } finally { - scanner.close(); - } - } - } finally { - region.close(); - } - } - boolean shouldForceSplit() { return this.splitRequest; } @@ -8001,12 +7559,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } /** - * Calls sync with the given transaction ID if the region's table is not - * deferring it. + * Calls sync with the given transaction ID * @param txid should sync up to which transaction * @throws IOException If anything goes wrong with DFS */ - private void syncOrDefer(long txid, Durability durability) throws IOException { + private void sync(long txid, Durability durability) throws IOException { if (this.getRegionInfo().isMetaRegion()) { this.wal.sync(txid); } else { @@ -8067,45 +7624,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } }; - /** - * Facility for dumping and compacting catalog tables. - * Only does catalog tables since these are only tables we for sure know - * schema on. For usage run: - *

-   *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
-   * </pre>
- * @throws IOException - */ - public static void main(String[] args) throws IOException { - if (args.length < 1) { - printUsageAndExit(null); - } - boolean majorCompact = false; - if (args.length > 1) { - if (!args[1].toLowerCase().startsWith("major")) { - printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">"); - } - majorCompact = true; - } - final Path tableDir = new Path(args[0]); - final Configuration c = HBaseConfiguration.create(); - final FileSystem fs = FileSystem.get(c); - final Path logdir = new Path(c.get("hbase.tmp.dir")); - final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis(); - - final Configuration walConf = new Configuration(c); - FSUtils.setRootDir(walConf, logdir); - final WALFactory wals = new WALFactory(walConf, null, logname); - try { - processTable(fs, tableDir, wals, c, majorCompact); - } finally { - wals.close(); - // TODO: is this still right? - BlockCache bc = new CacheConfig(c).getBlockCache(); - if (bc != null) bc.shutdown(); - } - } - @Override public long getOpenSeqNum() { return this.openSeqNum; @@ -8143,16 +7661,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi assert newValue >= 0; } - /** - * Do not change this sequence id. - * @return sequenceId - */ @VisibleForTesting - public long getSequenceId() { - return this.mvcc.getReadPoint(); + public long getReadPoint() { + return getReadPoint(IsolationLevel.READ_COMMITTED); } - /** * Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore * the WALEdit append later. @@ -8166,14 +7679,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC()); - + // Call append but with an empty WALEdit. The returned sequence id will not be associated // with any edit and we can be sure it went in after all outstanding appends. try { wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, false); } catch (Throwable t) { // If exception, our mvcc won't get cleaned up by client, so do it here. - getMVCC().complete(key.getWriteEntry()); + WALKey.completeMvcc(key); } return key; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 6d87057..7b285fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -157,7 +157,7 @@ public interface Region extends ConfigurationObserver { boolean isLoadingCfsOnDemandDefault(); /** @return readpoint considering given IsolationLevel */ - long getReadpoint(IsolationLevel isolationLevel); + long getReadPoint(IsolationLevel isolationLevel); /** * @return The earliest time a store in the region was flushed. All @@ -217,8 +217,8 @@ public interface Region extends ConfigurationObserver { // Region read locks /** - * Operation enum is used in {@link Region#startRegionOperation} to provide context for - * various checks before any region operation begins. + * Operation enum is used in {@link Region#startRegionOperation} and elsewhere to provide + * context for various checks. 
*/ enum Operation { ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE, @@ -323,9 +323,11 @@ public interface Region extends ConfigurationObserver { OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) throws IOException; /** - * Atomically checks if a row/family/qualifier value matches the expected val - * If it does, it performs the row mutations. If the passed value is null, t - * is for the lack of column (ie: non-existence) + * Atomically checks if a row/family/qualifier value matches the expected value and if it does, + * it performs the mutation. If the passed value is null, the lack of column value + * (ie: non-existence) is used. See + * {@link #checkAndRowMutate(byte[], byte[], byte[], CompareOp, ByteArrayComparable, + * RowMutations, boolean)} to do many checkAndPuts at a time on a single row. * @param row to check * @param family column family to check * @param qualifier column qualifier to check @@ -340,9 +342,11 @@ public interface Region extends ConfigurationObserver { ByteArrayComparable comparator, Mutation mutation, boolean writeToWAL) throws IOException; /** - * Atomically checks if a row/family/qualifier value matches the expected val - * If it does, it performs the row mutations. If the passed value is null, t - * is for the lack of column (ie: non-existence) + * Atomically checks if a row/family/qualifier value matches the expected values and if it does, + * it performs the row mutations. If the passed value is null, the lack of column value + * (ie: non-existence) is used. Use to do many mutations on a single row. Use + * {@link #checkAndMutate(byte[], byte[], byte[], CompareOp, ByteArrayComparable, Mutation, + * boolean)} to do one checkAndMutate at a time. * @param row to check * @param family column family to check * @param qualifier column qualifier to check @@ -350,7 +354,7 @@ public interface Region extends ConfigurationObserver { * @param comparator * @param mutations * @param writeToWAL - * @return true if mutation was applied, false otherwise + * @return true if mutations were applied, false otherwise * @throws IOException */ boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, CompareOp compareOp, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 3049608..913f6bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -260,7 +260,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner List scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { this(store, scan, scanInfo, null, - ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED), false); + ((HStore)store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false); if (dropDeletesFromRow == null) { matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint, earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java deleted file mode 100644 index f7ae208..0000000 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver.wal; - -import java.io.IOException; -import java.util.List; -import java.util.UUID; - -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; - -/** - * An HLogKey specific to WalEdits coming from replay. - */ -@InterfaceAudience.Private -public class ReplayHLogKey extends HLogKey { - - public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename, - final long now, List clusterIds, long nonceGroup, long nonce, - MultiVersionConcurrencyControl mvcc) { - super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce, mvcc); - } - - public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename, - long logSeqNum, final long now, List clusterIds, long nonceGroup, long nonce, - MultiVersionConcurrencyControl mvcc) { - super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc); - } - - /** - * Returns the original sequence id - * @return long the new assigned sequence number - */ - @Override - public long getSequenceId() throws IOException { - return this.getOrigLogSeqNum(); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index 4091a82..8b9b5ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType; import org.apache.hadoop.hbase.regionserver.SequenceId; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -664,4 +665,16 @@ public class WALKey implements SequenceId, Comparable { this.origLogSeqNum = walKey.getOrigSequenceNumber(); } } -} + + /** + * Complete the mvcc on the passed in walKey. + * Utility method. 
+ */ + public static void completeMvcc(final WALKey walKey) throws InterruptedIOException { + if (walKey == null) return; + WriteEntry we = walKey.getWriteEntry(); + if (we == null) return; + walKey.getMvcc().complete(we); + walKey.setWriteEntry(null); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java new file mode 100644 index 0000000..aed3d0a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; + + +/** + * Simple Increments Performance Test. Run this from main(); it is meant to run against a live + * cluster. Presumption is the table exists already. Defaults are a zk ensemble of localhost:2181, + * a tableName of 'tableName', a column family name of 'columnFamilyName', with 80 threads by + * default and 10000 increments per thread. To change any of these configs, pass -DNAME=VALUE as + * in -DtableName="newTableName". It prints out the configuration it is running with at the start + * and prints out latency percentiles at the end.
+ */ +public class IncrementPerformanceTest implements Tool { + private static final Log LOG = LogFactory.getLog(IncrementPerformanceTest.class); + private static final byte [] QUALIFIER = new byte [] {'q'}; + private Configuration conf; + private final MetricRegistry metrics = new MetricRegistry(); + private static final String TABLENAME = "tableName"; + private static final String COLUMN_FAMILY = "columnFamilyName"; + private static final String THREAD_COUNT = "threadCount"; + private static final int DEFAULT_THREAD_COUNT = 80; + private static final String INCREMENT_COUNT = "incrementCount"; + private static final int DEFAULT_INCREMENT_COUNT = 10000; + + IncrementPerformanceTest() {} + + public int run(final String [] args) throws Exception { + Configuration conf = getConf(); + final TableName tableName = TableName.valueOf(conf.get(TABLENAME), TABLENAME); + final byte [] columnFamilyName = Bytes.toBytes(conf.get(COLUMN_FAMILY, COLUMN_FAMILY)); + int threadCount = conf.getInt(THREAD_COUNT, DEFAULT_THREAD_COUNT); + final int incrementCount = conf.getInt(INCREMENT_COUNT, DEFAULT_INCREMENT_COUNT); + LOG.info("Running test with " + HConstants.ZOOKEEPER_QUORUM + "=" + + getConf().get(HConstants.ZOOKEEPER_QUORUM) + ", tableName=" + tableName + + ", columnFamilyName=" + columnFamilyName + ", threadCount=" + threadCount + + ", incrementCount=" + incrementCount); + + ExecutorService service = Executors.newFixedThreadPool(threadCount); + Set> futures = new HashSet>(); + final AtomicInteger integer = new AtomicInteger(0); // needed a simple "final" counter + while (integer.incrementAndGet() <= threadCount) { + futures.add(service.submit(new Runnable() { + @Override + public void run() { + try { + try (Connection connection = ConnectionFactory.createConnection(getConf())) { + try (Table table = connection.getTable(tableName)) { + Timer timer = metrics.timer("increments"); + for (int i = 0; i < incrementCount; i++) { + byte[] row = Bytes.toBytes(i); + Timer.Context context = timer.time(); + try { + table.incrementColumnValue(row, columnFamilyName, QUALIFIER, 1l); + } catch (IOException e) { + // swallow..it's a test. 
+ } finally { + context.stop(); + } + } + } + } + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + })); + } + + for(Future future : futures) future.get(); + service.shutdown(); + Snapshot s = metrics.timer("increments").getSnapshot(); + LOG.info(String.format("75th=%s, 95th=%s, 99th=%s", s.get75thPercentile(), + s.get95thPercentile(), s.get99thPercentile())); + return 0; + } + + @Override + public Configuration getConf() { + return this.conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + public static void main(String[] args) throws Exception { + System.exit(ToolRunner.run(HBaseConfiguration.create(), new IncrementPerformanceTest(), args)); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 8734aea..63d9cd0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -48,7 +48,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -119,6 +118,7 @@ import org.junit.experimental.categories.Category; @Category({LargeTests.class, ClientTests.class}) @SuppressWarnings ("deprecation") public class TestFromClientSide { + // NOTE: Increment tests were moved to their own class, TestIncrementsFromClientSide. 
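Returning to the IncrementPerformanceTest introduced above: because it is a Hadoop Tool run through ToolRunner, the -DNAME=VALUE overrides its javadoc mentions are handled as generic options, and the same settings can be supplied programmatically. A sketch using the configuration keys the class defines; the concrete values and the package-local wrapper class are illustrative only, and the target table must already exist.

  package org.apache.hadoop.hbase; // same package, since the test's constructor is package-private

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.util.ToolRunner;

  public class RunIncrementPerf {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      conf.set("tableName", "counters");       // example value only
      conf.set("columnFamilyName", "f");
      conf.setInt("threadCount", 8);
      conf.setInt("incrementCount", 1000);
      System.exit(ToolRunner.run(conf, new IncrementPerformanceTest(), new String[0]));
    }
  }

From a shell the equivalent would be something along the lines of ./bin/hbase org.apache.hadoop.hbase.IncrementPerformanceTest -DtableName=counters, assuming the class is on the classpath.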
private static final Log LOG = LogFactory.getLog(TestFromClientSide.class); protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static byte [] ROW = Bytes.toBytes("testRow"); @@ -3046,7 +3046,7 @@ public class TestFromClientSide { equals(value, CellUtil.cloneValue(key))); } - private void assertIncrementKey(Cell key, byte [] row, byte [] family, + static void assertIncrementKey(Cell key, byte [] row, byte [] family, byte [] qualifier, long value) throws Exception { assertTrue("Expected row [" + Bytes.toString(row) + "] " + @@ -3270,7 +3270,7 @@ public class TestFromClientSide { return stamps; } - private boolean equals(byte [] left, byte [] right) { + static boolean equals(byte [] left, byte [] right) { if (left == null && right == null) return true; if (left == null && right.length == 0) return true; if (right == null && left.length == 0) return true; @@ -4399,264 +4399,6 @@ public class TestFromClientSide { } @Test - public void testIncrementWithDeletes() throws Exception { - LOG.info("Starting testIncrementWithDeletes"); - final TableName TABLENAME = - TableName.valueOf("testIncrementWithDeletes"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = Bytes.toBytes("column"); - - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - TEST_UTIL.flush(TABLENAME); - - Delete del = new Delete(ROW); - ht.delete(del); - - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - - Get get = new Get(ROW); - Result r = ht.get(get); - assertEquals(1, r.size()); - assertEquals(5, Bytes.toLong(r.getValue(FAMILY, COLUMN))); - } - - @Test - public void testIncrementingInvalidValue() throws Exception { - LOG.info("Starting testIncrementingInvalidValue"); - final TableName TABLENAME = TableName.valueOf("testIncrementingInvalidValue"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = Bytes.toBytes("column"); - Put p = new Put(ROW); - // write an integer here (not a Long) - p.addColumn(FAMILY, COLUMN, Bytes.toBytes(5)); - ht.put(p); - try { - ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5); - fail("Should have thrown DoNotRetryIOException"); - } catch (DoNotRetryIOException iox) { - // success - } - Increment inc = new Increment(ROW); - inc.addColumn(FAMILY, COLUMN, 5); - try { - ht.increment(inc); - fail("Should have thrown DoNotRetryIOException"); - } catch (DoNotRetryIOException iox) { - // success - } - } - - @Test - public void testIncrementInvalidArguments() throws Exception { - LOG.info("Starting testIncrementInvalidArguments"); - final TableName TABLENAME = TableName.valueOf("testIncrementInvalidArguments"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - final byte[] COLUMN = Bytes.toBytes("column"); - try { - // try null row - ht.incrementColumnValue(null, FAMILY, COLUMN, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - try { - // try null family - ht.incrementColumnValue(ROW, null, COLUMN, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - try { - // try null qualifier - ht.incrementColumnValue(ROW, FAMILY, null, 5); - fail("Should have thrown IOException"); - } catch (IOException iox) { - // success - } - // try null row - try { - Increment incNoRow = new Increment((byte [])null); - incNoRow.addColumn(FAMILY, COLUMN, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } catch (NullPointerException npe) { - // success - } - // try null family - try { 
- Increment incNoFamily = new Increment(ROW); - incNoFamily.addColumn(null, COLUMN, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } - // try null qualifier - try { - Increment incNoQualifier = new Increment(ROW); - incNoQualifier.addColumn(FAMILY, null, 5); - fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException iax) { - // success - } - } - - @Test - public void testIncrementOutOfOrder() throws Exception { - LOG.info("Starting testIncrementOutOfOrder"); - final TableName TABLENAME = TableName.valueOf("testIncrementOutOfOrder"); - Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY); - - byte [][] QUALIFIERS = new byte [][] { - Bytes.toBytes("B"), Bytes.toBytes("A"), Bytes.toBytes("C") - }; - - Increment inc = new Increment(ROW); - for (int i=0; iThere is similar test up in TestAtomicOperation. It does a test where it has 100 threads + * doing increments across two column families all on one row and the increments are connected to + * prove atomicity on row. + */ +@Category(MediumTests.class) +public class TestRegionIncrement { + private static final Log LOG = LogFactory.getLog(TestRegionIncrement.class); + @Rule public TestName name = new TestName(); + @Rule public final TestRule timeout = + CategoryBasedTimeout.builder().withTimeout(this.getClass()). + withLookingForStuckThread(true).build(); + private static HBaseTestingUtility TEST_UTIL; + private final static byte [] INCREMENT_BYTES = Bytes.toBytes("increment"); + private static final int THREAD_COUNT = 10; + private static final int INCREMENT_COUNT = 10000; + + @Before + public void setUp() throws Exception { + TEST_UTIL = HBaseTestingUtility.createLocalHTU(); + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.cleanupTestDir(); + } + + private HRegion getRegion(final Configuration conf, final String tableName) throws IOException { + WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(), + TEST_UTIL.getDataTestDir().toString(), conf); + return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf, + false, Durability.SKIP_WAL, wal, INCREMENT_BYTES); + } + + private void closeRegion(final HRegion region) throws IOException { + region.close(); + region.getWAL().close(); + } + + /** + * Increments a single cell a bunch of times. + */ + private static class SingleCellIncrementer extends Thread { + private final int count; + private final HRegion region; + private final Increment increment; + + SingleCellIncrementer(final int i, final int count, final HRegion region, + final Increment increment) { + super("" + i); + setDaemon(true); + this.count = count; + this.region = region; + this.increment = increment; + } + + @Override + public void run() { + for (int i = 0; i < this.count; i++) { + try { + this.region.increment(this.increment); + // LOG.info(getName() + " " + i); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + /** + * Increments a random row's Cell count times. 
+ */ + private static class CrossRowCellIncrementer extends Thread { + private final int count; + private final HRegion region; + private final Increment [] increments; + + CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range) { + super("" + i); + setDaemon(true); + this.count = count; + this.region = region; + this.increments = new Increment[range]; + for (int ii = 0; ii < range; ii++) { + this.increments[ii] = new Increment(Bytes.toBytes(i)); + this.increments[ii].addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1); + } + } + + @Override + public void run() { + for (int i = 0; i < this.count; i++) { + try { + int index = ThreadLocalRandom.current().nextInt(0, this.increments.length); + this.region.increment(this.increments[index]); + // LOG.info(getName() + " " + index); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + /** + * Have each thread update its own Cell. Avoid contention with another thread. + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testUnContendedSingleCellIncrement() + throws IOException, InterruptedException { + final HRegion region = getRegion(TEST_UTIL.getConfiguration(), + TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName())); + long startTime = System.currentTimeMillis(); + try { + SingleCellIncrementer [] threads = new SingleCellIncrementer[THREAD_COUNT]; + for (int i = 0; i < threads.length; i++) { + byte [] rowBytes = Bytes.toBytes(i); + Increment increment = new Increment(rowBytes); + increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1); + threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + RegionScanner regionScanner = region.getScanner(new Scan()); + List cells = new ArrayList(THREAD_COUNT); + while(regionScanner.next(cells)) continue; + assertEquals(THREAD_COUNT, cells.size()); + long total = 0; + for (Cell cell: cells) total += + Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + assertEquals(INCREMENT_COUNT * THREAD_COUNT, total); + } finally { + closeRegion(region); + LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms"); + } + } + + /** + * Have each thread update its own Cell. Avoid contention with another thread. 
+ * This is + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testContendedAcrossCellsIncrement() + throws IOException, InterruptedException { + final HRegion region = getRegion(TEST_UTIL.getConfiguration(), + TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName())); + long startTime = System.currentTimeMillis(); + try { + CrossRowCellIncrementer [] threads = new CrossRowCellIncrementer[THREAD_COUNT]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new CrossRowCellIncrementer(i, INCREMENT_COUNT, region, THREAD_COUNT); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + RegionScanner regionScanner = region.getScanner(new Scan()); + List cells = new ArrayList(100); + while(regionScanner.next(cells)) continue; + assertEquals(THREAD_COUNT, cells.size()); + long total = 0; + for (Cell cell: cells) total += + Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + assertEquals(INCREMENT_COUNT * THREAD_COUNT, total); + } finally { + closeRegion(region); + LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms"); + } + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java index 0f7f23a..d99643d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java @@ -551,7 +551,7 @@ public class TestTags { public static class TestCoprocessorForTags extends BaseRegionObserver { - public static boolean checkTagPresence = false; + public static volatile boolean checkTagPresence = false; public static List tags = null; @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index af47465..f7994f3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -408,13 +408,13 @@ public class TestFSHLog { } region.flush(true); // FlushResult.flushSequenceId is not visible here so go get the current sequence id. 
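Both region tests above verify atomicity the same way: scan everything back, sum the eight-byte counter values, and compare the total against THREAD_COUNT * INCREMENT_COUNT. The same check can be run from a client against a live table; a small sketch with placeholder table and family names:

  import org.apache.hadoop.hbase.Cell;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.ResultScanner;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.util.Bytes;

  public class SumCounters {
    public static void main(String[] args) throws Exception {
      long total = 0;
      try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
           Table table = conn.getTable(TableName.valueOf("t"));
           ResultScanner scanner = table.getScanner(new Scan().addFamily(Bytes.toBytes("f")))) {
        for (Result result : scanner) {
          for (Cell cell : result.rawCells()) {
            total += Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
          }
        }
      }
      System.out.println("total=" + total);
    }
  }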
- long currentSequenceId = region.getSequenceId(); + long currentSequenceId = region.getReadPoint(); // Now release the appends goslow.setValue(false); synchronized (goslow) { goslow.notifyAll(); } - assertTrue(currentSequenceId >= region.getSequenceId()); + assertTrue(currentSequenceId >= region.getReadPoint()); } finally { region.close(true); wal.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 549a018..51e2340 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -882,7 +882,7 @@ public class TestWALReplay { for (HColumnDescriptor hcd : htd.getFamilies()) { addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x"); } - long lastestSeqNumber = region.getSequenceId(); + long lastestSeqNumber = region.getReadPoint(); // get the current seq no wal.doCompleteCacheFlush = true; // allow complete cache flush with the previous seq number got after first diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java index c988761..b4eb798 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java @@ -298,7 +298,7 @@ public class TestCoprocessorScanPolicy { newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); return new StoreScanner(store, scanInfo, scan, targetCols, - ((HStore) store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED)); + ((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED)); } else { return s; }
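One last note on the getReadpoint() to getReadPoint(IsolationLevel) rename that runs through these files: the isolation level is something a reader declares on its Query, with READ_COMMITTED (the default) bounded by the region's MVCC read point and READ_UNCOMMITTED ignoring it. A minimal client sketch, with placeholder table and column names and a running cluster assumed:

  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Get;
  import org.apache.hadoop.hbase.client.IsolationLevel;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.util.Bytes;

  public class ReadIsolation {
    public static void main(String[] args) throws Exception {
      byte[] f = Bytes.toBytes("f");
      byte[] q = Bytes.toBytes("q");
      try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
           Table table = conn.getTable(TableName.valueOf("t"))) {
        Get get = new Get(Bytes.toBytes("r"));
        get.addColumn(f, q);
        get.setIsolationLevel(IsolationLevel.READ_COMMITTED); // the default; READ_UNCOMMITTED skips the read-point bound
        Result r = table.get(get);
        System.out.println(r.isEmpty() ? "no value" : String.valueOf(Bytes.toLong(r.getValue(f, q))));
      }
    }
  }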