diff --git src/java/org/apache/hadoop/hbase/KeyValue.java src/java/org/apache/hadoop/hbase/KeyValue.java
index 81a6ccd..f2d8538 100644
--- src/java/org/apache/hadoop/hbase/KeyValue.java
+++ src/java/org/apache/hadoop/hbase/KeyValue.java
@@ -200,6 +200,23 @@ public class KeyValue implements Writable, HeapSize {
private int offset = 0;
private int length = 0;
+ /** Here be dragons **/
+
+ // used to achieve atomic operations in the memstore.
+ public long getMemstoreTS() {
+ return memstoreTS;
+ }
+
+ public void setMemstoreTS(long memstoreTS) {
+ this.memstoreTS = memstoreTS;
+ }
+
+ // default value is 0, aka DNC
+ private long memstoreTS = 0;
+
+ /** Dragon time over, return to normal business */
+
+
/** Writable Constructor -- DO NOT USE */
public KeyValue() {}
@@ -1503,6 +1520,21 @@ public class KeyValue implements Writable, HeapSize {
}
/**
+ * Creates a KeyValue that is last on the specified row id. That is,
+ * every other possible KeyValue for the given row would compareTo()
+ * less than the result of this call.
+ * @param row row key
+ * @return Last possible KeyValue on passed row
+ */
+ public static KeyValue createLastOnRow(final byte[] row) {
+ return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum);
+ }
+
+ /**
+ * Create a KeyValue that is smaller than all other possible KeyValues
  * for the given row. That is, any (valid) KeyValue on 'row' would sort
+ * _after_ the result.
+ *
* @param row - row key (arbitrary byte array)
* @return First possible KeyValue on passed row
*/
@@ -1511,6 +1543,8 @@ public class KeyValue implements Writable, HeapSize {
}
/**
+ * Creates a KeyValue that is smaller than all other KeyValues on the
+ * given row that are older than the passed timestamp.
* @param row - row key (arbitrary byte array)
* @param ts - timestamp
* @return First possible key on passed row and timestamp.
@@ -1522,8 +1556,11 @@ public class KeyValue implements Writable, HeapSize {
/**
* @param row - row key (arbitrary byte array)
+ * @param c column - {@link #parseColumn(byte[])} is called to split
+ * the column.
* @param ts - timestamp
* @return First possible key on passed row, column and timestamp
+ * @deprecated
*/
public static KeyValue createFirstOnRow(final byte [] row, final byte [] c,
final long ts) {
@@ -1532,14 +1569,17 @@ public class KeyValue implements Writable, HeapSize {
}
/**
+ * Create a KeyValue for the specified row, family and qualifier that would be
+ * smaller than all other possible KeyValues that have the same row,family,qualifier.
+ * Used for seeking.
* @param row - row key (arbitrary byte array)
- * @param f - family name
- * @param q - column qualifier
+ * @param family - family name
+ * @param qualifier - column qualifier
* @return First possible key on passed row, and column.
*/
- public static KeyValue createFirstOnRow(final byte [] row, final byte [] f,
- final byte [] q) {
- return new KeyValue(row, f, q, HConstants.LATEST_TIMESTAMP, Type.Maximum);
+ public static KeyValue createFirstOnRow(final byte [] row, final byte [] family,
+ final byte [] qualifier) {
+ return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
}
/**
@@ -1706,9 +1746,6 @@ public class KeyValue implements Writable, HeapSize {
return compare;
}
- // if row matches, and no column in the 'left' AND put type is 'minimum',
- // then return that left is larger than right.
-
// Compare column family. Start compare past row and family length.
int lcolumnoffset = Bytes.SIZEOF_SHORT + lrowlength + 1 + loffset;
int rcolumnoffset = Bytes.SIZEOF_SHORT + rrowlength + 1 + roffset;
@@ -1717,17 +1754,25 @@ public class KeyValue implements Writable, HeapSize {
int rcolumnlength = rlength - TIMESTAMP_TYPE_SIZE -
(rcolumnoffset - roffset);
+ // if row matches, and no column in the 'left' AND put type is 'minimum',
+ // then return that left is larger than right.
+
// This supports 'last key on a row' - the magic is if there is no column in the
// left operand, and the left operand has a type of '0' - magical value,
// then we say the left is bigger. This will let us seek to the last key in
// a row.
byte ltype = left[loffset + (llength - 1)];
+ byte rtype = right[roffset + (rlength - 1)];
if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
return 1; // left is bigger.
}
+ if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
+ return -1;
+ }
+ // TODO the family and qualifier should be compared separately
compare = Bytes.compareTo(left, lcolumnoffset, lcolumnlength, right,
rcolumnoffset, rcolumnlength);
if (compare != 0) {
@@ -1749,9 +1794,6 @@ public class KeyValue implements Writable, HeapSize {
if (!this.ignoreType) {
// Compare types. Let the delete types sort ahead of puts; i.e. types
// of higher numbers sort before those of lesser numbers
-
- // ltype is defined above
- byte rtype = right[roffset + (rlength - 1)];
return (0xff & rtype) - (0xff & ltype);
}
return 0;
@@ -1791,7 +1833,8 @@ public class KeyValue implements Writable, HeapSize {
public long heapSize() {
return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE +
ClassSize.align(ClassSize.ARRAY + length) +
- (2 * Bytes.SIZEOF_INT));
+ (2 * Bytes.SIZEOF_INT) +
+ Bytes.SIZEOF_LONG);
}
// this overload assumes that the length bytes have already been read,
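For illustration, a minimal sketch (not part of the patch) of how the row-boundary keys above behave under KeyValue.COMPARATOR once this change is applied; the class name RowBoundaryKeys and the sample values are hypothetical.

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class RowBoundaryKeys {
  public static void main(String[] args) {
    byte[] row = Bytes.toBytes("row-1");

    // Sorts before every real KeyValue on 'row' (empty column, Type.Maximum).
    KeyValue first = KeyValue.createFirstOnRow(row);
    // Sorts after every real KeyValue on 'row' (empty column, Type.Minimum).
    KeyValue last = KeyValue.createLastOnRow(row);

    KeyValue real = new KeyValue(row, Bytes.toBytes("f"), Bytes.toBytes("q"),
        1234L, KeyValue.Type.Put, Bytes.toBytes("v"));

    KeyValue.KVComparator cmp = KeyValue.COMPARATOR;
    System.out.println(cmp.compare(first, real) < 0);  // true
    System.out.println(cmp.compare(real, last) < 0);   // true with the patched comparator
  }
}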
diff --git src/java/org/apache/hadoop/hbase/client/HTable.java src/java/org/apache/hadoop/hbase/client/HTable.java
index 3b7b3c0..2bc571c 100644
--- src/java/org/apache/hadoop/hbase/client/HTable.java
+++ src/java/org/apache/hadoop/hbase/client/HTable.java
@@ -660,6 +660,7 @@ public class HTable {
*/
public void close() throws IOException{
flushCommits();
+ this.pool.shutdownNow();
}
/**
diff --git src/java/org/apache/hadoop/hbase/client/Scan.java src/java/org/apache/hadoop/hbase/client/Scan.java
index 20db2a8..eddefad 100644
--- src/java/org/apache/hadoop/hbase/client/Scan.java
+++ src/java/org/apache/hadoop/hbase/client/Scan.java
@@ -168,10 +168,29 @@ public class Scan implements Writable {
}
/**
+ * Builds a scan object with the same specs as the passed Get.
+ * @param get get to model scan after
+ */
+ public Scan(Get get) {
+ this.startRow = get.getRow();
+ this.stopRow = get.getRow();
+ this.filter = get.getFilter();
+ this.maxVersions = get.getMaxVersions();
+ this.tr = get.getTimeRange();
+ this.familyMap = get.getFamilyMap();
+ }
+
+ public boolean isGetScan() {
+ return this.startRow != null && this.startRow.length > 0 &&
+ Bytes.equals(this.startRow, this.stopRow);
+ }
+
+ /**
* Get all columns from the specified family.
*
* Overrides previous calls to addColumn for this family.
* @param family family name
+ * @return this
*/
public Scan addFamily(byte [] family) {
familyMap.remove(family);
@@ -185,6 +204,7 @@ public class Scan implements Writable {
* Overrides previous calls to addFamily for this family.
* @param family family name
* @param qualifier column qualifier
+ * @return this
*/
public Scan addColumn(byte [] family, byte [] qualifier) {
NavigableSet<byte []> set = familyMap.get(family);
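For illustration, a minimal sketch (not part of the patch) showing the new Get-to-Scan conversion; the class name GetAsScan and the sample row/column are hypothetical.

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class GetAsScan {
  public static void main(String[] args) throws Exception {
    Get get = new Get(Bytes.toBytes("row-1"));
    get.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"));
    get.setMaxVersions(3);

    // With this patch a Get can be modelled as a single-row Scan:
    Scan scan = new Scan(get);

    // startRow == stopRow == the Get's row, so this is a "get scan".
    System.out.println(scan.isGetScan());   // true
  }
}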
diff --git src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
index e1dbddc..03dd83b 100644
--- src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
+++ src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
@@ -178,9 +178,6 @@ public class SingleColumnValueFilter implements Filter {
// byte array copy?
int compareResult =
this.comparator.compareTo(Arrays.copyOfRange(data, offset, offset + length));
- if (LOG.isDebugEnabled()) {
- LOG.debug("compareResult=" + compareResult + " " + Bytes.toString(data, offset, length));
- }
switch (this.compareOp) {
case LESS:
return compareResult <= 0;
diff --git src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java
new file mode 100644
index 0000000..f0ec0b7
--- /dev/null
+++ src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java
@@ -0,0 +1,50 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class DebugPrint {
+
+  private static final AtomicBoolean enabled = new AtomicBoolean(false);
+ private static final Object sync = new Object();
+ public static StringBuilder out = new StringBuilder();
+
+ static public void enable() {
+ enabled.set(true);
+ }
+ static public void disable() {
+ enabled.set(false);
+ }
+
+ static public void reset() {
+ synchronized (sync) {
+ enable(); // a reset implies the caller wants us enabled.
+
+ out = new StringBuilder();
+ }
+ }
+ static public void dumpToFile(String file) throws IOException {
+ FileWriter f = new FileWriter(file);
+ synchronized (sync) {
+ f.write(out.toString());
+ }
+ f.close();
+ }
+
+ public static void println(String m) {
+ if (!enabled.get()) {
+ System.out.println(m);
+ return;
+ }
+
+ synchronized (sync) {
+ String threadName = Thread.currentThread().getName();
+ out.append("<");
+ out.append(threadName);
+ out.append("> ");
+ out.append(m);
+ out.append("\n");
+ }
+ }
+}
\ No newline at end of file
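For illustration, a minimal usage sketch (not part of the patch) of the DebugPrint helper added above; the output file path is hypothetical.

import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.DebugPrint;

public class DebugPrintDemo {
  public static void main(String[] args) throws IOException {
    DebugPrint.reset();                      // enables collection and clears the buffer
    DebugPrint.println("seeking to row-1");  // buffered, prefixed with the thread name
    DebugPrint.println("flushing memstore");
    DebugPrint.dumpToFile("/tmp/debugprint.log");  // write the buffer to a file
    DebugPrint.disable();                    // println() falls back to System.out again
  }
}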
diff --git src/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9b1e6b8..c1ac1b4 100644
--- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -67,10 +67,11 @@ package org.apache.hadoop.hbase.regionserver;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Random;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* HRegion stores data for a certain region of a table. It stores all columns
@@ -176,6 +177,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
/**
* Set flags that make this region read-only.
+ *
+ * @param onOff flip value for region r/o setting
*/
synchronized void setReadOnly(final boolean onOff) {
this.writesEnabled = !onOff;
@@ -191,7 +194,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
}
- private volatile WriteState writestate = new WriteState();
+ private final WriteState writestate = new WriteState();
final long memstoreFlushSize;
private volatile long lastFlushTime;
@@ -210,7 +213,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
private final Object splitLock = new Object();
private long minSequenceId;
private boolean splitRequest;
-
+
+ private final ReadWriteConsistencyControl rwcc =
+ new ReadWriteConsistencyControl();
+
/**
* Name of the region info file that resides just under the region directory.
*/
@@ -296,9 +302,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* Initialize this region and get it ready to roll.
* Called after construction.
*
- * @param initialFiles
- * @param reporter
- * @throws IOException
+ * @param initialFiles path
+ * @param reporter progressable
+ * @throws IOException e
*/
public void initialize(Path initialFiles, final Progressable reporter)
throws IOException {
@@ -436,6 +442,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
return this.closing.get();
}
+ public ReadWriteConsistencyControl getRWCC() {
+ return rwcc;
+ }
+
/**
* Close down this HRegion. Flush the cache, shut down each HStore, don't
* service any more calls.
@@ -447,7 +457,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* HStores make use of. It's a list of all HStoreFile objects. Returns empty
* vector if already closed and null if judged that it should not close.
*
- * @throws IOException
+ * @throws IOException e
*/
public List close() throws IOException {
return close(false);
@@ -465,7 +475,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* HStores make use of. It's a list of HStoreFile objects. Can be null if
* we are not to close at this time or we are already closed.
*
- * @throws IOException
+ * @throws IOException e
*/
public List close(final boolean abort) throws IOException {
if (isClosed()) {
@@ -580,6 +590,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
/** @return the last time the region was flushed */
+ @SuppressWarnings({"UnusedDeclaration"})
public long getLastFlushTime() {
return this.lastFlushTime;
}
@@ -681,8 +692,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
HRegion.newHRegion(basedir, log, fs, conf, regionBInfo, null);
moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir());
- HRegion regions[] = new HRegion [] {regionA, regionB};
- return regions;
+ return new HRegion [] {regionA, regionB};
}
}
@@ -756,7 +766,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* server does them sequentially and not in parallel.
*
* @return mid key if split is needed
- * @throws IOException
+ * @throws IOException e
*/
public byte [] compactStores() throws IOException {
boolean majorCompaction = this.forceMajorCompaction;
@@ -777,7 +787,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
*
* @param majorCompaction True to force a major compaction regardless of thresholds
* @return split row if split is needed
- * @throws IOException
+ * @throws IOException e
*/
byte [] compactStores(final boolean majorCompaction)
throws IOException {
@@ -846,7 +856,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
*
* @return true if cache was flushed
*
- * @throws IOException
+ * @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
@@ -912,7 +922,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
*
* @return true if the region needs compacting
*
- * @throws IOException
+ * @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
@@ -940,18 +950,28 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
// during the flush
long sequenceId = -1L;
long completeSequenceId = -1L;
+
+ // We have to take a write lock during snapshot, or else a write could
+ // end up in both the snapshot and the memstore (which would make it
+ // hard to keep row updates atomic).
this.updatesLock.writeLock().lock();
- // Get current size of memstores.
final long currentMemStoreSize = this.memstoreSize.get();
- List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>();
+ List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
try {
sequenceId = log.startCacheFlush();
completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
- // create the store flushers
+
for (Store s : stores.values()) {
storeFlushers.add(s.getStoreFlusher(completeSequenceId));
}
+ // This thread is going to cause a whole bunch of scanners to reseek.
+ // They depend on a thread-local to know where to read from.
+ // We set the read point this high up in the call so that each
+ // HRegionScanner ends up with a single read point shared by all of
+ // its sub-StoreScanners.
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
// prepare flush (take a snapshot)
for (StoreFlusher flusher: storeFlushers) {
flusher.prepare();
@@ -960,6 +980,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
this.updatesLock.writeLock().unlock();
}
+ LOG.debug("Finished snapshotting, commencing flushing stores");
+
// Any failure from here on out will be catastrophic requiring server
// restart so hlog content can be replayed and put back into the memstore.
// Otherwise, the snapshot content while backed up in the hlog, it will not
@@ -973,13 +995,28 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
flusher.flushCache();
}
- internalPreFlashcacheCommit();
+ Callable atomicWork = internalPreFlushcacheCommit();
+
+ LOG.debug("Caches flushed, doing commit now (which includes update scanners)");
/**
- * Switch between memstore and the new store file
+ * Switch between memstore(snapshot) and the new store file
*/
- this.newScannerLock.writeLock().lock();
+ if (atomicWork != null) {
+ LOG.debug("internalPreFlushcacheCommit gives us work to do, acquiring newScannerLock");
+ newScannerLock.writeLock().lock();
+ }
+
try {
+ // update this again to make sure we are 'fresh'
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
+ if (atomicWork != null) {
+ atomicWork.call();
+ }
+
+ // Switch snapshot (in memstore) -> new hfile (thus causing
+ // all the store scanners to reset/reseek).
for (StoreFlusher flusher : storeFlushers) {
boolean needsCompaction = flusher.commit();
if (needsCompaction) {
@@ -987,10 +1024,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
}
} finally {
- this.newScannerLock.writeLock().unlock();
+ if (atomicWork != null) {
+ newScannerLock.writeLock().unlock();
+ }
}
- // clear the stireFlushers list
storeFlushers.clear();
// Set down the memstore size by amount of flush.
this.memstoreSize.addAndGet(-currentMemStoreSize);
@@ -1040,9 +1078,14 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* A hook for sub classed wishing to perform operations prior to the cache
* flush commit stage.
*
+ * If a subclass wishes its work and the flush commit stage to happen
+ * atomically, it should return a Callable; the new-scanner lock is then
+ * acquired before the Callable runs and released after the commit.
+ *
* @throws java.io.IOException allow children to throw exception
*/
- protected void internalPreFlashcacheCommit() throws IOException {
+ protected Callable internalPreFlushcacheCommit() throws IOException {
+ return null;
}
/**
@@ -1080,9 +1123,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* ts.
*
* @param row row key
- * @param family
+ * @param family column family to find on
* @return map of values
- * @throws IOException
+ * @throws IOException read exceptions
*/
public Result getClosestRowBefore(final byte [] row, final byte [] family)
throws IOException {
@@ -1099,11 +1142,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
if (key == null) {
return null;
}
- // This will get all results for this store. TODO: Do we need to do this?
Get get = new Get(key.getRow());
- List<KeyValue> results = new ArrayList<KeyValue>();
- store.get(get, null, results);
- return new Result(results);
+ get.addFamily(family);
+ return get(get, null);
} finally {
splitsAndClosesLock.readLock().unlock();
}
@@ -1117,7 +1158,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
*
* @param scan configured {@link Scan}
* @return InternalScanner
- * @throws IOException
+ * @throws IOException read exceptions
*/
public InternalScanner getScanner(Scan scan)
throws IOException {
@@ -1155,24 +1196,23 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
// set() methods for client use.
//////////////////////////////////////////////////////////////////////////////
/**
- * @param delete
- * @param lockid
- * @param writeToWAL
- * @throws IOException
+ * @param delete delete object
+ * @param lockid existing lock id, or null to grab a new lock
+ * @param writeToWAL append to the write ahead log or not
+ * @throws IOException read exceptions
*/
public void delete(Delete delete, Integer lockid, boolean writeToWAL)
throws IOException {
checkReadOnly();
checkResources();
Integer lid = null;
- newScannerLock.writeLock().lock();
splitsAndClosesLock.readLock().lock();
try {
byte [] row = delete.getRow();
// If we did not pass an existing row lock, obtain a new one
lid = getLock(lockid, row);
- //Check to see if this is a deleteRow insert
+ // Check to see if this is a deleteRow insert
if(delete.getFamilyMap().isEmpty()){
for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
// Don't eat the timestamp
@@ -1193,7 +1233,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
} finally {
if(lockid == null) releaseRowLock(lid);
splitsAndClosesLock.readLock().unlock();
- newScannerLock.writeLock().unlock();
}
}
@@ -1208,7 +1247,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
long now = System.currentTimeMillis();
byte [] byteNow = Bytes.toBytes(now);
boolean flush = false;
- this.updatesLock.readLock().lock();
+
+ updatesLock.readLock().lock();
+ ReadWriteConsistencyControl.WriteEntry w = null;
try {
@@ -1225,21 +1266,21 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
if (kv.isLatestTimestamp() && kv.isDeleteType()) {
byte[] qual = kv.getQualifier();
if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
+
Integer count = kvCount.get(qual);
if (count == null) {
- kvCount.put(qual, new Integer(1));
+ kvCount.put(qual, 1);
} else {
- kvCount.put(qual, new Integer(count+1));
+ kvCount.put(qual, count + 1);
}
count = kvCount.get(qual);
- List<KeyValue> result = new ArrayList<KeyValue>(1);
- Get g = new Get(kv.getRow());
- g.setMaxVersions(count);
- NavigableSet<byte []> qualifiers =
- new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
- qualifiers.add(qual);
- get(store, g, qualifiers, result);
+ Get get = new Get(kv.getRow());
+ get.setMaxVersions(count);
+ get.addColumn(family, qual);
+
+ List<KeyValue> result = get(get);
+
if (result.size() < count) {
// Nothing to delete
kv.updateLatestStamp(byteNow);
@@ -1284,11 +1325,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
}
+ // Now make changes to the memstore.
+
long size = 0;
+ w = rwcc.beginMemstoreInsert();
- //
- // Now make changes to the memstore.
- //
for (Map.Entry<byte [], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
@@ -1296,13 +1337,17 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
Store store = getStore(family);
for (KeyValue kv: kvs) {
+ kv.setMemstoreTS(w.getWriteNumber());
size = this.memstoreSize.addAndGet(store.delete(kv));
}
}
flush = isFlushSize(size);
} finally {
+ if (w != null) rwcc.completeMemstoreInsert(w);
+
this.updatesLock.readLock().unlock();
}
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -1350,8 +1395,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
// read lock, resources may run out. For now, the thought is that this
// will be extremely rare; we'll deal with it when it happens.
checkResources();
- newScannerLock.writeLock().lock();
splitsAndClosesLock.readLock().lock();
+
try {
// We obtain a per-row lock, so other clients will block while one client
// performs an update. The read lock is released by the client calling
@@ -1361,6 +1406,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
byte [] row = put.getRow();
// If we did not pass an existing row lock, obtain a new one
Integer lid = getLock(lockid, row);
+
byte [] now = Bytes.toBytes(System.currentTimeMillis());
try {
// All edits for the given row (across all column families) must happen atomically.
@@ -1370,7 +1416,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
} finally {
splitsAndClosesLock.readLock().unlock();
- newScannerLock.writeLock().unlock();
}
}
@@ -1410,15 +1455,12 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
Integer lid = getLock(lockId, get.getRow());
List<KeyValue> result = new ArrayList<KeyValue>();
try {
- //Getting data
- for(Map.Entry<byte [], NavigableSet<byte []>> entry:
- get.getFamilyMap().entrySet()) {
- get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
- }
+ result = get(get);
+
boolean matches = false;
if (result.size() == 0 && expectedValue.length == 0) {
matches = true;
- } else if(result.size() == 1) {
+ } else if (result.size() == 1) {
//Compare the expected value with the actual value
byte [] actualValue = result.get(0).getValue();
matches = Bytes.equals(expectedValue, actualValue);
@@ -1534,6 +1576,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
/**
* Add updates first to the hlog and then add values to memstore.
* Warning: Assumption is caller has lock on passed in row.
+ * @param family column family the edits belong to
* @param edits Cell updates by column
* @praram now
* @throws IOException
@@ -1558,6 +1601,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
byte[] byteNow = Bytes.toBytes(now);
boolean flush = false;
this.updatesLock.readLock().lock();
+ ReadWriteConsistencyControl.WriteEntry w = null;
try {
WALEdit walEdit = new WALEdit();
@@ -1601,6 +1645,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
long size = 0;
+ w = rwcc.beginMemstoreInsert();
+
// now make changes to the memstore
for (Map.Entry<byte [], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
@@ -1608,11 +1654,14 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
Store store = getStore(family);
for (KeyValue kv: edits) {
+ kv.setMemstoreTS(w.getWriteNumber());
size = this.memstoreSize.addAndGet(store.add(kv));
}
}
flush = isFlushSize(size);
} finally {
+ if (w != null) rwcc.completeMemstoreInsert(w);
+
this.updatesLock.readLock().unlock();
}
if (flush) {
@@ -1854,9 +1903,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
private Filter filter;
private RowFilterInterface oldFilter;
private List<KeyValue> results = new ArrayList<KeyValue>();
+ private int isScan;
private int batch;
RegionScanner(Scan scan, List additionalScanners) {
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
+ //DebugPrint.println("HRegionScanner., threadpoint = " + ReadWriteConsistencyControl.getThreadReadPoint());
this.filter = scan.getFilter();
this.batch = scan.getBatch();
this.oldFilter = scan.getOldFilter();
@@ -1865,12 +1918,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
} else {
this.stopRow = scan.getStopRow();
}
+ this.isScan = scan.isGetScan() ? -1 : 0;
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
if (additionalScanners != null) {
scanners.addAll(additionalScanners);
}
- for (Map.Entry> entry :
+ for (Map.Entry> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
scanners.add(store.getScanner(scan, entry.getValue()));
@@ -1893,6 +1947,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
if (oldFilter != null) {
oldFilter.reset();
}
+
+ // Start the next row read and reset the thread point
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
}
public boolean next(List outResults, int limit) throws IOException {
@@ -1901,6 +1958,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
" is closing=" + closing.get() + " or closed=" + closed.get());
}
+
+ // This could be a new thread from the last time we called next().
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
results.clear();
boolean returnResult = nextInternal(limit);
if (!returnResult && filterRow()) {
@@ -1982,7 +2042,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
return currentRow == null ||
(this.stopRow != null &&
comparator.compareRows(this.stopRow, 0, this.stopRow.length,
- currentRow, 0, currentRow.length) <= 0);
+ currentRow, 0, currentRow.length) <= isScan);
}
private boolean filterRow() {
@@ -2502,10 +2562,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
// HBASE-880
//
/**
- * @param get
- * @param lockid
+ * @param get get object
+ * @param lockid existing lock id, or null for no previous lock
* @return result
- * @throws IOException
+ * @throws IOException read exceptions
*/
public Result get(final Get get, final Integer lockid) throws IOException {
// Verify families are all valid
@@ -2520,22 +2580,33 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
// Lock row
Integer lid = getLock(lockid, get.getRow());
- List<KeyValue> result = new ArrayList<KeyValue>();
+ List<KeyValue> result = null;
try {
- for (Map.Entry<byte [], NavigableSet<byte []>> entry:
- get.getFamilyMap().entrySet()) {
- get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
- }
+ result = get(get);
} finally {
- if(lockid == null) releaseRowLock(lid);
+ if(lockid == null)
+ releaseRowLock(lid);
}
return new Result(result);
}
- private void get(final Store store, final Get get,
- final NavigableSet<byte []> qualifiers, List<KeyValue> result)
- throws IOException {
- store.get(get, qualifiers, result);
+ /*
+ * Do a get based on the get parameter.
+ */
+ private List<KeyValue> get(final Get get) throws IOException {
+ Scan scan = new Scan(get);
+
+ List<KeyValue> results = new ArrayList<KeyValue>();
+
+ InternalScanner scanner = null;
+ try {
+ scanner = getScanner(scan);
+ scanner.next(results);
+ } finally {
+ if (scanner != null)
+ scanner.close();
+ }
+ return results;
}
/**
@@ -2544,6 +2615,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* @param family
* @param qualifier
* @param amount
+ * @param writeToWAL
* @return The new value.
* @throws IOException
*/
@@ -2558,6 +2630,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
try {
Store store = stores.get(family);
+ // TODO call the proper GET API
// Get the old value:
Get get = new Get(row);
get.addColumn(family, qualifier);
@@ -2623,7 +2696,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
public static final long FIXED_OVERHEAD = ClassSize.align(
(5 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
- (20 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
+ (21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +
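For illustration, a condensed sketch (not part of the patch) of the write path that the put/delete hunks above implement: every KeyValue of a row is stamped with one write number and only published to readers once the whole row has been inserted. The class name WritePathSketch is hypothetical; it assumes the patched regionserver classes are available in the same package.

package org.apache.hadoop.hbase.regionserver;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.hbase.KeyValue;

class WritePathSketch {
  private final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
  private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
  private final AtomicLong memstoreSize = new AtomicLong();

  /** Apply one row's edits to a store, mirroring the patched HRegion.put()/delete(). */
  long applyRow(Store store, List<KeyValue> edits) {
    ReadWriteConsistencyControl.WriteEntry w = null;
    long size = 0;
    updatesLock.readLock().lock();
    try {
      w = rwcc.beginMemstoreInsert();          // reserve the next write number
      for (KeyValue kv : edits) {
        kv.setMemstoreTS(w.getWriteNumber());  // stamp every edit of the row
        size = memstoreSize.addAndGet(store.add(kv));
      }
    } finally {
      // Publish the whole row at once so readers never see a partial update.
      if (w != null) rwcc.completeMemstoreInsert(w);
      updatesLock.readLock().unlock();
    }
    return size;
  }
}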
diff --git src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java
index de3df22..440f5a7 100644
--- src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java
+++ src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.hbase.KeyValue;
+
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
@@ -28,8 +30,6 @@ import java.util.SortedSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import org.apache.hadoop.hbase.KeyValue;
-
/**
* A {@link java.util.Set} of {@link KeyValue}s implemented on top of a
* {@link java.util.concurrent.ConcurrentSkipListMap}. Works like a
@@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.KeyValue;
* has same attributes as ConcurrentSkipListSet: e.g. tolerant of concurrent
* get and set and won't throw ConcurrentModificationException when iterating.
*/
-class KeyValueSkipListSet implements NavigableSet<KeyValue>, Cloneable {
+class KeyValueSkipListSet implements NavigableSet<KeyValue> {
private ConcurrentNavigableMap<KeyValue, KeyValue> delegatee;
KeyValueSkipListSet(final KeyValue.KVComparator c) {
@@ -167,6 +167,7 @@ class KeyValueSkipListSet implements NavigableSet, Cloneable {
}
public boolean contains(Object o) {
+ //noinspection SuspiciousMethodCalls
return this.delegatee.containsKey(o);
}
@@ -201,17 +202,4 @@ class KeyValueSkipListSet implements NavigableSet, Cloneable {
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException("Not implemented");
}
-
- @Override
- public KeyValueSkipListSet clone() {
- assert this.delegatee.getClass() == ConcurrentSkipListMap.class;
- KeyValueSkipListSet clonedSet = null;
- try {
- clonedSet = (KeyValueSkipListSet) super.clone();
- } catch (CloneNotSupportedException e) {
- throw new InternalError(e.getMessage());
- }
- clonedSet.delegatee = ((ConcurrentSkipListMap) this.delegatee).clone();
- return clonedSet;
- }
}
\ No newline at end of file
diff --git src/java/org/apache/hadoop/hbase/regionserver/MemStore.java src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 7afe297..ba251bd 100644
--- src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -28,7 +28,9 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.SortedSet;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -110,7 +112,7 @@ public class MemStore implements HeapSize {
/**
* Creates a snapshot of the current memstore.
- * Snapshot must be cleared by call to {@link #clearSnapshot(java.util.Map)}
+ * Snapshot must be cleared by call to {@link #clearSnapshot(java.util.SortedSet)}
* To get the snapshot made by this method, use {@link #getSnapshot()}
*/
void snapshot() {
@@ -140,7 +142,7 @@ public class MemStore implements HeapSize {
* call to {@link #snapshot()}
* @return Return snapshot.
* @see {@link #snapshot()}
- * @see {@link #clearSnapshot(java.util.Map)}
+ * @see {@link #clearSnapshot(java.util.SortedSet)}
*/
KeyValueSkipListSet getSnapshot() {
return this.snapshot;
@@ -187,7 +189,7 @@ public class MemStore implements HeapSize {
return s;
}
- /**
+ /**
* Write a delete
* @param delete
* @return approximate size of the passed key and value.
@@ -195,69 +197,8 @@ public class MemStore implements HeapSize {
long delete(final KeyValue delete) {
long s = 0;
this.lock.readLock().lock();
- //Have to find out what we want to do here, to find the fastest way of
- //removing things that are under a delete.
- //Actions that will take place here are:
- //1. Insert a delete and remove all the affected entries already in memstore
- //2. In the case of a Delete and the matching put is found then don't insert
- // the delete
- //TODO Would be nice with if we had an iterator for this, so we could remove
- //things that needs to be removed while iterating and don't have to go
- //back and do it afterwards
try {
- boolean notpresent = false;
- List<KeyValue> deletes = new ArrayList<KeyValue>();
- SortedSet<KeyValue> tail = this.kvset.tailSet(delete);
-
- //Parse the delete, so that it is only done once
- byte [] deleteBuffer = delete.getBuffer();
- int deleteOffset = delete.getOffset();
-
- int deleteKeyLen = Bytes.toInt(deleteBuffer, deleteOffset);
- deleteOffset += Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;
-
- short deleteRowLen = Bytes.toShort(deleteBuffer, deleteOffset);
- deleteOffset += Bytes.SIZEOF_SHORT;
- int deleteRowOffset = deleteOffset;
-
- deleteOffset += deleteRowLen;
-
- byte deleteFamLen = deleteBuffer[deleteOffset];
- deleteOffset += Bytes.SIZEOF_BYTE + deleteFamLen;
-
- int deleteQualifierOffset = deleteOffset;
- int deleteQualifierLen = deleteKeyLen - deleteRowLen - deleteFamLen -
- Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG -
- Bytes.SIZEOF_BYTE;
-
- deleteOffset += deleteQualifierLen;
-
- int deleteTimestampOffset = deleteOffset;
- deleteOffset += Bytes.SIZEOF_LONG;
- byte deleteType = deleteBuffer[deleteOffset];
-
- //Comparing with tail from memstore
- for (KeyValue kv : tail) {
- DeleteCode res = DeleteCompare.deleteCompare(kv, deleteBuffer,
- deleteRowOffset, deleteRowLen, deleteQualifierOffset,
- deleteQualifierLen, deleteTimestampOffset, deleteType,
- comparator.getRawComparator());
- if (res == DeleteCode.DONE) {
- break;
- } else if (res == DeleteCode.DELETE) {
- deletes.add(kv);
- } // SKIP
- }
-
- //Delete all the entries effected by the last added delete
- for (KeyValue kv : deletes) {
- notpresent = this.kvset.remove(kv);
- s -= heapSizeChange(kv, notpresent);
- }
-
- // Adding the delete to memstore. Add any value, as long as
- // same instance each time.
s += heapSizeChange(delete, this.kvset.add(delete));
} finally {
this.lock.readLock().unlock();
@@ -265,7 +206,7 @@ public class MemStore implements HeapSize {
this.size.addAndGet(s);
return s;
}
-
+
/**
* @param kv Find the row that comes after this one. If null, we return the
* first.
@@ -318,7 +259,7 @@ public class MemStore implements HeapSize {
}
/**
- * @param state
+ * @param state column/delete tracking state
*/
void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
this.lock.readLock().lock();
@@ -442,8 +383,7 @@ public class MemStore implements HeapSize {
this.lock.readLock().lock();
try {
KeyValueScanner [] scanners = new KeyValueScanner[1];
- scanners[0] = new MemStoreScanner(this.kvset.clone(),
- this.snapshot.clone(), this.comparator);
+ scanners[0] = new MemStoreScanner();
return scanners;
} finally {
this.lock.readLock().unlock();
@@ -465,10 +405,8 @@ public class MemStore implements HeapSize {
* @param matcher Column matcher
* @param result List to add results to
* @return true if done with store (early-out), false if not
- * @throws IOException
*/
- public boolean get(QueryMatcher matcher, List<KeyValue> result)
- throws IOException {
+ public boolean get(QueryMatcher matcher, List<KeyValue> result) {
this.lock.readLock().lock();
try {
if(internalGet(this.kvset, matcher, result) || matcher.isDone()) {
@@ -485,11 +423,11 @@ public class MemStore implements HeapSize {
* Gets from either the memstore or the snapshop, and returns a code
* to let you know which is which.
*
- * @param matcher
- * @param result
+ * @param matcher query matcher
+ * @param result puts results here
* @return 1 == memstore, 2 == snapshot, 0 == none
*/
- int getWithCode(QueryMatcher matcher, List<KeyValue> result) throws IOException {
+ int getWithCode(QueryMatcher matcher, List<KeyValue> result) {
this.lock.readLock().lock();
try {
boolean fromMemstore = internalGet(this.kvset, matcher, result);
@@ -517,18 +455,16 @@ public class MemStore implements HeapSize {
void readLockUnlock() {
this.lock.readLock().unlock();
}
-
+
/**
*
* @param set memstore or snapshot
* @param matcher query matcher
* @param result list to add results to
* @return true if done with store (early-out), false if not
- * @throws IOException
*/
boolean internalGet(final NavigableSet<KeyValue> set,
- final QueryMatcher matcher, final List<KeyValue> result)
- throws IOException {
+ final QueryMatcher matcher, final List<KeyValue> result) {
if(set.isEmpty()) return false;
// Seek to startKey
SortedSet<KeyValue> tail = set.tailSet(matcher.getStartKey());
@@ -550,11 +486,161 @@ public class MemStore implements HeapSize {
}
return false;
}
+
+
+ /*
+ * MemStoreScanner implements the KeyValueScanner.
+ * It lets the caller scan the contents of a memstore -- both current
+ * map and snapshot.
+ * This behaves as if it were a real scanner but does not maintain position.
+ */
+ protected class MemStoreScanner implements KeyValueScanner {
+ // Next row information for either kvset or snapshot
+ private KeyValue kvsetNextRow = null;
+ private KeyValue snapshotNextRow = null;
+
+ // iterator based scanning.
+ Iterator<KeyValue> kvsetIt;
+ Iterator<KeyValue> snapshotIt;
+
+ /*
+ Some notes...
+
+ A MemStoreScanner is fixed at creation time: it holds pointers/iterators into
+ the existing kvset/snapshot. During snapshot creation the kvset is emptied and
+ its contents become the snapshot; since the kvset is then empty there is no
+ point in reseeking both, so we can save ourselves the trouble. During the
+ snapshot->hfile transition, the memstore scanner is re-created by
+ StoreScanner#updateReaders(). StoreScanner could potentially do something
+ smarter by adjusting the existing memstore scanner instead.
+
+ But there is a bigger problem: once a scanner has progressed during a
+ snapshot scenario, we currently iterate past the kvset and then 'finish up'.
+ If a scan lasts a while, new kvset entries may appear but we will never see
+ them. This needs handling at the StoreScanner level with MemStoreScanner.
+
+ */
+
+ MemStoreScanner() {
+ super();
+
+ //DebugPrint.println(" MS new@" + hashCode());
+ }
+
+ protected KeyValue getNext(Iterator<KeyValue> it) {
+ KeyValue ret = null;
+ long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
+ //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint);
+
+ while (ret == null && it.hasNext()) {
+ KeyValue v = it.next();
+ if (v.getMemstoreTS() <= readPoint) {
+ // keep it.
+ ret = v;
+ }
+ }
+ return ret;
+ }
+
+ public synchronized boolean seek(KeyValue key) {
+ if (key == null) {
+ close();
+ return false;
+ }
+
+ // kvset and snapshot will never be null.
+ // if tailSet can't find anything, the returned SortedSet is empty (not null).
+ SortedSet<KeyValue> kvTail = kvset.tailSet(key);
+ SortedSet<KeyValue> snapshotTail = snapshot.tailSet(key);
+
+ kvsetIt = kvTail.iterator();
+ snapshotIt = snapshotTail.iterator();
+
+ kvsetNextRow = getNext(kvsetIt);
+ snapshotNextRow = getNext(snapshotIt);
+ //long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
+ //DebugPrint.println( " MS@" + hashCode() + " kvset seek: " + kvsetNextRow + " with size = " +
+ // kvset.size() + " threadread = " + readPoint);
+ //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " +
+ // snapshot.size() + " threadread = " + readPoint);
+
+
+ KeyValue lowest = getLowest();
+
+ // has data := (lowest != null)
+ return lowest != null;
+ }
+
+ public synchronized KeyValue peek() {
+ //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
+ return getLowest();
+ }
+
+
+ public synchronized KeyValue next() {
+ KeyValue theNext = getLowest();
+
+ if (theNext == null) {
+ return null;
+ }
+
+ // Advance one of the iterators
+ if (theNext == kvsetNextRow) {
+ kvsetNextRow = getNext(kvsetIt);
+ } else {
+ snapshotNextRow = getNext(snapshotIt);
+ }
+
+ //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
+ //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
+ // getLowest() + " threadpoint=" + readpoint);
+ return theNext;
+ }
+
+ protected KeyValue getLowest() {
+ return getLower(kvsetNextRow,
+ snapshotNextRow);
+ }
+
+ /*
+ * Returns the lower of the two key values, or null if they are both null.
+ * This uses comparator.compare() to compare the KeyValue using the memstore
+ * comparator.
+ */
+ protected KeyValue getLower(KeyValue first, KeyValue second) {
+ if (first == null && second == null) {
+ return null;
+ }
+ if (first != null && second != null) {
+ int compare = comparator.compare(first, second);
+ return (compare <= 0 ? first : second);
+ }
+ return (first != null ? first : second);
+ }
+
+ public synchronized void close() {
+ this.kvsetNextRow = null;
+ this.snapshotNextRow = null;
+
+ this.kvsetIt = null;
+ this.snapshotIt = null;
+ }
+
+ public synchronized void changedMemStore() {
+ // It's possible that the snapshot has been cleared, in which case we have
+ // become 'invalid'. But the scanner containing us depends on us knowing our
+ // "current" key in order to reseek the scanner stack, so we must not reseek
+ // once data has been pushed out of the snapshot.
+ if (!snapshot.isEmpty())
+ seek(peek());
+ }
+ }
+
public final static long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
-
+
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
@@ -568,11 +654,11 @@ public class MemStore implements HeapSize {
* @return Size
*/
long heapSizeChange(final KeyValue kv, final boolean notpresent) {
- return notpresent ?
+ return notpresent ?
ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
0;
}
-
+
/**
* Get the entire heap usage for this MemStore not including keys in the
* snapshot.
@@ -581,7 +667,7 @@ public class MemStore implements HeapSize {
public long heapSize() {
return size.get();
}
-
+
/**
* Get the heap usage of KVs in this MemStore.
*/
@@ -603,7 +689,7 @@ public class MemStore implements HeapSize {
* enough. See hbase-900. Fills memstores then waits so user can heap
* dump and bring up resultant hprof in something like jprofiler which
* allows you get 'deep size' on objects.
- * @param args
+ * @param args main args
*/
public static void main(String [] args) {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
@@ -638,5 +724,4 @@ public class MemStore implements HeapSize {
}
LOG.info("Exiting.");
}
-
}
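For illustration, a self-contained model (not HBase code) of the read-point filtering that MemStoreScanner.getNext() above performs: entries whose memstoreTS is greater than the scanning thread's read point are skipped. All names and values here are hypothetical.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ReadPointFilterDemo {
  static final class Entry {
    final String key;
    final long memstoreTS;      // write number stamped at memstore-insert time
    Entry(String key, long memstoreTS) { this.key = key; this.memstoreTS = memstoreTS; }
  }

  /** Same shape as MemStoreScanner.getNext(): skip entries newer than the read point. */
  static Entry getNext(Iterator<Entry> it, long readPoint) {
    while (it.hasNext()) {
      Entry e = it.next();
      if (e.memstoreTS <= readPoint) {
        return e;               // committed at or before our read point: visible
      }
      // otherwise skip: written by a transaction we must not see yet
    }
    return null;
  }

  public static void main(String[] args) {
    List<Entry> kvset = Arrays.asList(
        new Entry("row1/f:q", 3),
        new Entry("row2/f:q", 7),   // stamped after our read point was taken
        new Entry("row3/f:q", 2));
    long readPoint = 4;             // what resetThreadReadPoint() would have returned
    Iterator<Entry> it = kvset.iterator();
    for (Entry e; (e = getNext(it, readPoint)) != null; ) {
      System.out.println(e.key);    // prints row1/f:q and row3/f:q, skips row2/f:q
    }
  }
}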
diff --git src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
deleted file mode 100644
index d342bab..0000000
--- src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.Iterator;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-/**
- * MemStoreScanner implements the KeyValueScanner.
- * It lets the caller scan the contents of a memstore -- both current
- * map and snapshot.
- *
- * The memstore scanner keeps its own reference to the main and snapshot
- * key/value sets. Keeping those references allows the scanner to be indifferent
- * to memstore flushes. Calling the {@link #close()} method ensures that the
- * references to those classes are null'd allowing the GC to pick them up.
- */
-class MemStoreScanner implements KeyValueScanner {
- private static final Log LOG = LogFactory.getLog(MemStoreScanner.class);
-
- private static final
- SortedSet<KeyValue> EMPTY_SET = new TreeSet<KeyValue>();
- private static final Iterator<KeyValue> EMPTY_ITERATOR =
- new Iterator<KeyValue>() {
-
- @Override
- public boolean hasNext() {
- return false;
- }
- @Override
- public KeyValue next() {
- return null;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
-
-
- private SortedSet<KeyValue> kvsetRef;
- private SortedSet<KeyValue> snapshotRef;
- private KeyValue.KVComparator comparatorRef;
- private Iterator<KeyValue> kvsetIterator;
- private Iterator<KeyValue> snapshotIterator;
-
- private KeyValue currentKvsetKV;
- private KeyValue currentSnapshotKV;
- private KeyValue nextKV;
-
- /**
- * Create a new memstore scanner.
- *
- * @param kvset the main key value set
- * @param snapshot the snapshot set
- * @param comparator the comparator to use
- */
- MemStoreScanner(KeyValueSkipListSet kvset,
- KeyValueSkipListSet snapshot, KeyValue.KVComparator comparator) {
- super();
- this.kvsetRef = kvset;
- this.snapshotRef = snapshot != null ? snapshot : EMPTY_SET;
- this.comparatorRef = comparator;
- this.kvsetIterator = kvsetRef.iterator();
- this.snapshotIterator = snapshotRef.iterator();
- this.nextKV = currentKvsetKV = currentSnapshotKV = null;
- }
-
- private void fill() {
- if (nextKV == null) {
- if (currentSnapshotKV == null && snapshotIterator.hasNext()) {
- currentSnapshotKV = snapshotIterator.next();
- }
-
- if (currentKvsetKV == null && kvsetIterator.hasNext()) {
- currentKvsetKV = kvsetIterator.next();
- }
-
- if (currentSnapshotKV != null && currentKvsetKV != null) {
- int cmp = comparatorRef.compare(currentSnapshotKV, currentKvsetKV);
- if (cmp <= 0) {
- nextKV = currentSnapshotKV;
- currentSnapshotKV = null;
- } else {
- nextKV = currentKvsetKV;
- currentKvsetKV = null;
- }
- } else if (currentSnapshotKV != null) {
- nextKV = currentSnapshotKV;
- currentSnapshotKV = null;
- } else {
- nextKV = currentKvsetKV;
- currentKvsetKV = null;
- }
- }
- }
-
- @Override
- public synchronized boolean seek(KeyValue key) {
- if (key == null) {
- close();
- return false;
- }
- SortedSet<KeyValue> kvsetTail = kvsetRef.tailSet(key);
- SortedSet<KeyValue> snapshotTail = snapshotRef != null ?
- snapshotRef.tailSet(key) : EMPTY_SET;
-
- kvsetIterator = kvsetTail.iterator();
- snapshotIterator = snapshotTail.iterator();
-
- currentKvsetKV = null;
- currentSnapshotKV = null;
- nextKV = null;
-
- return kvsetIterator.hasNext() || snapshotIterator.hasNext();
- }
-
- @Override
- public synchronized KeyValue peek() {
- fill();
- return nextKV;
- }
-
- @Override
- public synchronized KeyValue next() {
- fill();
- KeyValue next = nextKV;
- nextKV = null;
- return next;
- }
-
- public synchronized void close() {
- this.kvsetRef = EMPTY_SET;
- this.snapshotRef = EMPTY_SET;
- this.kvsetIterator = EMPTY_ITERATOR;
- this.snapshotIterator = EMPTY_ITERATOR;
- this.currentKvsetKV = null;
- this.currentSnapshotKV = null;
- this.nextKV = null;
- }
-}
diff --git src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
new file mode 100644
index 0000000..b1f1368
--- /dev/null
+++ src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
@@ -0,0 +1,106 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Manages the read/write consistency within memstore. This provides
+ * an interface for readers to determine what entries to ignore, and
+ * a mechanism for writers to obtain new write numbers, then "commit"
+ * the new writes for readers to read (thus forming atomic transactions).
+ */
+public class ReadWriteConsistencyControl {
+ private final AtomicLong memstoreRead = new AtomicLong();
+ private final AtomicLong memstoreWrite = new AtomicLong();
+ // This is the pending queue of writes.
+ private final LinkedList<WriteEntry> writeQueue =
+ new LinkedList<WriteEntry>();
+
+ private static final ThreadLocal<Long> perThreadReadPoint =
+ new ThreadLocal<Long>();
+
+ public static long getThreadReadPoint() {
+ return perThreadReadPoint.get();
+ }
+
+ public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) {
+ perThreadReadPoint.set(rwcc.memstoreReadPoint());
+ return getThreadReadPoint();
+ }
+
+ public WriteEntry beginMemstoreInsert() {
+ synchronized (writeQueue) {
+ long nextWriteNumber = memstoreWrite.incrementAndGet();
+ WriteEntry e = new WriteEntry(nextWriteNumber);
+ writeQueue.add(e);
+ return e;
+ }
+ }
+ public void completeMemstoreInsert(WriteEntry e) {
+ synchronized (writeQueue) {
+ e.markCompleted();
+
+ long nextReadValue = -1;
+ boolean ranOnce=false;
+ while (!writeQueue.isEmpty()) {
+ ranOnce=true;
+ WriteEntry queueFirst = writeQueue.getFirst();
+
+ if (nextReadValue > 0) {
+ if (nextReadValue+1 != queueFirst.getWriteNumber()) {
+ throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
+ + nextReadValue + " next: " + queueFirst.getWriteNumber());
+ }
+ }
+
+ if (queueFirst.isCompleted()) {
+ nextReadValue = queueFirst.getWriteNumber();
+ writeQueue.removeFirst();
+ } else {
+ break;
+ }
+ }
+
+ if (!ranOnce) {
+ throw new RuntimeException("never was a first");
+ }
+
+ if (nextReadValue > 0) {
+ memstoreRead.set(nextReadValue);
+ }
+ }
+
+ // Spin until any other concurrent puts have finished. This makes sure that
+ // if we move on to construct a scanner, we'll get read-your-own-writes
+ // consistency. We anticipate that since puts to the memstore are very fast,
+ // this will be on the order of microseconds - so spinning should be faster
+ // than a condition variable.
+ int spun = 0;
+ while (memstoreRead.get() < e.getWriteNumber()) {
+ spun++;
+ }
+ // Could potentially expose spun as a metric
+ }
+
+ public long memstoreReadPoint() {
+ return memstoreRead.get();
+ }
+
+
+ public static class WriteEntry {
+ private long writeNumber;
+ private boolean completed = false;
+ WriteEntry(long writeNumber) {
+ this.writeNumber = writeNumber;
+ }
+ void markCompleted() {
+ this.completed = true;
+ }
+ boolean isCompleted() {
+ return this.completed;
+ }
+ long getWriteNumber() {
+ return this.writeNumber;
+ }
+ }
+}
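For illustration, a minimal single-threaded sketch (not part of the patch) of the protocol the class above defines; the demo class name is hypothetical and it sits in the same package so it can read the write number.

package org.apache.hadoop.hbase.regionserver;

public class RwccProtocolDemo {
  public static void main(String[] args) {
    ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();

    // Writer side (in HRegion this happens under the updatesLock read lock):
    ReadWriteConsistencyControl.WriteEntry w = rwcc.beginMemstoreInsert();
    long writeNumber = w.getWriteNumber();   // stamp memstore KeyValues with this
    // ... kv.setMemstoreTS(writeNumber); store.add(kv); ...
    rwcc.completeMemstoreInsert(w);          // returns once the edit is readable

    // Reader side (done once per scanner, and again on each next() call):
    long readPoint = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
    // Any KeyValue with memstoreTS <= readPoint is visible to this thread.
    System.out.println(readPoint >= writeNumber);  // true
  }
}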
diff --git src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index a42289d..4a71876 100644
--- src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -55,7 +55,11 @@ public class ScanQueryMatcher extends QueryMatcher {
this.rowComparator = rowComparator;
this.deletes = new ScanDeleteTracker();
this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
- this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow());
+ if (scan.isGetScan()) {
+ this.stopKey = KeyValue.createLastOnRow(scan.getStopRow());
+ } else {
+ this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow());
+ }
this.filter = scan.getFilter();
this.oldFilter = scan.getOldFilter();
diff --git src/java/org/apache/hadoop/hbase/regionserver/Store.java src/java/org/apache/hadoop/hbase/regionserver/Store.java
index 2a6c35f..8d07a90 100644
--- src/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ src/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -509,7 +509,7 @@ public class Store implements HConstants, HeapSize {
/**
* Snapshot this stores memstore. Call before running
- * {@link #flushCache(long)} so it has some work to do.
+ * {@link #flushCache(long, java.util.SortedSet)} so it has some work to do.
*/
void snapshot() {
this.memstore.snapshot();
@@ -614,9 +614,12 @@ public class Store implements HConstants, HeapSize {
this.lock.writeLock().lock();
try {
this.storefiles.put(Long.valueOf(logCacheFlushId), sf);
+
+ this.memstore.clearSnapshot(set);
+
// Tell listeners of the change in readers.
notifyChangedReadersObservers();
- this.memstore.clearSnapshot(set);
+
return this.storefiles.size() >= this.compactionThreshold;
} finally {
this.lock.writeLock().unlock();
@@ -644,10 +647,8 @@ public class Store implements HConstants, HeapSize {
* @param o Observer no longer interested in changes in set of Readers.
*/
void deleteChangedReaderObserver(ChangedReadersObserver o) {
- if(this.changedReaderObservers.size() > 0) {
- if (!this.changedReaderObservers.remove(o)) {
- LOG.warn("Not in set" + o);
- }
+ if (!this.changedReaderObservers.remove(o)) {
+ LOG.warn("Not in set" + o);
}
}
@@ -866,7 +867,6 @@ public class Store implements HConstants, HeapSize {
/**
* Do a minor/major compaction. Uses the scan infrastructure to make it easy.
*
- * @param writer output writer
* @param filesToCompact which files to compact
* @param majorCompaction true to major compact (prune all deletes, max versions, etc)
* @param maxId Readers maximum sequence id.
@@ -999,6 +999,10 @@ public class Store implements HConstants, HeapSize {
Long orderVal = Long.valueOf(result.getMaxSequenceId());
this.storefiles.put(orderVal, result);
}
+
+ // WARNING: an ugly hack, but sadly necessary.
+ ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC());
+
// Tell observers that list of StoreFiles has changed.
notifyChangedReadersObservers();
// Finally, delete old store files.
@@ -1489,7 +1493,12 @@ public class Store implements HConstants, HeapSize {
}
/**
- * Increments the value for the given row/family/qualifier
+ * Increments the value for the given row/family/qualifier.
+ *
+ * This function will always be seen as atomic by other readers
+ * because it only puts a single KV to the memstore. Thus no
+ * read/write consistency control is necessary.
+ *
* @param row
* @param f
* @param qualifier
diff --git src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 9e38c37..528f274 100644
--- src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,10 +45,15 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
private boolean cacheBlocks;
// Used to indicate that the scanner has closed (see HBASE-1107)
- private final AtomicBoolean closing = new AtomicBoolean(false);
+ private boolean closing = false;
+ private final boolean isGet;
/**
* Opens a scanner across memstore, snapshot, and all StoreFiles.
+ *
+ * @param store the store to scan
+ * @param scan the scan spec
+ * @param columns which columns we are scanning
*/
StoreScanner(Store store, Scan scan, final NavigableSet columns) {
this.store = store;
@@ -58,9 +62,11 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
columns, store.ttl, store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions()));
+ this.isGet = scan.isGetScan();
List scanners = getScanners();
// Seek all scanners to the initial key
+ // TODO if scan.isGetScan, use bloom filters to skip seeking
for(KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
}
@@ -76,10 +82,14 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
* Used for major compactions.
*
* Opens a scanner across specified StoreFiles.
+ * @param store the store to scan
+ * @param scan the scan spec
+ * @param scanners ancillary scanners
*/
StoreScanner(Store store, Scan scan, KeyValueScanner [] scanners) {
this.store = store;
this.cacheBlocks = false;
+ this.isGet = false;
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
null, store.ttl, store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions()));
@@ -99,6 +109,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
final NavigableSet columns,
final KeyValueScanner [] scanners) {
this.store = null;
+ this.isGet = false;
this.cacheBlocks = scan.getCacheBlocks();
this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
comparator.getRawComparator(), scan.getMaxVersions());
@@ -132,7 +143,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
}
public synchronized void close() {
- this.closing.set(true);
+ this.closing = true;
// under test, we dont have a this.store
if (this.store != null)
this.store.deleteChangedReaderObserver(this);
@@ -145,11 +156,12 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
/**
* Get the next row of values from this Store.
- * @param result
+ * @param outResult
* @param limit
* @return true if there are more rows, false if scanner is done
*/
public synchronized boolean next(List outResult, int limit) throws IOException {
+ //DebugPrint.println("SS.next");
KeyValue peeked = this.heap.peek();
if (peeked == null) {
close();
@@ -160,6 +172,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
List results = new ArrayList();
LOOP: while((kv = this.heap.peek()) != null) {
QueryMatcher.MatchCode qcode = matcher.match(kv);
+ //DebugPrint.println("SS peek kv = " + kv + " with qcode = " + qcode);
switch(qcode) {
case INCLUDE:
KeyValue next = this.heap.next();
@@ -227,8 +240,8 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
LOG.warn("StoreFile " + sf + " has null Reader");
continue;
}
- // Get a scanner that does not use pread.
- s.add(r.getScanner(this.cacheBlocks, false));
+ // If isGet, use pread; otherwise don't use pread
+ s.add(r.getScanner(this.cacheBlocks, isGet));
}
List scanners =
new ArrayList(s.size()+1);
@@ -240,16 +253,20 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
// Implementation of ChangedReadersObserver
public synchronized void updateReaders() throws IOException {
- if (this.closing.get()) return;
+ if (this.closing) return;
KeyValue topKey = this.peek();
if (topKey == null) return;
+
List scanners = getScanners();
- // Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
scanner.seek(topKey);
}
+ // close the previous scanners:
+ this.heap.close(); // bubble through and close all scanners.
+ this.heap = null; // the re-seeks could be slow (they may hit HDFS); free up memory ASAP
+
// Combine all seeked scanners with a heap
heap = new KeyValueHeap(
scanners.toArray(new KeyValueScanner[scanners.size()]), store.comparator);
diff --git src/java/org/apache/hadoop/hbase/util/FSUtils.java src/java/org/apache/hadoop/hbase/util/FSUtils.java
index 1bddbf5..09bac3d 100644
--- src/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ src/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -367,6 +367,7 @@ public class FSUtils {
return true;
}
+ // TODO move this method OUT of FSUtils. No dependencies on HMaster.
/**
* Expects to find -ROOT- directory.
* @param fs
diff --git src/test/org/apache/hadoop/hbase/TestKeyValue.java src/test/org/apache/hadoop/hbase/TestKeyValue.java
index 1b74ae1..6f56769 100644
--- src/test/org/apache/hadoop/hbase/TestKeyValue.java
+++ src/test/org/apache/hadoop/hbase/TestKeyValue.java
@@ -276,4 +276,49 @@ public class TestKeyValue extends TestCase {
// TODO actually write this test!
}
+
+ private final byte[] rowA = Bytes.toBytes("rowA");
+ private final byte[] rowB = Bytes.toBytes("rowB");
+
+ private final byte[] family = Bytes.toBytes("family");
+ private final byte[] qualA = Bytes.toBytes("qfA");
+
+ private void assertKVLess(KeyValue.KVComparator c,
+ KeyValue less,
+ KeyValue greater) {
+ int cmp = c.compare(less,greater);
+ assertTrue(cmp < 0);
+ cmp = c.compare(greater,less);
+ assertTrue(cmp > 0);
+ }
+
+ public void testFirstLastOnRow() {
+ final KVComparator c = KeyValue.COMPARATOR;
+ long ts = 1;
+
+ // These are listed in sort order (i.e. every one should be less
+ // than the one on the next line).
+ final KeyValue firstOnRowA = KeyValue.createFirstOnRow(rowA);
+ final KeyValue kvA_1 = new KeyValue(rowA, null, null, ts, Type.Put);
+ final KeyValue kvA_2 = new KeyValue(rowA, family, qualA, ts, Type.Put);
+
+ final KeyValue lastOnRowA = KeyValue.createLastOnRow(rowA);
+ final KeyValue firstOnRowB = KeyValue.createFirstOnRow(rowB);
+ final KeyValue kvB = new KeyValue(rowB, family, qualA, ts, Type.Put);
+
+ assertKVLess(c, firstOnRowA, firstOnRowB);
+ assertKVLess(c, firstOnRowA, kvA_1);
+ assertKVLess(c, firstOnRowA, kvA_2);
+ assertKVLess(c, kvA_1, kvA_2);
+ assertKVLess(c, kvA_2, firstOnRowB);
+ assertKVLess(c, kvA_1, firstOnRowB);
+
+ assertKVLess(c, lastOnRowA, firstOnRowB);
+ assertKVLess(c, firstOnRowB, kvB);
+ assertKVLess(c, lastOnRowA, kvB);
+
+ assertKVLess(c, kvA_2, lastOnRowA);
+ assertKVLess(c, kvA_1, lastOnRowA);
+ assertKVLess(c, firstOnRowA, lastOnRowA);
+ }
}
diff --git src/test/org/apache/hadoop/hbase/client/TestClient.java src/test/org/apache/hadoop/hbase/client/TestClient.java
index f4403e7..8ad1898 100644
--- src/test/org/apache/hadoop/hbase/client/TestClient.java
+++ src/test/org/apache/hadoop/hbase/client/TestClient.java
@@ -67,7 +67,7 @@ public class TestClient extends HBaseClusterTestCase {
super();
}
- /**
+ /*
* Test from client side of an involved filter against a multi family that
* involves deletes.
*
@@ -196,7 +196,7 @@ public class TestClient extends HBaseClusterTestCase {
}
}
- /**
+ /*
* Test filters when multiple regions. It does counts. Needs eye-balling of
* logs to ensure that we're not scanning more regions that we're supposed to.
* Related to the TestFilterAcrossRegions over in the o.a.h.h.filter package.
@@ -253,7 +253,7 @@ public class TestClient extends HBaseClusterTestCase {
assertEquals(rowCount - endKeyCount, countGreater);
}
- /**
+ /*
* Load table with rows from 'aaa' to 'zzz'.
* @param t
* @return Count of rows loaded.
@@ -418,7 +418,7 @@ public class TestClient extends HBaseClusterTestCase {
scanner.close();
}
- /**
+ /*
* Test simple table and non-existent row cases.
*/
public void testSimpleMissing() throws Exception {
@@ -531,7 +531,7 @@ public class TestClient extends HBaseClusterTestCase {
assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE);
}
- /**
+ /*
* Test basic puts, gets, scans, and deletes for a single row
* in a multiple family table.
*/
@@ -1438,7 +1438,7 @@ public class TestClient extends HBaseClusterTestCase {
ht.put(put);
delete = new Delete(ROW);
- delete.deleteColumn(FAMILIES[0], QUALIFIER);
+ delete.deleteColumn(FAMILIES[0], QUALIFIER); // ts[4]
ht.delete(delete);
get = new Get(ROW);
@@ -1473,23 +1473,24 @@ public class TestClient extends HBaseClusterTestCase {
// But alas, this is not to be. We can't put them back in either case.
put = new Put(ROW);
- put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
- put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]);
+ put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); // 1000
+ put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); // 5000
ht.put(put);
- // The Get returns the latest value but then does not return the
- // oldest, which was never deleted, ts[1].
-
+
+ // Due to the internal implementation of Get, the Get() call used to
+ // return ts[4], UNLIKE the Scan below. With the switch to using Scan
+ // for Get, this is no longer the case.
get = new Get(ROW);
get.addFamily(FAMILIES[0]);
get.setMaxVersions(Integer.MAX_VALUE);
result = ht.get(get);
assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
- new long [] {ts[2], ts[3], ts[4]},
- new byte[][] {VALUES[2], VALUES[3], VALUES[4]},
+ new long [] {ts[1], ts[2], ts[3]},
+ new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
0, 2);
- // The Scanner returns the previous values, the expected-unexpected behavior
+ // The Scanner returns the previous values, the expected-naive-unexpected behavior
scan = new Scan(ROW);
scan.addFamily(FAMILIES[0]);
@@ -1553,7 +1554,7 @@ public class TestClient extends HBaseClusterTestCase {
result = ht.get(get);
assertTrue("Expected 2 keys but received " + result.size(),
result.size() == 2);
- assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER,
+ assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER,
new long [] {ts[0], ts[1]},
new byte[][] {VALUES[0], VALUES[1]},
0, 1);
@@ -1591,9 +1592,8 @@ public class TestClient extends HBaseClusterTestCase {
get.addFamily(FAMILIES[2]);
get.setMaxVersions(Integer.MAX_VALUE);
result = ht.get(get);
- assertTrue("Expected 1 key but received " + result.size() + ": " + result,
- result.size() == 1);
- assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
+ assertEquals(1, result.size());
+ assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
new long [] {ts[2]},
new byte[][] {VALUES[2]},
0, 0);
@@ -1603,9 +1603,8 @@ public class TestClient extends HBaseClusterTestCase {
scan.addFamily(FAMILIES[2]);
scan.setMaxVersions(Integer.MAX_VALUE);
result = getSingleScanResult(ht, scan);
- assertTrue("Expected 1 key but received " + result.size(),
- result.size() == 1);
- assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
+ assertEquals(1, result.size());
+ assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
new long [] {ts[2]},
new byte[][] {VALUES[2]},
0, 0);
@@ -1691,7 +1690,7 @@ public class TestClient extends HBaseClusterTestCase {
}
}
- /**
+ /*
* Baseline "scalability" test.
*
* Tests one hundred families, one million columns, one million versions
@@ -1738,7 +1737,7 @@ public class TestClient extends HBaseClusterTestCase {
}
- /**
+ /*
* Explicitly test JIRAs related to HBASE-880 / Client API
*/
public void testJIRAs() throws Exception {
@@ -1754,7 +1753,7 @@ public class TestClient extends HBaseClusterTestCase {
// JIRA Testers
//
- /**
+ /*
* HBASE-867
* If millions of columns in a column family, hbase scanner won't come up
*
@@ -1844,7 +1843,7 @@ public class TestClient extends HBaseClusterTestCase {
}
- /**
+ /*
* HBASE-861
* get with timestamp will return a value if there is a version with an
* earlier timestamp
@@ -1907,7 +1906,7 @@ public class TestClient extends HBaseClusterTestCase {
}
- /**
+ /*
* HBASE-33
* Add a HTable get/obtainScanner method that retrieves all versions of a
* particular column and row between two timestamps
@@ -1956,7 +1955,7 @@ public class TestClient extends HBaseClusterTestCase {
}
- /**
+ /*
* HBASE-1014
* commit(BatchUpdate) method should return timestamp
*/
@@ -1980,7 +1979,7 @@ public class TestClient extends HBaseClusterTestCase {
}
- /**
+ /*
* HBASE-1182
* Scan for columns > some timestamp
*/
@@ -2025,7 +2024,7 @@ public class TestClient extends HBaseClusterTestCase {
}
- /**
+ /*
* HBASE-52
* Add a means of scanning over all versions
*/
@@ -2423,7 +2422,7 @@ public class TestClient extends HBaseClusterTestCase {
- /**
+ /*
* Verify a single column using gets.
* Expects family and qualifier arrays to be valid for at least
* the range: idx-2 < idx < idx+2
@@ -2480,7 +2479,7 @@ public class TestClient extends HBaseClusterTestCase {
}
- /**
+ /*
* Verify a single column using scanners.
* Expects family and qualifier arrays to be valid for at least
* the range: idx-2 to idx+2
@@ -2542,11 +2541,11 @@ public class TestClient extends HBaseClusterTestCase {
}
- /**
+ /*
* Verify we do not read any values by accident around a single column
* Same requirements as getVerifySingleColumn
*/
- private void getVerifySingleEmpty(HTable ht,
+ private void getVerifySingleEmpty(HTable ht,
byte [][] ROWS, int ROWIDX,
byte [][] FAMILIES, int FAMILYIDX,
byte [][] QUALIFIERS, int QUALIFIERIDX)
@@ -2668,12 +2667,11 @@ public class TestClient extends HBaseClusterTestCase {
"Got row [" + Bytes.toString(result.getRow()) +"]",
equals(row, result.getRow()));
int expectedResults = end - start + 1;
- assertTrue("Expected " + expectedResults + " keys but result contains "
- + result.size(), result.size() == expectedResults);
-
+ assertEquals(expectedResults, result.size());
+
KeyValue [] keys = result.sorted();
- for(int i=0;i
diff --git src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
List kvs = new ArrayList();
kvs.add(new KeyValue(row1, fam4, null, null));
@@ -1439,6 +1465,41 @@ public class TestHRegion extends HBaseTestCase {
assertICV(row, fam1, qual1, value+amount);
}
+ public void testIncrementColumnValue_BumpSnapshot() throws IOException {
+ initHRegion(tableName, getName(), fam1);
+
+ long value = 42L;
+ long incr = 44L;
+
+ // first put something in kvset, then snapshot it.
+ Put put = new Put(row);
+ put.add(fam1, qual1, Bytes.toBytes(value));
+ region.put(put);
+
+ // get the store in question:
+ Store s = region.getStore(fam1);
+ s.snapshot(); //bam
+
+ // now increment:
+ long newVal = region.incrementColumnValue(row, fam1, qual1,
+ incr, false);
+
+ assertEquals(value+incr, newVal);
+
+ // get both versions:
+ Get get = new Get(row);
+ get.setMaxVersions();
+ get.addColumn(fam1,qual1);
+
+ Result r = region.get(get, null);
+ assertEquals(2, r.size());
+ KeyValue first = r.raw()[0];
+ KeyValue second = r.raw()[1];
+
+ assertTrue("ICV failed to upgrade timestamp",
+ first.getTimestamp() != second.getTimestamp());
+ }
+
public void testIncrementColumnValue_ConcurrentFlush() throws IOException {
initHRegion(tableName, getName(), fam1);
@@ -1652,7 +1713,7 @@ public class TestHRegion extends HBaseTestCase {
assertEquals(expected.get(i), actual.get(i));
}
}
-
+
//////////////////////////////////////////////////////////////////////////////
// Split test
//////////////////////////////////////////////////////////////////////////////
@@ -1935,9 +1996,9 @@ public class TestHRegion extends HBaseTestCase {
FlushThread flushThread = new FlushThread();
flushThread.start();
- Scan scan = new Scan();
- scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
- new BinaryComparator(Bytes.toBytes("row0"))));
+ Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
+// scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
+// new BinaryComparator(Bytes.toBytes("row0"))));
int expectedCount = numFamilies * numQualifiers;
List res = new ArrayList();
@@ -1950,7 +2011,7 @@ public class TestHRegion extends HBaseTestCase {
}
if (i != 0 && i % flushInterval == 0) {
- //System.out.println("scan iteration = " + i);
+ //System.out.println("flush scan iteration = " + i);
flushThread.flush();
}
@@ -1959,9 +2020,10 @@ public class TestHRegion extends HBaseTestCase {
InternalScanner scanner = region.getScanner(scan);
while (scanner.next(res)) ;
if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
- Assert.assertEquals("i=" + i, expectedCount, res.size());
+ assertEquals("i=" + i, expectedCount, res.size());
long timestamp = res.get(0).getTimestamp();
- Assert.assertTrue(timestamp >= prevTimestamp);
+ assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
+ timestamp >= prevTimestamp);
prevTimestamp = timestamp;
}
}
@@ -2011,15 +2073,16 @@ public class TestHRegion extends HBaseTestCase {
for (int r = 0; r < numRows; r++) {
byte[] row = Bytes.toBytes("row" + r);
Put put = new Put(row);
- for (int f = 0; f < families.length; f++) {
- for (int q = 0; q < qualifiers.length; q++) {
- put.add(families[f], qualifiers[q], (long) val,
- Bytes.toBytes(val));
+ for (byte[] family : families) {
+ for (byte[] qualifier : qualifiers) {
+ put.add(family, qualifier, (long) val,
+ Bytes.toBytes(val));
}
}
+// System.out.println("Putting of kvsetsize=" + put.size());
region.put(put);
- if (val > 0 && val % 47 == 0){
- //System.out.println("put iteration = " + val);
+ if (val > 0 && val % 47 == 0) {
+ System.out.println("put iteration = " + val);
Delete delete = new Delete(row, (long)val-30, null);
region.delete(delete, null, true);
}
diff --git src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
index 1e602e7..28a01a2 100644
--- src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
+++ src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
@@ -46,10 +47,12 @@ public class TestMemStore extends TestCase {
private static final byte [] FAMILY = Bytes.toBytes("column");
private static final byte [] CONTENTS_BASIC = Bytes.toBytes("contents:basic");
private static final String CONTENTSTR = "contentstr";
+ private ReadWriteConsistencyControl rwcc;
@Override
public void setUp() throws Exception {
super.setUp();
+ this.rwcc = new ReadWriteConsistencyControl();
this.memstore = new MemStore();
}
@@ -75,6 +78,7 @@ public class TestMemStore extends TestCase {
KeyValueScanner [] memstorescanners = this.memstore.getScanners();
Scan scan = new Scan();
List result = new ArrayList();
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
this.memstore.comparator, null, memstorescanners);
int count = 0;
@@ -93,6 +97,8 @@ public class TestMemStore extends TestCase {
for (int i = 0; i < memstorescanners.length; i++) {
memstorescanners[0].close();
}
+
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
memstorescanners = this.memstore.getScanners();
// Now assert can count same number even if a snapshot mid-scan.
s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
@@ -137,9 +143,9 @@ public class TestMemStore extends TestCase {
if (count == snapshotIndex) {
this.memstore.snapshot();
this.memstore.clearSnapshot(this.memstore.getSnapshot());
- // Added more rows into kvset.
+ // Added more rows into kvset. But the scanner won't see these rows.
addRows(this.memstore, ts);
- LOG.info("Snapshotted, cleared it and then added values");
+ LOG.info("Snapshotted, cleared it and then added values (which wont be seen)");
}
result.clear();
}
@@ -149,6 +155,181 @@ public class TestMemStore extends TestCase {
assertEquals(rowCount, count);
}
+ /**
+ * A simple test which verifies the 3 possible states when scanning across snapshot.
+ */
+ public void testScanAcrossSnapshot2() {
+ // we are going to scan across the snapshot with two kvs;
+ // kv1 should always be returned before kv2
+ final byte[] one = Bytes.toBytes(1);
+ final byte[] two = Bytes.toBytes(2);
+ final byte[] f = Bytes.toBytes("f");
+ final byte[] q = Bytes.toBytes("q");
+ final byte[] v = Bytes.toBytes(3);
+
+ final KeyValue kv1 = new KeyValue(one, f, q, v);
+ final KeyValue kv2 = new KeyValue(two, f, q, v);
+
+ // use case 1: both kvs in kvset
+ this.memstore.add(kv1.clone());
+ this.memstore.add(kv2.clone());
+ verifyScanAcrossSnapshot2(kv1, kv2);
+
+ // use case 2: both kvs in snapshot
+ this.memstore.snapshot();
+ verifyScanAcrossSnapshot2(kv1, kv2);
+
+ // use case 3: first in snapshot second in kvset
+ this.memstore = new MemStore();
+ this.memstore.add(kv1.clone());
+ this.memstore.snapshot();
+ this.memstore.add(kv2.clone());
+ verifyScanAcrossSnapshot2(kv1, kv2);
+ }
+
+ private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) {
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ KeyValueScanner[] memstorescanners = this.memstore.getScanners();
+ assertEquals(1, memstorescanners.length);
+ final KeyValueScanner scanner = memstorescanners[0];
+ scanner.seek(KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW));
+ assertEquals(kv1, scanner.next());
+ assertEquals(kv2, scanner.next());
+ assertNull(scanner.next());
+ }
+
+ private void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected) {
+ scanner.seek(KeyValue.createFirstOnRow(new byte[]{}));
+ for (KeyValue kv : expected) {
+ assertTrue(0 ==
+ KeyValue.COMPARATOR.compare(kv,
+ scanner.next()));
+ }
+ assertNull(scanner.peek());
+ }
+
+ public void testMemstoreConcurrentControl() {
+ final byte[] row = Bytes.toBytes(1);
+ final byte[] f = Bytes.toBytes("family");
+ final byte[] q1 = Bytes.toBytes("q1");
+ final byte[] q2 = Bytes.toBytes("q2");
+ final byte[] v = Bytes.toBytes("value");
+
+ ReadWriteConsistencyControl.WriteEntry w =
+ rwcc.beginMemstoreInsert();
+
+ KeyValue kv1 = new KeyValue(row, f, q1, v);
+ kv1.setMemstoreTS(w.getWriteNumber());
+ memstore.add(kv1);
+
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ KeyValueScanner[] s = this.memstore.getScanners();
+ assertScannerResults(s[0], new KeyValue[]{});
+
+ rwcc.completeMemstoreInsert(w);
+
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ s = this.memstore.getScanners();
+ assertScannerResults(s[0], new KeyValue[]{kv1});
+
+ w = rwcc.beginMemstoreInsert();
+ KeyValue kv2 = new KeyValue(row, f, q2, v);
+ kv2.setMemstoreTS(w.getWriteNumber());
+ memstore.add(kv2);
+
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ s = this.memstore.getScanners();
+ assertScannerResults(s[0], new KeyValue[]{kv1});
+
+ rwcc.completeMemstoreInsert(w);
+
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+ s = this.memstore.getScanners();
+ assertScannerResults(s[0], new KeyValue[]{kv1, kv2});
+ }
+
+ private static class ReadOwnWritesTester extends Thread {
+ final int id;
+ static final int NUM_TRIES = 1000;
+
+ final byte[] row;
+
+ final byte[] f = Bytes.toBytes("family");
+ final byte[] q1 = Bytes.toBytes("q1");
+
+ final ReadWriteConsistencyControl rwcc;
+ final MemStore memstore;
+
+ AtomicReference caughtException;
+
+
+ public ReadOwnWritesTester(int id,
+ MemStore memstore,
+ ReadWriteConsistencyControl rwcc,
+ AtomicReference caughtException)
+ {
+ this.id = id;
+ this.rwcc = rwcc;
+ this.memstore = memstore;
+ this.caughtException = caughtException;
+ row = Bytes.toBytes(id);
+ }
+
+ public void run() {
+ try {
+ internalRun();
+ } catch (Throwable t) {
+ caughtException.compareAndSet(null, t);
+ }
+ }
+
+ private void internalRun() {
+ for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
+ ReadWriteConsistencyControl.WriteEntry w =
+ rwcc.beginMemstoreInsert();
+
+ // Insert the sequence value (i)
+ byte[] v = Bytes.toBytes(i);
+
+ KeyValue kv = new KeyValue(row, f, q1, i, v);
+ kv.setMemstoreTS(w.getWriteNumber());
+ memstore.add(kv);
+ rwcc.completeMemstoreInsert(w);
+
+ // Assert that we can read back
+
+ KeyValueScanner s = this.memstore.getScanners()[0];
+ s.seek(kv);
+
+ KeyValue ret = s.next();
+ assertNotNull("Didnt find own write at all", ret);
+ assertEquals("Didnt read own writes",
+ kv.getTimestamp(), ret.getTimestamp());
+ }
+ }
+ }
+
+ public void no_testReadOwnWritesUnderConcurrency() throws Throwable {
+
+ int NUM_THREADS = 8;
+
+ ReadOwnWritesTester threads[] = new ReadOwnWritesTester[NUM_THREADS];
+ AtomicReference caught = new AtomicReference();
+
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i] = new ReadOwnWritesTester(i, memstore, rwcc, caught);
+ threads[i].start();
+ }
+
+ for (int i = 0; i < NUM_THREADS; i++) {
+ threads[i].join();
+ }
+
+ if (caught.get() != null) {
+ throw caught.get();
+ }
+ }
+
/**
* Test memstore snapshots
* @throws IOException
@@ -442,9 +623,10 @@ public class TestMemStore extends TestCase {
List expected = new ArrayList();
expected.add(put3);
expected.add(del2);
+ expected.add(put2);
expected.add(put1);
-
- assertEquals(3, memstore.kvset.size());
+
+ assertEquals(4, memstore.kvset.size());
int i = 0;
for(KeyValue kv : memstore.kvset) {
assertEquals(expected.get(i++), kv);
@@ -476,8 +658,11 @@ public class TestMemStore extends TestCase {
List expected = new ArrayList();
expected.add(put3);
expected.add(del2);
+ expected.add(put2);
+ expected.add(put1);
+
- assertEquals(2, memstore.kvset.size());
+ assertEquals(4, memstore.kvset.size());
int i = 0;
for (KeyValue kv: memstore.kvset) {
assertEquals(expected.get(i++), kv);
@@ -510,9 +695,14 @@ public class TestMemStore extends TestCase {
List expected = new ArrayList();
expected.add(del);
+ expected.add(put1);
+ expected.add(put2);
expected.add(put4);
+ expected.add(put3);
+
+
- assertEquals(2, memstore.kvset.size());
+ assertEquals(5, memstore.kvset.size());
int i = 0;
for (KeyValue kv: memstore.kvset) {
assertEquals(expected.get(i++), kv);
@@ -528,7 +718,7 @@ public class TestMemStore extends TestCase {
memstore.add(new KeyValue(row, fam, qf, ts, val));
KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
memstore.delete(delete);
- assertEquals(1, memstore.kvset.size());
+ assertEquals(2, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
@@ -541,7 +731,7 @@ public class TestMemStore extends TestCase {
"row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
memstore.delete(delete);
- assertEquals(1, memstore.kvset.size());
+ assertEquals(2, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
public void testRetainsDeleteColumn() throws IOException {
@@ -553,7 +743,7 @@ public class TestMemStore extends TestCase {
KeyValue.Type.DeleteColumn, "dont-care");
memstore.delete(delete);
- assertEquals(1, memstore.kvset.size());
+ assertEquals(2, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
public void testRetainsDeleteFamily() throws IOException {
@@ -565,7 +755,7 @@ public class TestMemStore extends TestCase {
KeyValue.Type.DeleteFamily, "dont-care");
memstore.delete(delete);
- assertEquals(1, memstore.kvset.size());
+ assertEquals(2, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
@@ -573,13 +763,13 @@ public class TestMemStore extends TestCase {
//////////////////////////////////////////////////////////////////////////////
// Helpers
//////////////////////////////////////////////////////////////////////////////
- private byte [] makeQualifier(final int i1, final int i2){
+ private static byte [] makeQualifier(final int i1, final int i2){
return Bytes.toBytes(Integer.toString(i1) + ";" +
Integer.toString(i2));
}
/**
- * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
+ * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT}
* @param hmc Instance to add rows to.
* @return How many rows we added.
* @throws IOException
@@ -589,7 +779,7 @@ public class TestMemStore extends TestCase {
}
/**
- * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
+ * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT}
* @param hmc Instance to add rows to.
* @return How many rows we added.
* @throws IOException
@@ -643,4 +833,57 @@ public class TestMemStore extends TestCase {
return new KeyValue(row, Bytes.toBytes("test_col:"),
HConstants.LATEST_TIMESTAMP, value);
}
+ private static void addRows(int count, final MemStore mem) {
+ long nanos = System.nanoTime();
+
+ for (int i = 0 ; i < count ; i++) {
+ if (i % 1000 == 0) {
+
+ System.out.println(i + " Took for 1k usec: " + (System.nanoTime() - nanos)/1000);
+ nanos = System.nanoTime();
+ }
+ long timestamp = System.currentTimeMillis();
+
+ for (int ii = 0; ii < QUALIFIER_COUNT ; ii++) {
+ byte [] row = Bytes.toBytes(i);
+ byte [] qf = makeQualifier(i, ii);
+ mem.add(new KeyValue(row, FAMILY, qf, timestamp, qf));
+ }
+ }
+ }
+
+
+ static void doScan(MemStore ms, int iteration) {
+ long nanos = System.nanoTime();
+ KeyValueScanner [] ss = ms.getScanners();
+ KeyValueScanner s = ss[0];
+ s.seek(KeyValue.createFirstOnRow(new byte[]{}));
+
+ System.out.println(iteration + " create/seek took: " + (System.nanoTime() - nanos)/1000);
+ int cnt=0;
+ while(s.next() != null) ++cnt;
+
+ System.out.println(iteration + " took usec: " + (System.nanoTime() - nanos)/1000 + " for: " + cnt);
+
+ }
+
+ public static void main(String [] args) {
+ ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+ MemStore ms = new MemStore();
+
+ long n1 = System.nanoTime();
+ addRows(25000, ms);
+ System.out.println("Took for insert: " + (System.nanoTime()-n1)/1000);
+
+
+ System.out.println("foo");
+
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
+ for (int i = 0 ; i < 50 ; i++)
+ doScan(ms, i);
+
+ }
+
+
}
diff --git src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
new file mode 100644
index 0000000..78fe59c
--- /dev/null
+++ src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
@@ -0,0 +1,109 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import junit.framework.TestCase;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TestReadWriteConsistencyControl extends TestCase {
+ static class Writer implements Runnable {
+ final AtomicBoolean finished;
+ final ReadWriteConsistencyControl rwcc;
+ final AtomicBoolean status;
+
+ Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) {
+ this.finished = finished;
+ this.rwcc = rwcc;
+ this.status = status;
+ }
+ private Random rnd = new Random();
+ public boolean failed = false;
+
+ public void run() {
+ while (!finished.get()) {
+ ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert();
+// System.out.println("Begin write: " + e.getWriteNumber());
+ // sleep 0 - 499 usec (including 0)
+ int sleepTime = rnd.nextInt(500);
+ // 500 * 1000 = 500,000 ns = 500 usec
+ // 1 * 1000 = 1,000 ns = 1 usec
+ try {
+ if (sleepTime > 0)
+ Thread.sleep(0, sleepTime * 1000);
+ } catch (InterruptedException e1) {
+ }
+ try {
+ rwcc.completeMemstoreInsert(e);
+ } catch (RuntimeException ex) {
+ // got failure
+ System.out.println(ex.toString());
+ ex.printStackTrace();
+ status.set(false);
+ return;
+ // Report failure if possible.
+ }
+ }
+ }
+ }
+
+ public void testParallelism() throws Exception {
+ final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+
+ final AtomicBoolean finished = new AtomicBoolean(false);
+
+ // fail flag for the reader thread
+ final AtomicBoolean readerFailed = new AtomicBoolean(false);
+ final AtomicLong failedAt = new AtomicLong();
+ Runnable reader = new Runnable() {
+ public void run() {
+ long prev = rwcc.memstoreReadPoint();
+ while (!finished.get()) {
+ long newPrev = rwcc.memstoreReadPoint();
+ if (newPrev < prev) {
+ // serious problem.
+ System.out.println("Reader got out of order, prev: " +
+ prev + " next was: " + newPrev);
+ readerFailed.set(true);
+ // might as well give up
+ failedAt.set(newPrev);
+ return;
+ }
+ }
+ }
+ };
+
+ // writer thread parallelism.
+ int n = 20;
+ Thread [] writers = new Thread[n];
+ AtomicBoolean [] statuses = new AtomicBoolean[n];
+ Thread readThread = new Thread(reader);
+
+ for (int i = 0 ; i < n ; ++i ) {
+ statuses[i] = new AtomicBoolean(true);
+ writers[i] = new Thread(new Writer(finished, rwcc, statuses[i]));
+ writers[i].start();
+ }
+ readThread.start();
+
+ try {
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException ex) {
+ }
+
+ finished.set(true);
+
+ readThread.join();
+ for (int i = 0; i < n; ++i) {
+ writers[i].join();
+ }
+
+ // check failure.
+ assertFalse(readerFailed.get());
+ for (int i = 0; i < n; ++i) {
+ assertTrue(statuses[i].get());
+ }
+
+
+ }
+}
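
The test above is essentially a stress check of one invariant: the memstore read point never moves backwards, and it does not advance past writes that have not completed. A minimal single-threaded sketch of that invariant, assuming only the RWCC methods used in this test class (the class name RwccInvariantSketch is illustrative, not part of the patch):

    import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;

    public class RwccInvariantSketch {
      public static void main(String[] args) {
        ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
        long before = rwcc.memstoreReadPoint();

        ReadWriteConsistencyControl.WriteEntry w = rwcc.beginMemstoreInsert();
        // With no other writers, the read point cannot have advanced past
        // this still-pending write.
        System.out.println(rwcc.memstoreReadPoint() == before); // expected: true

        rwcc.completeMemstoreInsert(w);
        // Once the write completes, the read point may advance, but it
        // never goes backwards.
        System.out.println(rwcc.memstoreReadPoint() >= before); // true
      }
    }
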
diff --git src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
index f1ec15b..76ab7b5 100644
--- src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
+++ src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -20,24 +20,23 @@
package org.apache.hadoop.hbase.regionserver;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
import junit.framework.TestCase;
-
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
public class TestStoreScanner extends TestCase {
private final String CF_STR = "cf";
final byte [] CF = Bytes.toBytes(CF_STR);
- /**
+ /*
* Test utility for building a NavigableSet for scanners.
* @param strCols
* @return
@@ -128,7 +127,7 @@ public class TestStoreScanner extends TestCase {
assertEquals(kvs[0], results.get(0));
}
- /**
+ /*
* Test test shows exactly how the matcher's return codes confuses the StoreScanner
* and prevent it from doing the right thing. Seeking once, then nexting twice
* should return R1, then R2, but in this case it doesnt.
@@ -189,7 +188,7 @@ public class TestStoreScanner extends TestCase {
assertEquals(0, results.size());
}
- /**
+ /*
* Test the case where there is a delete row 'in front of' the next row, the scanner
* will move to the next row.
*/
@@ -408,7 +407,7 @@ public class TestStoreScanner extends TestCase {
assertEquals(false, scan.next(results));
}
- /**
+ /*
* Test expiration of KeyValues in combination with a configured TTL for
* a column family (as should be triggered in a major compaction).
*/