From 63150b8b12e8ece4c7b13b543dd49dee32b08694 Mon Sep 17 00:00:00 2001
From: chenheng
Date: Wed, 13 Jan 2016 23:05:10 +0800
Subject: [PATCH] HBASE-15091 Forward-port HBASE-15031 "Fix merge of MVCC and SequenceID performance regression in branch-1.0"

---
 .../java/org/apache/hadoop/hbase/client/Scan.java  |   1 +
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 346 ++++++++++++-----
 .../hadoop/hbase/IncrementPerformanceTest.java     | 129 +++++++
 .../hadoop/hbase/client/TestFromClientSide.java    |   4 +-
 ...TestIncrementFromClientSideWithCoprocessor.java |  49 +++
 .../hbase/client/TestIncrementsFromClientSide.java | 418 +++++++++++++++++++++
 .../hbase/regionserver/TestRegionIncrement.java    | 254 +++++++++++++
 7 files changed, 1096 insertions(+), 105 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 4825cca..6139c40 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -262,6 +262,7 @@ public class Scan extends Query {
     this.familyMap = get.getFamilyMap();
     this.getScan = true;
     this.consistency = get.getConsistency();
+    this.setIsolationLevel(get.getIsolationLevel());
     for (Map.Entry<String, byte[]> attr : get.getAttributesMap().entrySet()) {
       setAttribute(attr.getKey(), attr.getValue());
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index cfd057a..7895d64 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -214,6 +214,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms";
   private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000;
 
+  /**
+   * Set region to take the fast increment path. Constraint is that the caller can only access
+   * the Cell via Increment; intermixing Increment with other Mutations will give indeterminate
+   * results. A Get with {@link IsolationLevel#READ_UNCOMMITTED} will get the latest increment
+   * value, as will an Increment of amount zero.
+   */
+  public static final String INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY =
+      "hbase.increment.fast.but.narrow.consistency";
+  private final boolean incrementFastButNarrowConsistency;
+
   /**
    * This is the global default value for durability. All tables/mutations not
    * defining a durability or using USE_DEFAULT will default to this value.
@@ -745,6 +756,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           false :
           conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
               HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
+
+
+    // See #INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY for what this flag is about.
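+    // By default this is false and increments take the slow-but-consistent path.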
+    this.incrementFastButNarrowConsistency =
+      this.conf.getBoolean(INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, false);
   }
 
   void setHTableSpecificConf() {
@@ -7084,7 +7100,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @throws IOException
    */
   private List<Cell> doGet(final Store store, final byte [] row,
-      final Map.Entry<byte[], List<Cell>> family, final TimeRange tr)
+      final Map.Entry<byte[], List<Cell>> family, final TimeRange tr, IsolationLevel level)
   throws IOException {
     // Sort the cells so that they match the order that they
     // appear in the Get results. Otherwise, we won't be able to
@@ -7093,6 +7109,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     Collections.sort(family.getValue(), store.getComparator());
     // Get previous values for all columns in this family
     Get get = new Get(row);
+    if (level != null) {
+      get.setIsolationLevel(level);
+    }
     for (Cell cell : family.getValue()) {
       get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
     }
@@ -7151,7 +7170,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       Store store = stores.get(family.getKey());
       List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
 
-      List<Cell> results = doGet(store, row, family, null);
+      List<Cell> results = doGet(store, row, family, null, null);
 
       // Iterate the input columns and update existing values if they were
       // found, otherwise add new column initialized to the append value
@@ -7350,6 +7369,41 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
+  @Override
+  public Result increment(Increment increment, long nonceGroup, long nonce) throws IOException {
+    checkReadOnly();
+    checkResources();
+    checkRow(increment.getRow(), "increment");
+    startRegionOperation(Operation.INCREMENT);
+    this.writeRequestsCount.increment();
+    try {
+      // Which Increment is it? Narrow, increment-only consistency, or the slow (default),
+      // general, row-wide consistency.
+
+      // The difference between fastAndNarrowConsistencyIncrement and slowButConsistentIncrement
+      // is that the former holds the row lock until the sync completes; this allows us to reason
+      // that there are no other writers afoot when we read the current increment value. The row
+      // lock means that we do not need to wait on mvcc reads to catch up to writes before we
+      // proceed with the read, the root of the slowdown seen in HBASE-14460. The fast-path also
+      // does not wait on mvcc to complete before returning to the client. We also reorder the
+      // write so that the update of memstore happens AFTER sync returns; i.e. the write pipeline
+      // does less zigzagging now.
+      //
+      // See the comment on INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY for the constraints that
+      // apply when you take this code path; it is correct but only if Increments alone mutate
+      // the incremented Cell; mixing concurrent Put/Delete and Increment will yield
+      // indeterminate results.
+      return this.incrementFastButNarrowConsistency ?
+        fastAndNarrowConsistencyIncrement(increment, nonceGroup, nonce) :
+        slowButConsistentIncrement(increment, nonceGroup, nonce);
+    } finally {
+      if (this.metricsRegion != null) this.metricsRegion.updateIncrement();
+      closeRegionOperation(Operation.INCREMENT);
+    }
+
+  }
+
   // TODO: There's a lot of boiler plate code identical to append.
   // We should refactor append and increment as local get-mutate-put
   // transactions, so all stores only go through one code path for puts.
@@ -7357,13 +7411,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // They are subtly different in quite a few ways. This came out only
   // after study. I am not sure that many of the differences are intentional.
   // TODO: St.Ack 20150907
-
-  @Override
-  public Result increment(Increment mutation, long nonceGroup, long nonce)
+  public Result slowButConsistentIncrement(Increment mutation, long nonceGroup, long nonce)
   throws IOException {
-    Operation op = Operation.INCREMENT;
-    byte [] row = mutation.getRow();
-    checkRow(row, op.toString());
     boolean flush = false;
     Durability durability = getEffectiveDurability(mutation.getDurability());
     boolean writeToWAL = durability != Durability.SKIP_WAL;
@@ -7373,16 +7422,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
     long size = 0;
     long txid = 0;
-    checkReadOnly();
-    checkResources();
-    // Lock row
-    startRegionOperation(op);
-    this.writeRequestsCount.increment();
     RowLock rowLock = null;
     WALKey walKey = null;
     MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
     boolean doRollBackMemstore = false;
     TimeRange tr = mutation.getTimeRange();
+    byte[] row = mutation.getRow();
     try {
       rowLock = getRowLock(row);
       assert rowLock != null;
@@ -7398,92 +7443,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           return r;
         }
       }
-      long now = EnvironmentEdgeManager.currentTime();
-      // Process each family
-      for (Map.Entry<byte[], List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {
-        Store store = stores.get(family.getKey());
-        List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
-
-        List<Cell> results = doGet(store, row, family, tr);
-
-        // Iterate the input columns and update existing values if they were
-        // found, otherwise add new column initialized to the increment amount
-
-        // Avoid as much copying as possible. We may need to rewrite and
-        // consolidate tags. Bytes are only copied once.
-        // Would be nice if KeyValue had scatter/gather logic
-        int idx = 0;
-        // HERE WE DIVERGE FROM APPEND
-        List<Cell> edits = family.getValue();
-        for (int i = 0; i < edits.size(); i++) {
-          Cell cell = edits.get(i);
-          long amount = Bytes.toLong(CellUtil.cloneValue(cell));
-          boolean noWriteBack = (amount == 0);
-
-          List<Tag> newTags = carryForwardTags(cell, new ArrayList<Tag>());
-
-          Cell c = null;
-          long ts = now;
-          if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
-            c = results.get(idx);
-            ts = Math.max(now, c.getTimestamp());
-            if(c.getValueLength() == Bytes.SIZEOF_LONG) {
-              amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
-            } else {
-              // throw DoNotRetryIOException instead of IllegalArgumentException
-              throw new org.apache.hadoop.hbase.DoNotRetryIOException(
-                  "Attempted to increment field that isn't 64 bits wide");
-            }
-            // Carry tags forward from previous version
-            newTags = carryForwardTags(c, newTags);
-            if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) {
-              idx++;
-            }
-          }
-
-          // Append new incremented KeyValue to list
-          byte[] q = CellUtil.cloneQualifier(cell);
-          byte[] val = Bytes.toBytes(amount);
-
-          // Add the TTL tag if the mutation carried one
-          if (mutation.getTTL() != Long.MAX_VALUE) {
-            newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutation.getTTL())));
-          }
-
-          Cell newKV = new KeyValue(row, 0, row.length,
-              family.getKey(), 0, family.getKey().length,
-              q, 0, q.length,
-              ts,
-              KeyValue.Type.Put,
-              val, 0, val.length,
-              newTags);
-
-          // Give coprocessors a chance to update the new cell
-          if (coprocessorHost != null) {
-            newKV = coprocessorHost.postMutationBeforeWAL(
-                RegionObserver.MutationType.INCREMENT, mutation, c, newKV);
-          }
-          allKVs.add(newKV);
-
-          if (!noWriteBack) {
-            kvs.add(newKV);
-
-            // Prepare WAL updates
-            if (writeToWAL) {
-              if (walEdits == null) {
-                walEdits = new WALEdit();
-              }
-              walEdits.add(newKV);
-            }
-          }
-        }
-
-        //store the kvs to the temporary memstore before writing WAL
-        if (!kvs.isEmpty()) {
-          tempMemstore.put(store, kvs);
-        }
-      }
-
+      walEdits = processFamilyMemstore(mutation, allKVs, tempMemstore, tr, writeToWAL, null);
       // Actually write to WAL now
       if (walEdits != null && !walEdits.isEmpty()) {
         if (writeToWAL) {
@@ -7556,10 +7516,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       } else if (writeEntry != null) {
         mvcc.completeAndWait(writeEntry);
       }
-      closeRegionOperation(Operation.INCREMENT);
-      if (this.metricsRegion != null) {
-        this.metricsRegion.updateIncrement();
-      }
     }
 
     if (flush) {
@@ -7569,6 +7525,190 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return mutation.isReturnResults() ?
         Result.create(allKVs) : null;
   }
 
+  public Result fastAndNarrowConsistencyIncrement(Increment increment, long nonceGroup, long nonce)
+  throws IOException {
+    Durability durability = getEffectiveDurability(increment.getDurability());
+    boolean writeToWAL = durability != Durability.SKIP_WAL;
+    WALEdit walEdits = null;
+    List<Cell> allKVs = new ArrayList<Cell>(increment.size());
+    Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
+    RowLock rowLock = null;
+    WALKey walKey = null;
+    long size = 0;
+    long txid = 0;
+    MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
+    boolean flush = false;
+    TimeRange tr = increment.getTimeRange();
+    try {
+      rowLock = getRowLock(increment.getRow());
+      assert rowLock != null;
+      lock(this.updatesLock.readLock());
+      if (this.coprocessorHost != null) {
+        Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
+        if (r != null) {
+          return r;
+        }
+      }
+      walEdits = processFamilyMemstore(increment, allKVs,
+          tempMemstore, tr, writeToWAL, IsolationLevel.READ_UNCOMMITTED);
+      // Actually write to WAL now
+      if (walEdits != null && !walEdits.isEmpty()) {
+        if (writeToWAL) {
+          // Using default cluster id, as this can only happen in the originating
+          // cluster. A slave cluster receives the final value (not the delta)
+          // as a Put.
+          // We use HLogKey here instead of WALKey directly to support legacy coprocessors.
+          walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+              this.htableDescriptor.getTableName(),
+              WALKey.NO_SEQUENCE_ID,
+              nonceGroup,
+              nonce,
+              mvcc);
+          txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
+              walKey, walEdits, true);
+        } else {
+          recordMutationWithoutWal(increment.getFamilyCellMap());
+        }
+      }
+      if (walKey == null) {
+        // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
+        walKey = this.appendEmptyEdit(this.wal);
+      }
+
+      if (txid != 0) {
+        syncOrDefer(txid, durability);
+      }
+      // Now start my own transaction
+      writeEntry = walKey.getWriteEntry();
+      this.mvcc.complete(writeEntry);
+
+      // Actually write to Memstore now
+      if (!tempMemstore.isEmpty()) {
+        for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
+          Store store = entry.getKey();
+          if (store.getFamily().getMaxVersions() == 1) {
+            // Upsert if VERSIONS for this CF == 1
+            // Is this right? It immediately becomes visible? St.Ack 20150907
+            size += store.upsert(entry.getValue(), getSmallestReadPoint());
+          } else {
+            // Otherwise keep older versions around
+            for (Cell cell : entry.getValue()) {
+              CellUtil.setSequenceId(cell, writeEntry.getWriteNumber());
+              size += store.add(cell);
+            }
+          }
+        }
+        size = this.addAndGetGlobalMemstoreSize(size);
+        flush = isFlushSize(size);
+      }
+    } finally {
+      this.updatesLock.readLock().unlock();
+      if (rowLock != null) {
+        rowLock.release();
+      }
+    }
+    if (flush) {
+      // Request a cache flush. Do it outside update lock.
+      requestFlush();
+    }
+    return increment.isReturnResults() ?
+        Result.create(allKVs) : null;
+  }
+
+  private WALEdit processFamilyMemstore(Increment increment, List<Cell> allKVs,
+      Map<Store, List<Cell>> tempMemstore, TimeRange tr,
+      boolean writeToWAL, IsolationLevel level)
+  throws IOException {
+    WALEdit walEdits = null;
+    long now = EnvironmentEdgeManager.currentTime();
+    // Process each family
+    for (Map.Entry<byte[], List<Cell>> family: increment.getFamilyCellMap().entrySet()) {
+      Store store = stores.get(family.getKey());
+      List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
+
+      List<Cell> results = doGet(store, increment.getRow(), family, tr, level);
+
+      // Iterate the input columns and update existing values if they were
+      // found, otherwise add new column initialized to the increment amount
+
+      // Avoid as much copying as possible. We may need to rewrite and
+      // consolidate tags. Bytes are only copied once.
+      // Would be nice if KeyValue had scatter/gather logic
+      int idx = 0;
+      // HERE WE DIVERGE FROM APPEND
+      List<Cell> edits = family.getValue();
+      for (int i = 0; i < edits.size(); i++) {
+        Cell cell = edits.get(i);
+        long amount = Bytes.toLong(CellUtil.cloneValue(cell));
+        boolean noWriteBack = (amount == 0);
+
+        List<Tag> newTags = carryForwardTags(cell, new ArrayList<Tag>());
+
+        Cell c = null;
+        long ts = now;
+        if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
+          c = results.get(idx);
+          ts = Math.max(now, c.getTimestamp());
+          if(c.getValueLength() == Bytes.SIZEOF_LONG) {
+            amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
+          } else {
+            // throw DoNotRetryIOException instead of IllegalArgumentException
+            throw new org.apache.hadoop.hbase.DoNotRetryIOException(
+                "Attempted to increment field that isn't 64 bits wide");
+          }
+          // Carry tags forward from previous version
+          newTags = carryForwardTags(c, newTags);
+          if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) {
+            idx++;
+          }
+        }
+
+        // Append new incremented KeyValue to list
+        byte[] q = CellUtil.cloneQualifier(cell);
+        byte[] val = Bytes.toBytes(amount);
+
+        // Add the TTL tag if the mutation carried one
+        if (increment.getTTL() != Long.MAX_VALUE) {
+          newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
+        }
+
+        Cell newKV = new KeyValue(increment.getRow(), 0, increment.getRow().length,
+            family.getKey(), 0, family.getKey().length,
+            q, 0, q.length,
+            ts,
+            KeyValue.Type.Put,
+            val, 0, val.length,
+            newTags);
+
+        // Give coprocessors a chance to update the new cell
+        if (coprocessorHost != null) {
+          newKV = coprocessorHost.postMutationBeforeWAL(
+              RegionObserver.MutationType.INCREMENT, increment, c, newKV);
+        }
+        allKVs.add(newKV);
+
+        if (!noWriteBack) {
+          kvs.add(newKV);
+
+          // Prepare WAL updates
+          if (writeToWAL) {
+            if (walEdits == null) {
+              walEdits = new WALEdit();
+            }
+            walEdits.add(newKV);
+          }
+        }
+      }
+
+      // Store the kvs to the temporary memstore before writing WAL
+      if (!kvs.isEmpty()) {
+        tempMemstore.put(store, kvs);
+      }
+    }
+    return walEdits;
+  }
+
   //
   // New HBASE-880 Helpers
   //
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java
new file mode 100644
index 0000000..bf3a44f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+// import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.core.TimerContext;
+import com.yammer.metrics.stats.Snapshot;
+
+/**
+ * Simple Increments Performance Test. Run this from main. It is meant to run against a cluster.
+ * Presumption is that the table exists already. Defaults are a zk ensemble of localhost:2181,
+ * a tableName of 'tableName', a column family name of 'columnFamilyName', 80 threads, and
+ * 10000 increments per thread. To change any of these configs, pass -DNAME=VALUE as
+ * in -DtableName="newTableName". It prints out the configuration it is running with at the
+ * start and at the end it prints out percentiles.
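+ * For example, pass -DthreadCount=100 -DincrementCount=1000 to run 100 threads doing 1000
+ * increments each.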
+ */
+public class IncrementPerformanceTest implements Tool {
+  private static final Log LOG = LogFactory.getLog(IncrementPerformanceTest.class);
+  private static final byte [] QUALIFIER = new byte [] {'q'};
+  private Configuration conf;
+  private final MetricName metricName = new MetricName(this.getClass(), "increment");
+  private static final String TABLENAME = "tableName";
+  private static final String COLUMN_FAMILY = "columnFamilyName";
+  private static final String THREAD_COUNT = "threadCount";
+  private static final int DEFAULT_THREAD_COUNT = 80;
+  private static final String INCREMENT_COUNT = "incrementCount";
+  private static final int DEFAULT_INCREMENT_COUNT = 10000;
+
+  IncrementPerformanceTest() {}
+
+  public int run(final String [] args) throws Exception {
+    Configuration conf = getConf();
+    final TableName tableName = TableName.valueOf(conf.get(TABLENAME, TABLENAME));
+    final byte [] columnFamilyName = Bytes.toBytes(conf.get(COLUMN_FAMILY, COLUMN_FAMILY));
+    int threadCount = conf.getInt(THREAD_COUNT, DEFAULT_THREAD_COUNT);
+    final int incrementCount = conf.getInt(INCREMENT_COUNT, DEFAULT_INCREMENT_COUNT);
+    LOG.info("Running test with " + HConstants.ZOOKEEPER_QUORUM + "=" +
+      getConf().get(HConstants.ZOOKEEPER_QUORUM) + ", tableName=" + tableName +
+      ", columnFamilyName=" + columnFamilyName + ", threadCount=" + threadCount +
+      ", incrementCount=" + incrementCount);
+
+    ExecutorService service = Executors.newFixedThreadPool(threadCount);
+    Set<Future<?>> futures = new HashSet<Future<?>>();
+    final AtomicInteger integer = new AtomicInteger(0); // needed a simple "final" counter
+    while (integer.incrementAndGet() <= threadCount) {
+      futures.add(service.submit(new Runnable() {
+        @Override
+        public void run() {
+          HTable table;
+          try {
+            // ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(TABLE_NAME));
+            table = new HTable(getConf(), tableName.getName());
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          Timer timer = Metrics.newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+          for (int i = 0; i < incrementCount; i++) {
+            byte[] row = Bytes.toBytes(i);
+            TimerContext context = timer.time();
+            try {
+              table.incrementColumnValue(row, columnFamilyName, QUALIFIER, 1L);
+            } catch (IOException e) {
+              // Swallow... it's a test.
+            } finally {
+              context.stop();
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future<?> future: futures) future.get();
+    service.shutdown();
+    Snapshot s = Metrics.newTimer(this.metricName,
+        TimeUnit.MILLISECONDS, TimeUnit.SECONDS).getSnapshot();
+    LOG.info(String.format("75th=%s, 95th=%s, 99th=%s", s.get75thPercentile(),
+        s.get95thPercentile(), s.get99thPercentile()));
+    return 0;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(HBaseConfiguration.create(), new IncrementPerformanceTest(), args));
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 81253a5..ca6cf7b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -3139,7 +3139,7 @@ public class TestFromClientSide {
             equals(value, CellUtil.cloneValue(key)));
   }
 
-  private void assertIncrementKey(Cell key, byte [] row, byte [] family,
+  public static void assertIncrementKey(Cell key, byte [] row, byte [] family,
       byte [] qualifier, long value)
   throws Exception {
     assertTrue("Expected row [" + Bytes.toString(row) + "] " +
@@ -3363,7 +3363,7 @@ public class TestFromClientSide {
     return stamps;
   }
 
-  private boolean equals(byte [] left, byte [] right) {
+  private static boolean equals(byte [] left, byte [] right) {
     if (left == null && right == null) return true;
     if (left == null && right.length == 0) return true;
     if (right == null && left.length == 0) return true;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java
new file mode 100644
index 0000000..a67cc45
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.Before;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test all {@link Increment} client operations with a coprocessor that
+ * just implements the default flush/compact/scan policy.
+ *
+ * This test takes a long time. The test it derives from is parameterized so we run through both
+ * options of the test.
+ */
+@Category(LargeTests.class)
+public class TestIncrementFromClientSideWithCoprocessor extends TestIncrementsFromClientSide {
+  public TestIncrementFromClientSideWithCoprocessor(final boolean fast) {
+    super(fast);
+  }
+
+  @Before
+  public void before() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        MultiRowMutationEndpoint.class.getName(), NoOpScanPolicyObserver.class.getName());
+    conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests
+    super.before();
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
new file mode 100644
index 0000000..0d8c5f1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
@@ -0,0 +1,418 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Run Increment tests that use the HBase clients; {@link HTable}.
+ *
+ * Test is parameterized to run the slow and fast increment code paths. If fast, in @Before we
+ * set the go-fast configuration before starting the mini cluster so the single regionserver
+ * picks it up.
+ *
+ * Test takes a long time because we spin up a cluster between each run -- ugh.
+ */
+@RunWith(Parameterized.class)
+@Category(LargeTests.class)
+@SuppressWarnings("deprecation")
+public class TestIncrementsFromClientSide {
+  final Log LOG = LogFactory.getLog(getClass());
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static byte [] ROW = Bytes.toBytes("testRow");
+  private static byte [] FAMILY = Bytes.toBytes("testFamily");
+  // This test depends on there being only one slave running at a time; see the @Before
+  // method where we start the cluster.
+  protected static int SLAVES = 1;
+  private String oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY;
+  @Rule public TestName name = new TestName();
+
+  @Parameters(name = "fast={0}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[] {Boolean.FALSE}, new Object [] {Boolean.TRUE});
+  }
+
+  private final boolean fast;
+
+  public TestIncrementsFromClientSide(final boolean fast) {
+    this.fast = fast;
+  }
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        MultiRowMutationEndpoint.class.getName());
+    conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests
+  }
+
+  @Before
+  public void before() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    if (this.fast) {
+      // If fast is set, set the go-fast configuration before we start the mini cluster below
+      // so the regionserver picks it up.
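+      // Save the old value so the @After method can restore the original configuration.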
+      this.oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY =
+          conf.get(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY);
+      conf.setBoolean(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, this.fast);
+    }
+    // Start the mini cluster; this test runs against a single regionserver (see SLAVES above).
+    TEST_UTIL.startMiniCluster(SLAVES);
+  }
+
+  @After
+  public void after() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    if (this.fast) {
+      if (this.oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY != null) {
+        conf.set(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY,
+            this.oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY);
+      }
+    }
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testIncrementWithDeletes() throws Exception {
+    LOG.info("Starting " + this.name.getMethodName());
+    final TableName TABLENAME =
+        TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName()));
+    Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
+    final byte[] COLUMN = Bytes.toBytes("column");
+
+    ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5);
+    TEST_UTIL.flush(TABLENAME);
+
+    Delete del = new Delete(ROW);
+    ht.delete(del);
+
+    ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5);
+
+    Get get = new Get(ROW);
+    if (this.fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+    Result r = ht.get(get);
+    assertEquals(1, r.size());
+    assertEquals(5, Bytes.toLong(r.getValue(FAMILY, COLUMN)));
+  }
+
+  @Test
+  public void testIncrementingInvalidValue() throws Exception {
+    LOG.info("Starting " + this.name.getMethodName());
+    final TableName TABLENAME =
+        TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName()));
+    Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
+    final byte[] COLUMN = Bytes.toBytes("column");
+    Put p = new Put(ROW);
+    // write an integer here (not a Long)
+    p.add(FAMILY, COLUMN, Bytes.toBytes(5));
+    ht.put(p);
+    try {
+      ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5);
+      fail("Should have thrown DoNotRetryIOException");
+    } catch (DoNotRetryIOException iox) {
+      // success
+    }
+    Increment inc = new Increment(ROW);
+    inc.addColumn(FAMILY, COLUMN, 5);
+    try {
+      ht.increment(inc);
+      fail("Should have thrown DoNotRetryIOException");
+    } catch (DoNotRetryIOException iox) {
+      // success
+    }
+  }
+
+  @Test
+  public void testIncrementInvalidArguments() throws Exception {
+    LOG.info("Starting " + this.name.getMethodName());
+    final TableName TABLENAME =
+        TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName()));
+    Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
+    final byte[] COLUMN = Bytes.toBytes("column");
+    try {
+      // try null row
+      ht.incrementColumnValue(null, FAMILY, COLUMN, 5);
+      fail("Should have thrown IOException");
+    } catch (IOException iox) {
+      // success
+    }
+    try {
+      // try null family
+      ht.incrementColumnValue(ROW, null, COLUMN, 5);
+      fail("Should have thrown IOException");
+    } catch (IOException iox) {
+      // success
+    }
+    try {
+      // try null qualifier
+      ht.incrementColumnValue(ROW, FAMILY, null, 5);
+      fail("Should have thrown IOException");
+    } catch (IOException iox) {
+      // success
+    }
+    // try null row
+    try {
+      Increment incNoRow = new Increment((byte [])null);
+      incNoRow.addColumn(FAMILY, COLUMN, 5);
+      fail("Should have thrown IllegalArgumentException");
+    } catch (IllegalArgumentException iax) {
+      // success
+    } catch (NullPointerException npe) {
+      // success
+    }
+    // try null family
+    try {
+      Increment incNoFamily = new Increment(ROW);
+      incNoFamily.addColumn(null, COLUMN, 5);
+      fail("Should have thrown IllegalArgumentException");
+    } catch (IllegalArgumentException iax) {
+      // success
+    }
+    // try null qualifier
+    try {
+      Increment incNoQualifier = new Increment(ROW);
+      incNoQualifier.addColumn(FAMILY, null, 5);
+      fail("Should have thrown IllegalArgumentException");
+    } catch (IllegalArgumentException iax) {
+      // success
+    }
+  }
+
+  @Test
+  public void testIncrementOutOfOrder() throws Exception {
+    LOG.info("Starting " + this.name.getMethodName());
+    final TableName TABLENAME =
+        TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName()));
+    Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
+
+    byte [][] QUALIFIERS = new byte [][] {
+      Bytes.toBytes("B"), Bytes.toBytes("A"), Bytes.toBytes("C")
+    };
+
+    Increment inc = new Increment(ROW);
+    for (int i = 0; i < QUALIFIERS.length; i++) {
+      inc.addColumn(FAMILY, QUALIFIERS[i], 1);
+    }
+    ht.increment(inc);
+
+    // Verify expected results; qualifiers come back in sorted order
+    Get get = new Get(ROW);
+    if (this.fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+    Result r = ht.get(get);
+    Cell [] kvs = r.rawCells();
+    assertEquals(3, kvs.length);
+    TestFromClientSide.assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 1);
+    TestFromClientSide.assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 1);
+    TestFromClientSide.assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1);
+  }
+
+  public static String filterStringSoTableNameSafe(final String str) {
+    return str.replaceAll("\\[fast\\=(.*)\\]", ".FAST.is.$1");
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java
new file mode 100644
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TestIncrementsFromClientSide;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * <p>There is similar test up in TestAtomicOperation. It does a test where it has 100 threads
+ * doing increments across two column families all on one row and the increments are connected to
+ * prove atomicity on row.
+ */
+@Category(MediumTests.class)
+@RunWith(Parameterized.class)
+public class TestRegionIncrement {
+  private static final Log LOG = LogFactory.getLog(TestRegionIncrement.class);
+  @Rule public TestName name = new TestName();
+  @Rule public final TestRule timeout =
+      CategoryBasedTimeout.builder().withTimeout(this.getClass()).
+        withLookingForStuckThread(true).build();
+  private static HBaseTestingUtility TEST_UTIL;
+  private final static byte [] INCREMENT_BYTES = Bytes.toBytes("increment");
+  private static final int THREAD_COUNT = 10;
+  private static final int INCREMENT_COUNT = 10000;
+
+  @Parameters(name = "fast={0}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[] {Boolean.FALSE}, new Object [] {Boolean.TRUE});
+  }
+
+  private final boolean fast;
+
+  public TestRegionIncrement(final boolean fast) {
+    this.fast = fast;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
+    if (this.fast) {
+      TEST_UTIL.getConfiguration().
+          setBoolean(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, this.fast);
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  private HRegion getRegion(final Configuration conf, final String tableName) throws IOException {
+    WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
+        TEST_UTIL.getDataTestDir().toString(), conf);
+    return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, conf,
+        false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);
+  }
+
+  private void closeRegion(final HRegion region) throws IOException {
+    region.close();
+    region.getWAL().close();
+  }
+
+  /**
+   * Increments a single cell a bunch of times.
+   */
+  private static class SingleCellIncrementer extends Thread {
+    private final int count;
+    private final HRegion region;
+    private final Increment increment;
+
+    SingleCellIncrementer(final int i, final int count, final HRegion region,
+        final Increment increment) {
+      super("" + i);
+      setDaemon(true);
+      this.count = count;
+      this.region = region;
+      this.increment = increment;
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < this.count; i++) {
+        try {
+          this.region.increment(this.increment);
+          // LOG.info(getName() + " " + i);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Increments a random row's Cell <code>count</code> times.
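+   * Used by the contended test below to spread increments across a shared range of rows.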
+   */
+  private static class CrossRowCellIncrementer extends Thread {
+    private final int count;
+    private final HRegion region;
+    private final Increment [] increments;
+
+    CrossRowCellIncrementer(final int i, final int count, final HRegion region, final int range) {
+      super("" + i);
+      setDaemon(true);
+      this.count = count;
+      this.region = region;
+      this.increments = new Increment[range];
+      for (int ii = 0; ii < range; ii++) {
+        this.increments[ii] = new Increment(Bytes.toBytes(ii));
+        this.increments[ii].addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
+      }
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < this.count; i++) {
+        try {
+          int index = ThreadLocalRandom.current().nextInt(0, this.increments.length);
+          this.region.increment(this.increments[index]);
+          // LOG.info(getName() + " " + index);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Have each thread update its own Cell. Avoid contention with another thread.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testUnContendedSingleCellIncrement()
+  throws IOException, InterruptedException {
+    final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
+        TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
+    long startTime = System.currentTimeMillis();
+    try {
+      SingleCellIncrementer [] threads = new SingleCellIncrementer[THREAD_COUNT];
+      for (int i = 0; i < threads.length; i++) {
+        byte [] rowBytes = Bytes.toBytes(i);
+        Increment increment = new Increment(rowBytes);
+        increment.addColumn(INCREMENT_BYTES, INCREMENT_BYTES, 1);
+        threads[i] = new SingleCellIncrementer(i, INCREMENT_COUNT, region, increment);
+      }
+      for (int i = 0; i < threads.length; i++) {
+        threads[i].start();
+      }
+      for (int i = 0; i < threads.length; i++) {
+        threads[i].join();
+      }
+      RegionScanner regionScanner = region.getScanner(new Scan());
+      List<Cell> cells = new ArrayList<Cell>(THREAD_COUNT);
+      while (regionScanner.next(cells)) continue;
+      assertEquals(THREAD_COUNT, cells.size());
+      long total = 0;
+      for (Cell cell: cells) total += Bytes.toLong(cell.getValue());
+      assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
+    } finally {
+      closeRegion(region);
+      LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms");
+    }
+  }
+
+  /**
+   * Have each thread update a random Cell from a range of Cells shared with the other threads,
+   * so that the increments contend with each other across rows.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testContendedAcrossCellsIncrement()
+  throws IOException, InterruptedException {
+    final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
+        TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
+    long startTime = System.currentTimeMillis();
+    try {
+      CrossRowCellIncrementer [] threads = new CrossRowCellIncrementer[THREAD_COUNT];
+      for (int i = 0; i < threads.length; i++) {
+        threads[i] = new CrossRowCellIncrementer(i, INCREMENT_COUNT, region, THREAD_COUNT);
+      }
+      for (int i = 0; i < threads.length; i++) {
+        threads[i].start();
+      }
+      for (int i = 0; i < threads.length; i++) {
+        threads[i].join();
+      }
+      RegionScanner regionScanner = region.getScanner(new Scan());
+      List<Cell> cells = new ArrayList<Cell>(100);
+      while (regionScanner.next(cells)) continue;
+      assertEquals(THREAD_COUNT, cells.size());
+      long total = 0;
+      for (Cell cell: cells) total += Bytes.toLong(cell.getValue());
+      assertEquals(INCREMENT_COUNT * THREAD_COUNT, total);
+    } finally {
+      closeRegion(region);
+      LOG.info(this.name.getMethodName() + " " + (System.currentTimeMillis() - startTime) + "ms");
+    }
+  }
+}
\ No newline at end of file
-- 
2.5.4 (Apple Git-61)
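
For reference, a minimal client-side sketch of how the fast increment path above is meant to be
exercised. The config key and the READ_UNCOMMITTED read-back come from the patch itself; the
connection boilerplate and the table name "t" and family "f" are illustrative assumptions. The
hbase.increment.fast.but.narrow.consistency key is read server-side by HRegion, so it must be set
in the regionserver's hbase-site.xml for the fast path to be taken.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class FastIncrementExample {
  public static void main(String[] args) throws Exception {
    // Assumes table "t" with family "f" exists and that the regionservers run with
    // hbase.increment.fast.but.narrow.consistency=true (the key added by this patch).
    // Constraint from the key's javadoc: only ever mutate these cells via Increment;
    // mixing in Put/Delete gives indeterminate results.
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("t"))) {
      byte [] row = Bytes.toBytes("row");
      byte [] family = Bytes.toBytes("f");
      byte [] qualifier = Bytes.toBytes("q");

      // Increment the counter. On the fast path the server syncs the WAL while holding the
      // row lock and only then updates the memstore; it does not wait on mvcc to catch up.
      Increment increment = new Increment(row);
      increment.addColumn(family, qualifier, 1);
      table.increment(increment);

      // Read the counter back. Because the fast path returns without waiting on mvcc, read
      // with READ_UNCOMMITTED to see the latest increment (the Scan.java change above carries
      // the isolation level from the Get over to the internal Scan).
      Get get = new Get(row);
      get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      Result result = table.get(get);
      System.out.println("count=" + Bytes.toLong(result.getValue(family, qualifier)));

      // Per the key's javadoc, an Increment of amount zero also returns the latest value.
      Result latest = table.increment(new Increment(row).addColumn(family, qualifier, 0));
      System.out.println("count=" + Bytes.toLong(latest.getValue(family, qualifier)));
    }
  }
}

This mirrors what the parameterized tests above do when run with fast=true: they set the key on
the mini cluster's configuration before startup and issue Gets with READ_UNCOMMITTED.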