From 5ab79b93c3ef4b4d2df8f498c25718398b02c841 Mon Sep 17 00:00:00 2001 From: stack Date: Tue, 22 Dec 2015 10:35:31 -0800 Subject: [PATCH] HBASE-14460 [Perf Regression] Merge of MVCC and SequenceId (HBASE-HBASE-8763) slowed Increments, CheckAndPuts, batch operations Patch for branch-1.0 first. Will address later branches with a different approach (a more radical fixup). Here we are trying to be safe making minimal change. This patch adds a fast increment. To enable it you set the below configuration to true in your hbase-site.xml configuration: hbase.increment.fast.but.narrow.consistency This sets region to take the fast increment path. Constraint is that caller can only access the Cell via Increment; intermixing Increment with other Mutations will give indeterminate results. Get will work or an Increment of zero will return current value. So, to add the above, we effectively copy/paste current Increment after doing a bunch of work to try and move common code out into methods that can be shared. Current increment becomes a switch and dependent on config we take the slow but consistent or the fast but narrowly consistent code path. Increment code path has too much state that it needs to keep up so hard to shrink it down more than what we have here without radical refactor (TODO in master patch; the refactor is needed because even cursory exploration has us DUPLICATING lists of Cells ... some of which is addressed on fast path here but more to do; fast path also simplifies the write to hbase so am able to drop some of the state keeping). Adds a carryForward set of methods for Tags handling which allows us clean up some duplicated code. So, difference between fastAndNarrowConsistencyIncrement and slowButConsistentIncrement is that the former holds the row lock until the sync completes; this allows us to reason that there are no other writers afoot when we read the current increment value. 
This means we do not wait on mvcc reads to catch up to writes before we proceed with the read, the root of the slowdown seen in HBASE-14460. The fast-path also does not wait on mvcc to complete before returning to the client and we reorder the write so that the update of memstore happens AFTER sync returns; i.e. the write pipeline is less zigzagging now. Added some simple concurrency testing and then a performance testing tool for Increments. Added test that Increment of zero amount returns the current Increment value. --- .../apache/hadoop/hbase/client/TestIncrement.java | 2 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 577 +++++++++++++-------- .../MultiVersionConsistencyControl.java | 2 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 4 +- .../hadoop/hbase/IncrementPerformanceTest.java | 129 +++++ .../hadoop/hbase/client/TestFromClientSide.java | 15 +- .../hbase/regionserver/TestRegionIncrement.java | 287 ++++++++++ 7 files changed, 793 insertions(+), 223 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java index 8a2c447..39cde45 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java @@ -29,7 +29,7 @@ import org.junit.experimental.categories.Category; @Category(SmallTests.class) public class TestIncrement { @Test - public void test() { + public void testIncrementInstance() { final long expected = 13; Increment inc = new Increment(new byte [] {'r'}); int total = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index be41deb..d0815ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -220,6 +221,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // "hbase.hregion.scan.loadColumnFamiliesOnDemand"; /** + * Set region to take the fast increment path. Constraint is that caller can only access the + * Cell via Increment; intermixing Increment with other Mutations will give indeterminate + * results. Get will work or an Increment of zero will return current value. + */ + public static final String INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY = + "hbase.increment.fast.but.narrow.consistency"; + + /** * This is the global default value for durability. All tables/mutations not * defining a durability or using USE_DEFAULT will default to this value. */ @@ -3250,30 +3259,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // int listSize = cells.size(); for (int i = 0; i < listSize; i++) { Cell cell = cells.get(i); - List newTags = new ArrayList(); - Iterator tagIterator = CellUtil.tagsIterator(cell.getTagsArray(), - cell.getTagsOffset(), cell.getTagsLength()); - - // Carry forward existing tags - - while (tagIterator.hasNext()) { - - // Add any filters or tag specific rewrites here - - newTags.add(tagIterator.next()); - } - - // Cell TTL handling - - // Check again if we need to add a cell TTL because early out logic - // above may change when there are more tag based features in core. 
- if (m.getTTL() != Long.MAX_VALUE) { - // Add a cell TTL tag - newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL()))); - } + List newTags = carryForwardTags(null, cell); + newTags = carryForwardTTLTag(newTags, m); // Rewrite the cell with the updated set of tags - cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), @@ -5668,34 +5657,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // long ts = Math.max(now, oldCell.getTimestamp()); // Process cell tags - List newTags = new ArrayList(); - - // Make a union of the set of tags in the old and new KVs - - if (oldCell.getTagsLength() > 0) { - Iterator i = CellUtil.tagsIterator(oldCell.getTagsArray(), - oldCell.getTagsOffset(), oldCell.getTagsLength()); - while (i.hasNext()) { - newTags.add(i.next()); - } - } - if (cell.getTagsLength() > 0) { - Iterator i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), - cell.getTagsLength()); - while (i.hasNext()) { - newTags.add(i.next()); - } - } - - // Cell TTL handling - - if (append.getTTL() != Long.MAX_VALUE) { - // Add the new TTL tag - newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL()))); - } + List tags = carryForwardTags(null, oldCell); + tags = carryForwardTags(tags, cell); + tags = carryForwardTTLTag(tags, append); // Rebuild tags - byte[] tagBytes = Tag.fromList(newTags); + byte[] tagBytes = Tag.fromList(tags); // allocate an empty cell once newCell = new KeyValue(row.length, cell.getFamilyLength(), @@ -5730,8 +5697,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // Cell TTL handling if (append.getTTL() != Long.MAX_VALUE) { - List newTags = new ArrayList(1); - newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL()))); // Add the new TTL tag newCell 
= new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), @@ -5741,7 +5706,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // cell.getQualifierLength(), cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()), cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), - newTags); + carryForwardTTLTag(append)); } else { newCell = cell; } @@ -5860,185 +5825,215 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // */ public Result increment(Increment increment, long nonceGroup, long nonce) throws IOException { - byte [] row = increment.getRow(); - checkRow(row, "increment"); - TimeRange tr = increment.getTimeRange(); - boolean flush = false; - Durability durability = getEffectiveDurability(increment.getDurability()); - boolean writeToWAL = durability != Durability.SKIP_WAL; - WALEdit walEdits = null; - List allKVs = new ArrayList(increment.size()); - Map> tempMemstore = new HashMap>(); - - long size = 0; - long txid = 0; - checkReadOnly(); checkResources(); - // Lock row + checkRow(increment.getRow(), "increment"); startRegionOperation(Operation.INCREMENT); this.writeRequestsCount.increment(); + try { + // Which Increment is it? Narrow increment-only consistency or slow (default) and general + // row-wide consistency. + + // So, difference between fastAndNarrowConsistencyIncrement and slowButConsistentIncrement is + // that the former holds the row lock until the sync completes; this allows us to reason that + // there are no other writers afoot when we read the current increment value. This means + // we do not wait on mvcc reads to catch up to writes before we proceed with the read, the + // root of the slowdown seen in HBASE-14460. The fast-path also does not wait on mvcc to + // complete before returning to the client and we reorder the write so that the update of + // memstore happens AFTER sync returns; i.e. the write pipeline is less zigzagging now. 
+ // + // See the comment on INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY + // for the constraints that apply when you take this code path; it is correct but only if + // Increments are used mutating an Increment Cell; mixing concurrent Put+Delete and Increment + // will yield indeterminate results. + return this.conf.getBoolean(INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, false)? + fastAndNarrowConsistencyIncrement(increment, nonceGroup, nonce): + slowButConsistentIncrement(increment, nonceGroup, nonce); + } finally { + if (this.metricsRegion != null) this.metricsRegion.updateIncrement(); + closeRegionOperation(Operation.INCREMENT); + } + } + + /** + * The bulk of this method is a copy-and-paste of the slowButConsistentIncrement but with some + * reordering to enable the fast increment (reordering allows us to also drop some state + * carrying Lists and variables so the flow here is more straight-forward). We copy-and-paste + * because we cannot break down the method further into smaller pieces; too much state. Will redo + * in trunk and tip of branch-1 to undo duplication here and in append, checkAnd*, etc. For why + * this route is 'faster' than the alternative slowButConsistentIncrement path, see the comment + * in calling method. + * @return Resulting increment + * @throws IOException + */ + private Result fastAndNarrowConsistencyIncrement(Increment increment, long nonceGroup, + long nonce) + throws IOException { + boolean flush = false; RowLock rowLock = null; - WriteEntry w = null; WALKey walKey = null; - long mvccNum = 0; - List memstoreCells = new ArrayList(); - boolean doRollBackMemstore = false; + // This is all kvs accumulated during this increment processing. Includes increments where the + // increment is zero: i.e. client just wants to get current state of the increment w/o + // changing it. These latter increments by zero are NOT added to the WAL. 
+ List allKVs = new ArrayList(increment.size()); + Durability effectiveDurability = getEffectiveDurability(increment.getDurability()); + long txid = 0; + rowLock = getRowLock(increment.getRow()); try { - rowLock = getRowLock(row); + lock(this.updatesLock.readLock()); try { - lock(this.updatesLock.readLock()); - try { - // wait for all prior MVCC transactions to finish - while we hold the row lock - // (so that we are guaranteed to see the latest state) - mvcc.waitForPreviousTransactionsComplete(); - if (this.coprocessorHost != null) { - Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); - if (r != null) { - return r; + if (this.coprocessorHost != null) { + Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); + if (r != null) return r; + } + // Process increments a Store/family at a time. + long now = EnvironmentEdgeManager.currentTime(); + final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL; + WALEdit walEdits = null; + // Accumulate edits for memstore to add later after we've added to WAL. + Map> forMemStore = new HashMap>(); + for (Map.Entry> entry: increment.getFamilyCellMap().entrySet()) { + byte [] columnFamilyName = entry.getKey(); + List increments = entry.getValue(); + Store store = this.stores.get(columnFamilyName); + // Do increment for this store; be sure to 'sort' the increments first so increments + // match order in which we get back current Cells when we get. 
+ List results = increment(increment, columnFamilyName, + sort(increments, store.getComparator()), now, + MultiVersionConsistencyControl.NO_WRITE_NUMBER, allKVs, + IsolationLevel.READ_UNCOMMITTED); + if (!results.isEmpty()) { + forMemStore.put(store, results); + // Prepare WAL updates + if (writeToWAL) { + if (walEdits == null) walEdits = new WALEdit(); + walEdits.getCells().addAll(results); } } - // now start my own transaction - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); - w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); - long now = EnvironmentEdgeManager.currentTime(); - // Process each family - for (Map.Entry> family: - increment.getFamilyCellMap().entrySet()) { + } - Store store = stores.get(family.getKey()); - List kvs = new ArrayList(family.getValue().size()); + // Actually write to WAL now. If walEdits is non-empty, we write the WAL. + if (walEdits != null && !walEdits.isEmpty()) { + // Using default cluster id, as this can only happen in the originating cluster. + // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey + // here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce); + txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), + walKey, walEdits, getSequenceId(), true, null/*walEdits has the List to apply*/); + } else { + // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned + walKey = this.appendEmptyEdit(this.wal, null/*walEdits has the List to apply*/); + } - // Sort the cells so that they match the order that they - // appear in the Get results. Otherwise, we won't be able to - // find the existing values if the cells are not specified - // in order by the client since cells are in an array list. 
- Collections.sort(family.getValue(), store.getComparator()); - // Get previous values for all columns in this family - Get get = new Get(row); - for (Cell cell: family.getValue()) { - get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell)); + // sync the transaction log outside the rowlock + if (txid != 0) syncOrDefer(txid, effectiveDurability); + + // Now write to memstore. + long accumulatedResultSize = 0; + for (Map.Entry> entry: forMemStore.entrySet()) { + Store store = entry.getKey(); + List results = entry.getValue(); + if (store.getFamily().getMaxVersions() == 1) { + // Upsert if VERSIONS for this CF == 1 + accumulatedResultSize += store.upsert(results, getSmallestReadPoint()); + } else { + // Otherwise keep older versions around + for (Cell cell: results) { + Pair ret = store.add(cell); + accumulatedResultSize += ret.getFirst(); } - get.setTimeRange(tr.getMin(), tr.getMax()); - List results = get(get, false); - - // Iterate the input columns and update existing values if they were - // found, otherwise add new column initialized to the increment amount - int idx = 0; - List edits = family.getValue(); - for (int i = 0; i < edits.size(); i++) { - Cell cell = edits.get(i); - long amount = Bytes.toLong(CellUtil.cloneValue(cell)); - boolean noWriteBack = (amount == 0); - List newTags = new ArrayList(); - - // Carry forward any tags that might have been added by a coprocessor - if (cell.getTagsLength() > 0) { - Iterator itr = CellUtil.tagsIterator(cell.getTagsArray(), - cell.getTagsOffset(), cell.getTagsLength()); - while (itr.hasNext()) { - newTags.add(itr.next()); - } - } - - Cell c = null; - long ts = now; - if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) { - c = results.get(idx); - ts = Math.max(now, c.getTimestamp()); - if(c.getValueLength() == Bytes.SIZEOF_LONG) { - amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG); - } else { - // throw DoNotRetryIOException instead of 
IllegalArgumentException - throw new org.apache.hadoop.hbase.DoNotRetryIOException( - "Attempted to increment field that isn't 64 bits wide"); - } - // Carry tags forward from previous version - if (c.getTagsLength() > 0) { - Iterator itr = CellUtil.tagsIterator(c.getTagsArray(), - c.getTagsOffset(), c.getTagsLength()); - while (itr.hasNext()) { - newTags.add(itr.next()); - } - } - if (i < ( edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) - idx++; - } - - // Append new incremented KeyValue to list - byte[] q = CellUtil.cloneQualifier(cell); - byte[] val = Bytes.toBytes(amount); - - // Add the TTL tag if the mutation carried one - if (increment.getTTL() != Long.MAX_VALUE) { - newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL()))); - } - - Cell newKV = new KeyValue(row, 0, row.length, - family.getKey(), 0, family.getKey().length, - q, 0, q.length, - ts, - KeyValue.Type.Put, - val, 0, val.length, - newTags); - - CellUtil.setSequenceId(newKV, mvccNum); - - // Give coprocessors a chance to update the new cell - if (coprocessorHost != null) { - newKV = coprocessorHost.postMutationBeforeWAL( - RegionObserver.MutationType.INCREMENT, increment, c, newKV); - } - allKVs.add(newKV); + } + flush = isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize)); + } + } finally { + this.updatesLock.readLock().unlock(); + } + } finally { + rowLock.release(); + } + // Request a cache flush. Do it outside update lock. 
+ if (flush) requestFlush(); + return Result.create(allKVs); + } - if (!noWriteBack) { - kvs.add(newKV); + private Result slowButConsistentIncrement(Increment increment, long nonceGroup, long nonce) + throws IOException { + boolean flush = false; + RowLock rowLock = null; + WriteEntry writeEntry = null; + WALKey walKey = null; + boolean doRollBackMemstore = false; + List allKVs = new ArrayList(increment.size()); + List memstoreCells = new ArrayList(); + Durability effectiveDurability = getEffectiveDurability(increment.getDurability()); + try { + rowLock = getRowLock(increment.getRow()); + long txid = 0; + try { + lock(this.updatesLock.readLock()); + try { + // Wait for all prior MVCC transactions to finish - while we hold the row lock + // (so that we are guaranteed to see the latest increment) + this.mvcc.waitForPreviousTransactionsComplete(); + if (this.coprocessorHost != null) { + Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); + if (r != null) return r; + } + // Now start my own transaction + long mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + writeEntry = this.mvcc.beginMemstoreInsertWithSeqNum(mvccNum); - // Prepare WAL updates - if (writeToWAL) { - if (walEdits == null) { - walEdits = new WALEdit(); - } - walEdits.add(newKV); + // Process increments a Store/family at a time. + long now = EnvironmentEdgeManager.currentTime(); + final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL; + WALEdit walEdits = null; + long accumulatedResultSize = 0; + for (Map.Entry> entry: increment.getFamilyCellMap().entrySet()) { + byte [] columnFamilyName = entry.getKey(); + List increments = entry.getValue(); + Store store = this.stores.get(columnFamilyName); + // Do increment for this store; be sure to 'sort' the increments first so increments + // match order in which we get back current Cells when we get. 
+ List results = increment(increment, columnFamilyName, + sort(increments, store.getComparator()), now, mvccNum, allKVs, null); + if (!results.isEmpty()) { + // Prepare WAL updates + if (writeToWAL) { + // Handmade loop on arraylist is faster than enhanced for-loop. + // See http://developer.android.com/training/articles/perf-tips.html + int resultsSize = results.size(); + for (int i = 0; i < resultsSize; i++) { + if (walEdits == null) walEdits = new WALEdit(); + walEdits.add(results.get(i)); } } - } - - //store the kvs to the temporary memstore before writing WAL - if (!kvs.isEmpty()) { - tempMemstore.put(store, kvs); - } - } - - //Actually write to Memstore now - if (!tempMemstore.isEmpty()) { - for (Map.Entry> entry : tempMemstore.entrySet()) { - Store store = entry.getKey(); + // Now write to this Store's memstore. if (store.getFamily().getMaxVersions() == 1) { - // upsert if VERSIONS for this CF == 1 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); - memstoreCells.addAll(entry.getValue()); + // Upsert if VERSIONS for this CF == 1 + accumulatedResultSize += store.upsert(results, getSmallestReadPoint()); + memstoreCells.addAll(results); + // TODO: St.Ack 20151222 Why no rollback in this case? } else { - // otherwise keep older versions around - for (Cell cell : entry.getValue()) { + // Otherwise keep older versions around + for (Cell cell: results) { Pair ret = store.add(cell); - size += ret.getFirst(); + accumulatedResultSize += ret.getFirst(); memstoreCells.add(ret.getSecond()); doRollBackMemstore = true; } } + accumulatedResultSize = this.addAndGetGlobalMemstoreSize(accumulatedResultSize); + flush = isFlushSize(accumulatedResultSize); } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); } // Actually write to WAL now if (walEdits != null && !walEdits.isEmpty()) { if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. 
A slave cluster receives the final value (not the delta) - // as a Put. - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + // Using default cluster id, as this can only happen in the originating cluster. + // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey + // here instead of WALKey directly to support legacy coprocessors. walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce); txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), @@ -6047,7 +6042,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // recordMutationWithoutWal(increment.getFamilyCellMap()); } } - if(walKey == null){ + if (walKey == null) { // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned walKey = this.appendEmptyEdit(this.wal, memstoreCells); } @@ -6059,33 +6054,177 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // rowLock = null; } // sync the transaction log outside the rowlock - if(txid != 0){ - syncOrDefer(txid, durability); - } + if (txid != 0) syncOrDefer(txid, effectiveDurability); doRollBackMemstore = false; } finally { - if (rowLock != null) { - rowLock.release(); - } + if (rowLock != null) rowLock.release(); // if the wal sync was unsuccessful, remove keys from memstore - if (doRollBackMemstore) { - rollbackMemstore(memstoreCells); - } - if (w != null) { - mvcc.completeMemstoreInsertWithSeqNum(w, walKey); + if (doRollBackMemstore) rollbackMemstore(memstoreCells); + if (writeEntry != null) mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); + } + // Request a cache flush. Do it outside update lock. 
+ if (flush) requestFlush(); + return Result.create(allKVs); + } + + /** + * @return Sorted list of cells using comparator + */ + private static List sort(List cells, final Comparator comparator) { + Collections.sort(cells, comparator); + return cells; + } + + /** + * Apply increments to a column family. + * @param sortedIncrements The passed in increments to apply MUST be sorted so that they match + * the order that they appear in the Get results (get results will be sorted on return). + * Otherwise, we won't be able to find the existing values if the cells are not specified in + * order by the client since cells are in an array list. + * @param isolation Isolation level to use when running the 'get'. Pass null for default. + * @return Resulting increments after sortedIncrements have been applied to current + * values (if any -- else passed increment is the final result). + * @throws IOException + */ + private List increment(Increment increment, byte[] columnFamilyName, + List sortedIncrements, long now, long mvccNum, List allKVs, + final IsolationLevel isolation) + throws IOException { + List results = new ArrayList(sortedIncrements.size()); + byte [] row = increment.getRow(); + // Get previous values for all columns in this family + List currentValues = get(increment, columnFamilyName, sortedIncrements, isolation); + // Iterate the input columns and update existing values if they were found, otherwise + // add new column initialized to the increment amount + int idx = 0; + for (int i = 0; i < sortedIncrements.size(); i++) { + Cell inc = sortedIncrements.get(i); + long incrementAmount = getLongValue(inc); + Bytes.toLong(inc.getValueArray(), inc.getValueOffset(), inc.getValueLength()); + boolean writeBack = (incrementAmount != 0); + // Carry forward any tags that might have been added by a coprocessor. 
+ List tags = carryForwardTags(inc); + + Cell currentValue = null; + long ts = now; + if (idx < currentValues.size() && CellUtil.matchingQualifier(currentValues.get(idx), inc)) { + currentValue = currentValues.get(idx); + ts = Math.max(now, currentValue.getTimestamp()); + incrementAmount += getLongValue(currentValue); + // Carry forward all tags + tags = carryForwardTags(tags, currentValue); + if (i < (sortedIncrements.size() - 1) && + !CellUtil.matchingQualifier(inc, sortedIncrements.get(i + 1))) idx++; + } + + // Append new incremented KeyValue to list + byte[] qualifier = CellUtil.cloneQualifier(inc); + byte[] incrementAmountInBytes = Bytes.toBytes(incrementAmount); + tags = carryForwardTTLTag(tags, increment); + + Cell newValue = new KeyValue(row, 0, row.length, + columnFamilyName, 0, columnFamilyName.length, + qualifier, 0, qualifier.length, + ts, KeyValue.Type.Put, + incrementAmountInBytes, 0, incrementAmountInBytes.length, + tags); + + // Don't set an mvcc if none specified. The mvcc may be assigned later in case where we + // write the memstore AFTER we sync our edit to the log. + if (mvccNum != MultiVersionConsistencyControl.NO_WRITE_NUMBER) { + CellUtil.setSequenceId(newValue, mvccNum); + } + + // Give coprocessors a chance to update the new cell + if (coprocessorHost != null) { + newValue = coprocessorHost.postMutationBeforeWAL( + RegionObserver.MutationType.INCREMENT, increment, currentValue, newValue); } - closeRegionOperation(Operation.INCREMENT); - if (this.metricsRegion != null) { - this.metricsRegion.updateIncrement(); + allKVs.add(newValue); + + // If increment amount == 0, then don't write this Increment to the WAL. + if (writeBack) { + results.add(newValue); } } + return results; + } - if (flush) { - // Request a cache flush. Do it outside update lock. 
- requestFlush(); + /** + * @return Get the long out of the passed in Cell + * @throws DoNotRetryIOException + */ + private static long getLongValue(final Cell cell) throws DoNotRetryIOException { + int len = cell.getValueLength(); + if (len != Bytes.SIZEOF_LONG) { + // throw DoNotRetryIOException instead of IllegalArgumentException + throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide"); } + return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len); + } - return Result.create(allKVs); + /** + * Do a specific Get on passed columnFamily and column qualifiers + * from incrementCoordinates only. + * @param increment + * @param columnFamily + * @param incrementCoordinates + * @return Return the Cells to Increment + * @throws IOException + */ + private List get(final Increment increment, byte [] columnFamily, + final List increments, final IsolationLevel isolation) + throws IOException { + Get get = new Get(increment.getRow()); + if (isolation != null) get.setIsolationLevel(isolation); + for (Cell cell: increments) { + get.addColumn(columnFamily, CellUtil.cloneQualifier(cell)); + } + TimeRange tr = increment.getTimeRange(); + get.setTimeRange(tr.getMin(), tr.getMax()); + return get(get, false); + } + + /** + * @return Any Tags the Cell is carrying as a List or null if none in cell + */ + private static List carryForwardTags(final Cell cell) { + return carryForwardTags(null, cell); + } + + /** + * @return Carry forward Tags the Cell cell is carrying if any. 
+ */ + private static List carryForwardTags(final List tagsOrNull, final Cell cell) { + if (cell.getTagsLength() <= 0) return null; + List tags = tagsOrNull; + Iterator itr = + CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); + while (itr.hasNext()) { + if (tags == null) tags = new ArrayList(); + tags.add(itr.next()); + } + return tags; + } + + private static List carryForwardTTLTag(final Mutation mutation) { + return carryForwardTTLTag(null, mutation); + } + + /** + * @return Carry forward the TTL tag if the increment is carrying one + */ + private static List carryForwardTTLTag(final List tagsOrNull, + final Mutation mutation) { + long ttl = mutation.getTTL(); + if (ttl == Long.MAX_VALUE) return tagsOrNull; + List tags = tagsOrNull; + // If we are making the array in here, given we are the last thing checked, we'll be only thing + // in the array so set its size to '1' (I saw this being done in earlier version of + // tag-handling). + if (tags == null) tags = new ArrayList(1); + tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); + return tags; } // diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index 96af2c3..fffd7c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.util.ClassSize; */ @InterfaceAudience.Private public class MultiVersionConsistencyControl { - private static final long NO_WRITE_NUMBER = 0; + static final long NO_WRITE_NUMBER = 0; private volatile long memstoreRead = 0; private final Object readWaiters = new Object(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 9376f99..d2cd2f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1156,7 +1156,9 @@ public class FSHLog implements WAL { // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the // edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the // latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather create per append. - entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells); + entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, + /* Passing memstoreCells seems redundant when they are in edits.getCells already */ + (memstoreCells == null)? edits.getCells(): memstoreCells); truck.loadPayload(entry, scope.detach()); } finally { this.disruptor.getRingBuffer().publish(sequence); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java new file mode 100644 index 0000000..bf3a44f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +// import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.Timer; +import com.yammer.metrics.core.TimerContext; +import com.yammer.metrics.stats.Snapshot; + +/** + * Simple Increments Performance Test. Run this from main. It is to go against a cluster. + * Presumption is the table exists already. Defaults are a zk ensemble of localhost:2181, + * a tableName of 'tableName', a column famly name of 'columnFamilyName', with 80 threads by + * default and 10000 increments per thread. To change any of these configs, pass -DNAME=VALUE as + * in -DtableName="newTableName". It prints out configuration it is running with at the start and + * on the end it prints out percentiles. 
+ */ +public class IncrementPerformanceTest implements Tool { + private static final Log LOG = LogFactory.getLog(IncrementPerformanceTest.class); + private static final byte [] QUALIFIER = new byte [] {'q'}; + private Configuration conf; + private final MetricName metricName = new MetricName(this.getClass(), "increment"); + private static final String TABLENAME = "tableName"; + private static final String COLUMN_FAMILY = "columnFamilyName"; + private static final String THREAD_COUNT = "threadCount"; + private static final int DEFAULT_THREAD_COUNT = 80; + private static final String INCREMENT_COUNT = "incrementCount"; + private static final int DEFAULT_INCREMENT_COUNT = 10000; + + IncrementPerformanceTest() {} + + public int run(final String [] args) throws Exception { + Configuration conf = getConf(); + final TableName tableName = TableName.valueOf(conf.get(TABLENAME), TABLENAME); + final byte [] columnFamilyName = Bytes.toBytes(conf.get(COLUMN_FAMILY, COLUMN_FAMILY)); + int threadCount = conf.getInt(THREAD_COUNT, DEFAULT_THREAD_COUNT); + final int incrementCount = conf.getInt(INCREMENT_COUNT, DEFAULT_INCREMENT_COUNT); + LOG.info("Running test with " + HConstants.ZOOKEEPER_QUORUM + "=" + + getConf().get(HConstants.ZOOKEEPER_QUORUM) + ", tableName=" + tableName + + ", columnFamilyName=" + columnFamilyName + ", threadCount=" + threadCount + + ", incrementCount=" + incrementCount); + + ExecutorService service = Executors.newFixedThreadPool(threadCount); + Set> futures = new HashSet>(); + final AtomicInteger integer = new AtomicInteger(0); // needed a simple "final" counter + while (integer.incrementAndGet() <= threadCount) { + futures.add(service.submit(new Runnable() { + @Override + public void run() { + HTable table; + try { + // ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(TABLE_NAME)); + table = new HTable(getConf(), tableName.getName()); + } catch (Exception e) { + throw new RuntimeException(e); + } + Timer timer = 
Metrics.newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS); + for (int i = 0; i < incrementCount; i++) { + byte[] row = Bytes.toBytes(i); + TimerContext context = timer.time(); + try { + table.incrementColumnValue(row, columnFamilyName, QUALIFIER, 1l); + } catch (IOException e) { + // swallow..it's a test. + } finally { + context.stop(); + } + } + } + })); + } + + for(Future future : futures) future.get(); + service.shutdown(); + Snapshot s = Metrics.newTimer(this.metricName, + TimeUnit.MILLISECONDS, TimeUnit.SECONDS).getSnapshot(); + LOG.info(String.format("75th=%s, 95th=%s, 99th=%s", s.get75thPercentile(), + s.get95thPercentile(), s.get99thPercentile())); + return 0; + } + + @Override + public Configuration getConf() { + return this.conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + public static void main(String[] args) throws Exception { + System.exit(ToolRunner.run(HBaseConfiguration.create(), new IncrementPerformanceTest(), args)); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 891f92a..977213e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -4734,8 +4734,21 @@ public class TestFromClientSide { for (int i=0;i data() { + return Arrays.asList(new Object[] {Boolean.FALSE}, new Object [] {Boolean.TRUE}); + } + + private final boolean fast; + + public TestRegionIncrement(final boolean fast) { + this.fast = fast; + } + + @Before + public void setUp() throws Exception { + TEST_UTIL = HBaseTestingUtility.createLocalHTU(); + if (this.fast) { + TEST_UTIL.getConfiguration(). 
+ setBoolean(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, this.fast); + } + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.cleanupTestDir(); + } + + private HRegion getRegion(final Configuration conf, final String tableName) throws IOException { + WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(), + TEST_UTIL.getDataTestDir().toString(), conf); + return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf, + false, Durability.SKIP_WAL, wal, INCREMENT_BYTES); + } + + private void closeRegion(final HRegion region) throws IOException { + region.close(); + region.getWAL().close(); + } + + /** + * Increments a single cell a bunch of times. + */ + private static class SingleCellIncrementer extends Thread { + private final int count; + private final HRegion region; + private final Increment increment; + + SingleCellIncrementer(final int i, final int count, final HRegion region, + final Increment increment) { + super("" + i); + setDaemon(true); + this.count = count; + this.region = region; + this.increment = increment; + } + + @Override + public void run() { + for (int i = 0; i < this.count; i++) { + try { + this.region.increment(this.increment); + // LOG.info(getName() + " " + i); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + /** + * Increments a random row's Cell count times. 
+ */ + private static class CrossRowCellIncrementer extends Thread { + private final int count; + private final HRegion region; + private final Increment [] increments; + + CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range) { + super("" + i); + setDaemon(true); + this.count = count; + this.region = region; + this.increments = new Increment[range]; + for (int ii = 0; ii < range; ii++) { + this.increments[ii] = new Increment(Bytes.toBytes(i)); + this.increments[ii].addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1); + } + } + + @Override + public void run() { + for (int i = 0; i < this.count; i++) { + try { + int index = ThreadLocalRandom.current().nextInt(0, this.increments.length); + this.region.increment(this.increments[index]); + // LOG.info(getName() + " " + index); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + /** + * Have all threads updating the same Cell. + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testContendedSingleCellIncrement() throws IOException, InterruptedException { + Increment increment = new Increment(INCREMENT_BYTES); + increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1); + HRegion region = getRegion(TEST_UTIL.getConfiguration(), + filterStringSoTableNameSafe(this.name.getMethodName())); + long startTime = System.currentTimeMillis(); + try { + SingleCellIncrementer [] threads = new SingleCellIncrementer[THREAD_COUNT]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + increment = new Increment(INCREMENT_BYTES); + Result r = region.get(new Get(increment.getRow())); + long total = Bytes.toLong(r.listCells().get(0).getValue()); + assertEquals(INCREMENT_COUNT * THREAD_COUNT, total); + } finally { + 
closeRegion(region); + LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms"); + } + } + + /** + * Have each thread update its own Cell. Avoid contention with another thread. + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testUnContendedSingleCellIncrement() + throws IOException, InterruptedException { + final HRegion region = getRegion(TEST_UTIL.getConfiguration(), + filterStringSoTableNameSafe(this.name.getMethodName())); + long startTime = System.currentTimeMillis(); + try { + SingleCellIncrementer [] threads = new SingleCellIncrementer[THREAD_COUNT]; + for (int i = 0; i < threads.length; i++) { + byte [] rowBytes = Bytes.toBytes(i); + Increment increment = new Increment(rowBytes); + increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1); + threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + RegionScanner regionScanner = region.getScanner(new Scan()); + List cells = new ArrayList(THREAD_COUNT); + while(regionScanner.next(cells)) continue; + assertEquals(THREAD_COUNT, cells.size()); + long total = 0; + for (Cell cell: cells) total += Bytes.toLong(cell.getValue()); + assertEquals(INCREMENT_COUNT * THREAD_COUNT, total); + } finally { + closeRegion(region); + LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms"); + } + } + + /** + * Have each thread update its own Cell. Avoid contention with another thread. 
+ * This is + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testContendedAcrossCellsIncrement() + throws IOException, InterruptedException { + final HRegion region = getRegion(TEST_UTIL.getConfiguration(), + filterStringSoTableNameSafe(this.name.getMethodName())); + long startTime = System.currentTimeMillis(); + try { + CrossRowCellIncrementer [] threads = new CrossRowCellIncrementer[THREAD_COUNT]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new CrossRowCellIncrementer(i, INCREMENT_COUNT, region, THREAD_COUNT); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + RegionScanner regionScanner = region.getScanner(new Scan()); + List cells = new ArrayList(100); + while(regionScanner.next(cells)) continue; + assertEquals(THREAD_COUNT, cells.size()); + long total = 0; + for (Cell cell: cells) total += Bytes.toLong(cell.getValue()); + assertEquals(INCREMENT_COUNT * THREAD_COUNT, total); + } finally { + closeRegion(region); + LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms"); + } + } + + private static String filterStringSoTableNameSafe(final String str) { + return str.replaceAll("\\[fast\\=(.*)\\]", ".FAST.is.$1"); + } + + public static void main(String[] args) { + + } +} \ No newline at end of file -- 2.6.1