.../apache/hadoop/hbase/regionserver/HRegion.java | 194 ++++++++++++++++++--- .../MultiVersionConcurrencyControl.java | 48 ++++- .../hadoop/hbase/regionserver/wal/FSWALEntry.java | 2 +- .../java/org/apache/hadoop/hbase/wal/WALKey.java | 13 ++ .../hadoop/hbase/regionserver/TestIncrement.java | 186 ++++++++++++++++++++ 5 files changed, 415 insertions(+), 28 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 484d5ee..e84da0f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -263,8 +263,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // - the thread that owns the lock (allow reentrancy) // - reference count of (reentrant) locks held by the thread // - the row itself - private final ConcurrentHashMap lockedRows = - new ConcurrentHashMap(); + private final ConcurrentHashMap lockedRows = + new ConcurrentHashMap(); protected final Map stores = new ConcurrentSkipListMap( Bytes.BYTES_RAWCOMPARATOR); @@ -2987,6 +2987,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** Keep track of the locks we hold so we can release them in finally clause */ List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); + RowOperationContext[] rowOpContexts = new RowOperationContext[batchOp.operations.length]; // reference family maps directly so coprocessors can mutate them if desired Map>[] familyMaps = new Map[batchOp.operations.length]; // We try to set up a batch in the range [firstIndex,lastIndexExclusive) @@ -3053,18 +3054,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If we haven't got any rows in our batch, we should block to // get the next one. - RowLock rowLock = null; + Pair pair = null; try { - rowLock = getRowLock(mutation.getRow(), true); + pair = getRowLockPair(mutation.getRow(), true); } catch (IOException ioe) { LOG.warn("Failed getting lock in batch put, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); } - if (rowLock == null) { + if (pair == null) { // We failed to grab another lock break; // stop acquiring more rows for this batch } else { - acquiredRowLocks.add(rowLock); + acquiredRowLocks.add(pair.getSecond()); + rowOpContexts[lastIndexExclusive] = pair.getFirst().getRowOperationContext(); } lastIndexExclusive++; @@ -3202,6 +3204,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); + walKey.setRowOperationContexts(rowOpContexts); } txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); } @@ -3210,7 +3213,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // ---------------------------------- if (walKey == null) { // If this is a skip wal operation just get the read point from mvcc - walKey = this.appendEmptyEdit(this.wal); + walKey = this.appendEmptyEdit(this.wal, rowOpContexts); } if (!isInReplay) { writeEntry = walKey.getWriteEntry(); @@ -3397,9 +3400,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi get.addColumn(family, qualifier); // Lock row - note that doBatchMutate will relock this row if called - RowLock rowLock = getRowLock(get.getRow()); + Pair pair = getRowLockPair(get.getRow(), false); + RowLock rowLock = pair.getSecond(); // wait for all previous transactions to complete (with lock held) - mvcc.await(); + mvcc.await(pair.getFirst().getRowOperationContext()); try { if (this.getCoprocessorHost() != null) { Boolean processed = null; @@ -3506,9 +3510,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi get.addColumn(family, qualifier); // Lock row - note that doBatchMutate will relock this row if called - RowLock rowLock = getRowLock(get.getRow()); + Pair pair = getRowLockPair(get.getRow(), false); + RowLock rowLock = pair.getSecond(); // wait for all previous transactions to complete (with lock held) - mvcc.await(); + mvcc.await(pair.getFirst().getRowOperationContext()); try { List result = get(get, false); @@ -5141,6 +5146,113 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return getRowLock(row, false); } + public static class RowContext { + private RowOperationContext rowOperationContext; + private RowLockContext rowLockContext; + + public RowContext(RowLockContext rowLockContext) { + this.rowLockContext = rowLockContext; + this.rowOperationContext = new RowOperationContext(); + } + + public RowOperationContext getRowOperationContext() { + return rowOperationContext; + } + + public void setRowOperationContext(RowOperationContext rowOperationContext) { + this.rowOperationContext = rowOperationContext; + } + + public RowLockContext getRowLockContext() { + return rowLockContext; + } + + public void setRowLockContext(RowLockContext rowLockContext) { + this.rowLockContext = rowLockContext; + } + } + + public static class RowOperationContext { + private long nextWriteNumber = -1; + + public long getNextWriteNumber() { + return nextWriteNumber; + } + + public void setNextWriteNumber(long nextWriteNumber) { + this.nextWriteNumber = nextWriteNumber; + } + } + + public Pair getRowLockPair(byte[] row, boolean readLock) + throws IOException { + // Make sure the row is inside of this region before getting the lock for it. + checkRow(row, "row lock"); + // create an object to use a a key in the row lock map + HashedBytes rowKey = new HashedBytes(row); + + RowLockContext rowLockContext = null; + RowLockImpl result = null; + RowContext rowContext = null; + TraceScope traceScope = null; + + // If we're tracing start a span to show how long this took. + if (Trace.isTracing()) { + traceScope = Trace.startSpan("HRegion.getRowLock"); + traceScope.getSpan().addTimelineAnnotation("Getting a " + (readLock?"readLock":"writeLock")); + } + + try { + // Keep trying until we have a lock or error out. + // TODO: do we need to add a time component here? + while (result == null) { + + // Try adding a RowLockContext to the lockedRows. + // If we can add it then there's no other transactions currently running. + rowLockContext = new RowLockContext(rowKey); + rowContext = new RowContext(rowLockContext); + RowContext existingContext = lockedRows.putIfAbsent(rowKey, rowContext); + + // if there was a running transaction then there's already a context. + if (existingContext != null) { + rowContext = existingContext; + } + + // Now try an get the lock. + // + // This can fail as + if (readLock) { + result = rowContext.getRowLockContext().newReadLock(); + } else { + result = rowContext.getRowLockContext().newWriteLock(); + } + } + if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { + if (traceScope != null) { + traceScope.getSpan().addTimelineAnnotation("Failed to get row lock"); + } + result = null; + // Clean up the counts just in case this was the thing keeping the context alive. + rowLockContext.cleanUp(); + throw new IOException("Timed out waiting for lock for row: " + rowKey); + } + return new Pair(rowContext, result); + } catch (InterruptedException ie) { + LOG.warn("Thread interrupted waiting for lock on row: " + rowKey); + InterruptedIOException iie = new InterruptedIOException(); + iie.initCause(ie); + if (traceScope != null) { + traceScope.getSpan().addTimelineAnnotation("Interrupted exception getting row lock"); + } + Thread.currentThread().interrupt(); + throw iie; + } finally { + if (traceScope != null) { + traceScope.close(); + } + } + } + /** * * Get a row lock for the specified row. All locks are reentrant. @@ -5175,20 +5287,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Try adding a RowLockContext to the lockedRows. // If we can add it then there's no other transactions currently running. rowLockContext = new RowLockContext(rowKey); - RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); + RowContext rowContext = new RowContext(rowLockContext); + RowContext existingContext = lockedRows.putIfAbsent(rowKey, rowContext); // if there was a running transaction then there's already a context. if (existingContext != null) { - rowLockContext = existingContext; + rowContext = existingContext; } // Now try an get the lock. // // This can fail as if (readLock) { - result = rowLockContext.newReadLock(); + result = rowContext.getRowLockContext().newReadLock(); } else { - result = rowLockContext.newWriteLock(); + result = rowContext.getRowLockContext().newWriteLock(); } } if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { @@ -5265,8 +5378,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi synchronized (lock) { if (count.get() <= 0 ){ usable.set(false); - RowLockContext removed = lockedRows.remove(row); - assert removed == this: "we should never remove a different context"; + RowContext removed = lockedRows.remove(row); + assert removed.getRowLockContext() == + this: "we should never remove a different context"; } } } @@ -6872,10 +6986,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { // 2. Acquire the row lock(s) acquiredRowLocks = new ArrayList(rowsToLock.size()); + RowOperationContext[] rowOpContexts = new RowOperationContext[rowsToLock.size()]; + int index = 0; for (byte[] row : rowsToLock) { // Attempt to lock all involved rows, throw if any lock times out // use a writer lock for mixed reads and writes - acquiredRowLocks.add(getRowLock(row)); + Pair pair = getRowLockPair(row, false); + acquiredRowLocks.add(pair.getSecond()); + rowOpContexts[index++] = pair.getFirst().getRowOperationContext(); } // 3. Region lock lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size()); @@ -6900,13 +7018,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, processor.getClusterIds(), nonceGroup, nonce, mvcc); + walKey.setRowOperationContexts(rowOpContexts); txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, false); } if(walKey == null){ // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId - walKey = this.appendEmptyEdit(this.wal); + walKey = this.appendEmptyEdit(this.wal, rowOpContexts); } // 7. Start mvcc transaction @@ -7106,14 +7225,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi MultiVersionConcurrencyControl.WriteEntry writeEntry = null; boolean doRollBackMemstore = false; try { - rowLock = getRowLock(row); +// rowLock = getRowLock(row); + Pair pair = getRowLockPair(row, false); + rowLock = pair.getSecond(); assert rowLock != null; try { lock(this.updatesLock.readLock()); try { // Wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state when we do our Get) - mvcc.await(); + mvcc.await(pair.getFirst().getRowOperationContext()); if (this.coprocessorHost != null) { Result r = this.coprocessorHost.preAppendAfterRowLock(mutate); if (r!= null) { @@ -7232,6 +7353,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi nonceGroup, nonce, mvcc); + walKey.setRowOperationContexts(pair.getFirst().getRowOperationContext()); txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true); } else { @@ -7240,7 +7362,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } if (walKey == null) { // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned - walKey = this.appendEmptyEdit(this.wal); + walKey = this.appendEmptyEdit(this.wal, pair.getFirst().getRowOperationContext()); } // now start my own transaction @@ -7350,14 +7472,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean doRollBackMemstore = false; TimeRange tr = mutation.getTimeRange(); try { - rowLock = getRowLock(row); + Pair rowLockPair = getRowLockPair(row, false); + rowLock = rowLockPair.getSecond(); assert rowLock != null; try { lock(this.updatesLock.readLock()); try { // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.await(); + mvcc.await(rowLockPair.getFirst().getRowOperationContext()); if (this.coprocessorHost != null) { Result r = this.coprocessorHost.preIncrementAfterRowLock(mutation); if (r != null) { @@ -7463,6 +7586,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi nonceGroup, nonce, mvcc); + walKey.setRowOperationContexts(rowLockPair.getFirst().getRowOperationContext()); txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true); } else { @@ -7471,7 +7595,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } if (walKey == null) { // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned - walKey = this.appendEmptyEdit(this.wal); + walKey = this + .appendEmptyEdit(this.wal, rowLockPair.getFirst().getRowOperationContext()); } // now start my own transaction @@ -8149,6 +8274,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return key; } + private WALKey appendEmptyEdit(final WAL wal, RowOperationContext... rowOperationContexts) + throws IOException { + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + @SuppressWarnings("deprecation") + WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), + getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null, + HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC()); + key.setRowOperationContexts(rowOperationContexts); + // Call append but with an empty WALEdit. The returned sequence id will not be associated + // with any edit and we can be sure it went in after all outstanding appends. + try { + wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, false); + } catch (Throwable t) { + // If exception, our mvcc won't get cleaned up by client, so do it here. + getMVCC().complete(key.getWriteEntry()); + } + return key; + } + /** * {@inheritDoc} */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java index eba99e0..d653beb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java @@ -24,9 +24,11 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.HRegion.RowOperationContext; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -128,6 +130,27 @@ public class MultiVersionConcurrencyControl { } } + public WriteEntry begin(RowOperationContext... rowOperationContexts) { + synchronized (writeQueue) { + long nextWriteNumber = writePoint.incrementAndGet(); + WriteEntry e = new WriteEntry(nextWriteNumber); + if (rowOperationContexts != null && rowOperationContexts.length > 0) { + long lastWriterNumber = -1; + for (RowOperationContext context : rowOperationContexts) { + if (context != null) { + if (lastWriterNumber < context.getNextWriteNumber()) { + lastWriterNumber = context.getNextWriteNumber(); + } + context.setNextWriteNumber(nextWriteNumber); + } + } + e.setLastWriteNumber(lastWriterNumber); + } + writeQueue.add(e); + return e; + } + } + /** * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs * to complete. @@ -137,6 +160,13 @@ public class MultiVersionConcurrencyControl { completeAndWait(begin()); } + public void await(RowOperationContext rowOperationContext) { + // Add a write and then wait on reads to catch up to it. + WriteEntry writeEntry = begin(rowOperationContext); + complete(writeEntry); + waitForRead(writeEntry, true); + } + /** * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the * read point catches up to our write. @@ -202,14 +232,19 @@ public class MultiVersionConcurrencyControl { } } + void waitForRead(WriteEntry e) { + waitForRead(e, false); + } + /** * Wait for the global readPoint to advance up to the passed in write entry number. */ - void waitForRead(WriteEntry e) { + void waitForRead(WriteEntry e, boolean useLastWriteNumber) { boolean interrupted = false; int count = 0; synchronized (readWaiters) { - while (readPoint.get() < e.getWriteNumber()) { + long writeNumber = useLastWriteNumber ? e.getLastWriteNumber() : e.getWriteNumber(); + while (readPoint.get() < writeNumber) { if (count % 100 == 0 && count > 0) { LOG.warn("STUCK: " + this); } @@ -252,6 +287,7 @@ public class MultiVersionConcurrencyControl { public static class WriteEntry { private final long writeNumber; private boolean completed = false; + private long lastWriteNumber = -1; WriteEntry(long writeNumber) { this.writeNumber = writeNumber; @@ -269,6 +305,14 @@ public class MultiVersionConcurrencyControl { return this.writeNumber; } + public long getLastWriteNumber() { + return lastWriteNumber; + } + + public void setLastWriteNumber(long lastWriteNumber) { + this.lastWriteNumber = lastWriteNumber; + } + @Override public String toString() { return this.writeNumber + ", " + this.completed; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 7f3eb61..1ae69a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -115,7 +115,7 @@ class FSWALEntry extends Entry { MultiVersionConcurrencyControl.WriteEntry we = null; if (mvcc != null) { - we = mvcc.begin(); + we = mvcc.begin(getKey().getRowOperationContexts()); regionSequenceId = we.getWriteNumber(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index 4091a82..4041b24 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -51,6 +51,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; + +import org.apache.hadoop.hbase.regionserver.HRegion.RowOperationContext; // imports for things that haven't moved from regionserver.wal yet. import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; @@ -76,12 +78,23 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) public class WALKey implements SequenceId, Comparable { private static final Log LOG = LogFactory.getLog(WALKey.class); + private RowOperationContext[] rowOperationContexts; @InterfaceAudience.Private // For internal use only. public MultiVersionConcurrencyControl getMvcc() { return mvcc; } + @InterfaceAudience.Private // For internal use only. + public RowOperationContext[] getRowOperationContexts() { + return rowOperationContexts; + } + + @InterfaceAudience.Private // For internal use only. + public void setRowOperationContexts(RowOperationContext... rowOperationContexts) { + this.rowOperationContexts = rowOperationContexts; + } + /** * Will block until a write entry has been assigned by they WAL subsystem. * @return A WriteEntry gotten from local WAL subsystem. Must be completed by calling diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java new file mode 100644 index 0000000..1d22a6a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class TestIncrement { + private static final Log LOG = LogFactory.getLog(TestIncrement.class); + @Rule + public TestName name = new TestName(); + private static HBaseTestingUtility TEST_UTIL; + + @Before + public void setUp() throws Exception { + TEST_UTIL = HBaseTestingUtility.createLocalHTU(); + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.cleanupTestDir(); + } + + /** + * Increments a single cell a bunch of times. + */ + private static class SingleCellIncrementer extends Thread { + private final int count; + private final HRegion region; + private final Increment increment; + + SingleCellIncrementer(final int i, final int count, final HRegion region, + final Increment increment) { + super("" + i); + setDaemon(true); + this.count = count; + this.region = region; + this.increment = increment; + } + + @Override + public void run() { + for (int i = 0; i < this.count; i++) { + try { + this.region.increment(this.increment); + // LOG.info(getName() + " " + i); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + Increment getIncrement() { + return this.increment; + } + } + + @Test + public void testContendedSingleCellIncrementer() throws IOException, InterruptedException { + String methodName = this.name.getMethodName(); + TableName tableName = TableName.valueOf(methodName); + byte[] incrementBytes = Bytes.toBytes("increment"); + Increment increment = new Increment(incrementBytes); + increment.addColumn(incrementBytes, incrementBytes, 1); + Configuration conf = TEST_UTIL.getConfiguration(); + final WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(), TEST_UTIL + .getDataTestDir().toString(), conf); + final HRegion region = (HRegion) TEST_UTIL.createLocalHRegion(tableName, + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, false, Durability.SKIP_WAL, wal, + incrementBytes); + try { + final int threadCount = 100; + final int incrementCount = 25000; + SingleCellIncrementer[] threads = new SingleCellIncrementer[threadCount]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new SingleCellIncrementer(i, incrementCount, region, increment); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + increment = new Increment(incrementBytes); + Result r = region.get(new Get(increment.getRow())); + long total = CellUtil.getValueAsLong(r.listCells().get(0)); + assertEquals(incrementCount * incrementCount + 1, total); + } finally { + region.close(); + wal.close(); + } + } + + /** + * Have each thread update its own Cell. Avoid contention with another thread. + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testUnContendedSingleCellIncrementer() throws IOException, InterruptedException { + String methodName = this.name.getMethodName(); + TableName tableName = TableName.valueOf(methodName); + + byte[] incrementBytes = Bytes.toBytes("increment"); + + Configuration conf = TEST_UTIL.getConfiguration(); + final WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(), TEST_UTIL + .getDataTestDir().toString(), conf); + final HRegion region = (HRegion) TEST_UTIL.createLocalHRegion(tableName, + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, false, Durability.SKIP_WAL, wal, + incrementBytes); + try { + final int threadCount = 100; + final int incrementCount = 25000; + SingleCellIncrementer[] threads = new SingleCellIncrementer[threadCount]; + for (int i = 0; i < threads.length; i++) { + byte[] rowBytes = Bytes.toBytes(i); + Increment increment = new Increment(rowBytes); + increment.addColumn(incrementBytes, incrementBytes, 1); + threads[i] = new SingleCellIncrementer(i, incrementCount, region, increment); + } + for (int i = 0; i < threads.length; i++) { + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + threads[i].join(); + } + RegionScanner regionScanner = region.getScanner(new Scan()); + List cells = new ArrayList(100); + while (regionScanner.next(cells)) + continue; + LOG.info("After " + cells.size()); + long total = 0; + for (Cell cell : cells) + total += CellUtil.getValueAsLong(cell); + assertEquals(incrementCount * threadCount, total); + } finally { + region.close(); + wal.close(); + } + } + + private void assertEquals(int i, long total) { + // TODO Auto-generated method stub + + } +} \ No newline at end of file