diff --git src/java/org/apache/hadoop/hbase/KeyValue.java src/java/org/apache/hadoop/hbase/KeyValue.java index 81a6ccd..f2d8538 100644 --- src/java/org/apache/hadoop/hbase/KeyValue.java +++ src/java/org/apache/hadoop/hbase/KeyValue.java @@ -200,6 +200,23 @@ public class KeyValue implements Writable, HeapSize { private int offset = 0; private int length = 0; + /** Here be dragons **/ + + // used to achieve atomic operations in the memstore. + public long getMemstoreTS() { + return memstoreTS; + } + + public void setMemstoreTS(long memstoreTS) { + this.memstoreTS = memstoreTS; + } + + // default value is 0, aka DNC + private long memstoreTS = 0; + + /** Dragon time over, return to normal business */ + + /** Writable Constructor -- DO NOT USE */ public KeyValue() {} @@ -1503,6 +1520,21 @@ public class KeyValue implements Writable, HeapSize { } /** + * Creates a KeyValue that is last on the specified row id. That is, + * every other possible KeyValue for the given row would compareTo() + * less than the result of this call. + * @param row row key + * @return Last possible KeyValue on passed row + */ + public static KeyValue createLastOnRow(final byte[] row) { + return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum); + } + + /** + * Create a KeyValue that is smaller than all other possible KeyValues + * for the given row. That is any (valid) KeyValue on 'row' would sort + * _after_ the result. + * * @param row - row key (arbitrary byte array) * @return First possible KeyValue on passed row */ @@ -1511,6 +1543,8 @@ public class KeyValue implements Writable, HeapSize { } /** + * Creates a KeyValue that is smaller than all other KeyValues that + * are older than the passed timestamp. * @param row - row key (arbitrary byte array) * @param ts - timestamp * @return First possible key on passed row and timestamp. @@ -1522,8 +1556,11 @@ public class KeyValue implements Writable, HeapSize { /** * @param row - row key (arbitrary byte array) + * @param c column - {@link #parseColumn(byte[])} is called to split + * the column. * @param ts - timestamp * @return First possible key on passed row, column and timestamp + * @deprecated */ public static KeyValue createFirstOnRow(final byte [] row, final byte [] c, final long ts) { @@ -1532,14 +1569,17 @@ public class KeyValue implements Writable, HeapSize { } /** + * Create a KeyValue for the specified row, family and qualifier that would be + * smaller than all other possible KeyValues that have the same row,family,qualifier. + * Used for seeking. * @param row - row key (arbitrary byte array) - * @param f - family name - * @param q - column qualifier + * @param family - family name + * @param qualifier - column qualifier * @return First possible key on passed row, and column. */ - public static KeyValue createFirstOnRow(final byte [] row, final byte [] f, - final byte [] q) { - return new KeyValue(row, f, q, HConstants.LATEST_TIMESTAMP, Type.Maximum); + public static KeyValue createFirstOnRow(final byte [] row, final byte [] family, + final byte [] qualifier) { + return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum); } /** @@ -1706,9 +1746,6 @@ public class KeyValue implements Writable, HeapSize { return compare; } - // if row matches, and no column in the 'left' AND put type is 'minimum', - // then return that left is larger than right. - // Compare column family. Start compare past row and family length. 
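The new createLastOnRow(), together with the Type.Minimum handling in the comparator change that follows, gives a fake key that sorts after every real KeyValue on a row. A test-style sketch of that ordering guarantee (illustrative only; it mirrors the assertions added to TestKeyValue at the end of this patch):

```java
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class RowBracketSketch {
  public static void main(String[] args) {
    byte[] row  = Bytes.toBytes("rowA");
    byte[] fam  = Bytes.toBytes("family");
    byte[] qual = Bytes.toBytes("qfA");

    // Fake keys that bracket the row: first sorts before, last sorts after
    // every real KeyValue on "rowA" (empty column + Type.Minimum is the magic).
    KeyValue firstOnRow = KeyValue.createFirstOnRow(row);
    KeyValue real       = new KeyValue(row, fam, qual, 1L, KeyValue.Type.Put);
    KeyValue lastOnRow  = KeyValue.createLastOnRow(row);

    assert KeyValue.COMPARATOR.compare(firstOnRow, real) < 0;
    assert KeyValue.COMPARATOR.compare(real, lastOnRow) < 0;
    assert KeyValue.COMPARATOR.compare(lastOnRow,
        KeyValue.createFirstOnRow(Bytes.toBytes("rowB"))) < 0;
  }
}
```

Bracketing a row with createFirstOnRow()/createLastOnRow() is what lets a single-row Get be expressed as a bounded scan elsewhere in this patch.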
int lcolumnoffset = Bytes.SIZEOF_SHORT + lrowlength + 1 + loffset; int rcolumnoffset = Bytes.SIZEOF_SHORT + rrowlength + 1 + roffset; @@ -1717,17 +1754,25 @@ public class KeyValue implements Writable, HeapSize { int rcolumnlength = rlength - TIMESTAMP_TYPE_SIZE - (rcolumnoffset - roffset); + // if row matches, and no column in the 'left' AND put type is 'minimum', + // then return that left is larger than right. + // This supports 'last key on a row' - the magic is if there is no column in the // left operand, and the left operand has a type of '0' - magical value, // then we say the left is bigger. This will let us seek to the last key in // a row. byte ltype = left[loffset + (llength - 1)]; + byte rtype = right[roffset + (rlength - 1)]; if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) { return 1; // left is bigger. } + if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) { + return -1; + } + // TODO the family and qualifier should be compared separately compare = Bytes.compareTo(left, lcolumnoffset, lcolumnlength, right, rcolumnoffset, rcolumnlength); if (compare != 0) { @@ -1749,9 +1794,6 @@ public class KeyValue implements Writable, HeapSize { if (!this.ignoreType) { // Compare types. Let the delete types sort ahead of puts; i.e. types // of higher numbers sort before those of lesser numbers - - // ltype is defined above - byte rtype = right[roffset + (rlength - 1)]; return (0xff & rtype) - (0xff & ltype); } return 0; @@ -1791,7 +1833,8 @@ public class KeyValue implements Writable, HeapSize { public long heapSize() { return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE + ClassSize.align(ClassSize.ARRAY + length) + - (2 * Bytes.SIZEOF_INT)); + (2 * Bytes.SIZEOF_INT) + + Bytes.SIZEOF_LONG); } // this overload assumes that the length bytes have already been read, diff --git src/java/org/apache/hadoop/hbase/client/HTable.java src/java/org/apache/hadoop/hbase/client/HTable.java index 3b7b3c0..2bc571c 100644 --- src/java/org/apache/hadoop/hbase/client/HTable.java +++ src/java/org/apache/hadoop/hbase/client/HTable.java @@ -660,6 +660,7 @@ public class HTable { */ public void close() throws IOException{ flushCommits(); + this.pool.shutdownNow(); } /** diff --git src/java/org/apache/hadoop/hbase/client/Scan.java src/java/org/apache/hadoop/hbase/client/Scan.java index 20db2a8..eddefad 100644 --- src/java/org/apache/hadoop/hbase/client/Scan.java +++ src/java/org/apache/hadoop/hbase/client/Scan.java @@ -168,10 +168,29 @@ public class Scan implements Writable { } /** + * Builds a scan object with the same specs as get. + * @param get get to model scan after + */ + public Scan(Get get) { + this.startRow = get.getRow(); + this.stopRow = get.getRow(); + this.filter = get.getFilter(); + this.maxVersions = get.getMaxVersions(); + this.tr = get.getTimeRange(); + this.familyMap = get.getFamilyMap(); + } + + public boolean isGetScan() { + return this.startRow != null && this.startRow.length > 0 && + Bytes.equals(this.startRow, this.stopRow); + } + + /** * Get all columns from the specified family. *
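For the new Scan(Get) constructor and isGetScan() just added to Scan, a hedged usage sketch (the row, family and qualifier names are made up):

```java
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class GetAsScanSketch {
  public static void main(String[] args) {
    // A Get is now just a Scan whose start and stop row are the same row.
    Get get = new Get(Bytes.toBytes("row1"));                      // hypothetical row
    get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));   // hypothetical column

    Scan scan = new Scan(get);   // copies row, filter, maxVersions, time range, family map
    System.out.println(scan.isGetScan());   // true: startRow == stopRow and non-empty
  }
}
```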

* Overrides previous calls to addColumn for this family. * @param family family name + * @return this */ public Scan addFamily(byte [] family) { familyMap.remove(family); @@ -185,6 +204,7 @@ public class Scan implements Writable { * Overrides previous calls to addFamily for this family. * @param family family name * @param qualifier column qualifier + * @return this */ public Scan addColumn(byte [] family, byte [] qualifier) { NavigableSet set = familyMap.get(family); diff --git src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java index e1dbddc..03dd83b 100644 --- src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java +++ src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java @@ -178,9 +178,6 @@ public class SingleColumnValueFilter implements Filter { // byte array copy? int compareResult = this.comparator.compareTo(Arrays.copyOfRange(data, offset, offset + length)); - if (LOG.isDebugEnabled()) { - LOG.debug("compareResult=" + compareResult + " " + Bytes.toString(data, offset, length)); - } switch (this.compareOp) { case LESS: return compareResult <= 0; diff --git src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java new file mode 100644 index 0000000..f0ec0b7 --- /dev/null +++ src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java @@ -0,0 +1,50 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.io.FileWriter; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +public class DebugPrint { + +private static final AtomicBoolean enabled = new AtomicBoolean(false); + private static final Object sync = new Object(); + public static StringBuilder out = new StringBuilder(); + + static public void enable() { + enabled.set(true); + } + static public void disable() { + enabled.set(false); + } + + static public void reset() { + synchronized (sync) { + enable(); // someone wants us enabled basically. + + out = new StringBuilder(); + } + } + static public void dumpToFile(String file) throws IOException { + FileWriter f = new FileWriter(file); + synchronized (sync) { + f.write(out.toString()); + } + f.close(); + } + + public static void println(String m) { + if (!enabled.get()) { + System.out.println(m); + return; + } + + synchronized (sync) { + String threadName = Thread.currentThread().getName(); + out.append("<"); + out.append(threadName); + out.append("> "); + out.append(m); + out.append("\n"); + } + } +} \ No newline at end of file diff --git src/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9b1e6b8..c1ac1b4 100644 --- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1,5 +1,5 @@ /** - * Copyright 2009 The Apache Software Foundation + * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -67,10 +67,11 @@ package org.apache.hadoop.hbase.regionserver; import java.util.HashMap; import java.util.HashSet; import java.util.Random; + import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; + import java.util.concurrent.locks.ReentrantReadWriteLock; /** * HRegion stores data for a certain region of a table. It stores all columns @@ -176,6 +177,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ /** * Set flags that make this region read-only. + * + * @param onOff flip value for region r/o setting */ synchronized void setReadOnly(final boolean onOff) { this.writesEnabled = !onOff; @@ -191,7 +194,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } } - private volatile WriteState writestate = new WriteState(); + private final WriteState writestate = new WriteState(); final long memstoreFlushSize; private volatile long lastFlushTime; @@ -210,7 +213,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ private final Object splitLock = new Object(); private long minSequenceId; private boolean splitRequest; - + + private final ReadWriteConsistencyControl rwcc = + new ReadWriteConsistencyControl(); + /** * Name of the region info file that resides just under the region directory. */ @@ -296,9 +302,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * Initialize this region and get it ready to roll. * Called after construction. * - * @param initialFiles - * @param reporter - * @throws IOException + * @param initialFiles path + * @param reporter progressable + * @throws IOException e */ public void initialize(Path initialFiles, final Progressable reporter) throws IOException { @@ -436,6 +442,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ return this.closing.get(); } + public ReadWriteConsistencyControl getRWCC() { + return rwcc; + } + /** * Close down this HRegion. Flush the cache, shut down each HStore, don't * service any more calls. @@ -447,7 +457,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * HStores make use of. It's a list of all HStoreFile objects. Returns empty * vector if already closed and null if judged that it should not close. * - * @throws IOException + * @throws IOException e */ public List close() throws IOException { return close(false); @@ -465,7 +475,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * HStores make use of. It's a list of HStoreFile objects. Can be null if * we are not to close at this time or we are already closed. 
* - * @throws IOException + * @throws IOException e */ public List close(final boolean abort) throws IOException { if (isClosed()) { @@ -580,6 +590,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } /** @return the last time the region was flushed */ + @SuppressWarnings({"UnusedDeclaration"}) public long getLastFlushTime() { return this.lastFlushTime; } @@ -681,8 +692,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ HRegion.newHRegion(basedir, log, fs, conf, regionBInfo, null); moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir()); - HRegion regions[] = new HRegion [] {regionA, regionB}; - return regions; + return new HRegion [] {regionA, regionB}; } } @@ -756,7 +766,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * server does them sequentially and not in parallel. * * @return mid key if split is needed - * @throws IOException + * @throws IOException e */ public byte [] compactStores() throws IOException { boolean majorCompaction = this.forceMajorCompaction; @@ -777,7 +787,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * * @param majorCompaction True to force a major compaction regardless of thresholds * @return split row if split is needed - * @throws IOException + * @throws IOException e */ byte [] compactStores(final boolean majorCompaction) throws IOException { @@ -846,7 +856,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * * @return true if cache was flushed * - * @throws IOException + * @throws IOException general io exceptions * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. */ @@ -912,7 +922,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * * @return true if the region needs compacting * - * @throws IOException + * @throws IOException general io exceptions * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. */ @@ -940,18 +950,28 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // during the flush long sequenceId = -1L; long completeSequenceId = -1L; + + // we have to take a write lock during snapshot, or else a write could + // end up in both snapshot and memstore (makes it difficult to do atomic + // rows then) this.updatesLock.writeLock().lock(); - // Get current size of memstores. final long currentMemStoreSize = this.memstoreSize.get(); - List storeFlushers = new ArrayList(); + List storeFlushers = new ArrayList(stores.size()); try { sequenceId = log.startCacheFlush(); completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId); - // create the store flushers + for (Store s : stores.values()) { storeFlushers.add(s.getStoreFlusher(completeSequenceId)); } + // This thread is going to cause a whole bunch of scanners to reseek. + // They are depending + // on a thread-local to know where to read from. + // The reason why we set it up high is so that each HRegionScanner only + // has a single read point for all its sub-StoreScanners. 
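The rewritten flush path above boils down to a three-phase prepare/flush/commit sequence over the per-store StoreFlusher objects. A minimal sketch of that shape, assuming the `stores`, `updatesLock`, `rwcc` and `completeSequenceId` values from the surrounding method (HLog bookkeeping and error handling omitted):

```java
// Illustrative shape only; in HRegion this is interleaved with HLog bookkeeping.
List<StoreFlusher> flushers = new ArrayList<StoreFlusher>(stores.size());
for (Store s : stores.values()) {
  flushers.add(s.getStoreFlusher(completeSequenceId));
}

updatesLock.writeLock().lock();            // no edit may straddle memstore and snapshot
try {
  ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
  for (StoreFlusher flusher : flushers) {
    flusher.prepare();                     // phase 1: snapshot the memstore
  }
} finally {
  updatesLock.writeLock().unlock();
}

for (StoreFlusher flusher : flushers) {
  flusher.flushCache();                    // phase 2: write snapshot to a new HFile, no lock held
}

for (StoreFlusher flusher : flushers) {
  flusher.commit();                        // phase 3: swap snapshot -> store file, reseek scanners
}
```

Holding the updates write lock only for the snapshot step keeps the write stall short while still guaranteeing that no edit ends up in both the memstore and its snapshot.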
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + // prepare flush (take a snapshot) for (StoreFlusher flusher: storeFlushers) { flusher.prepare(); @@ -960,6 +980,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ this.updatesLock.writeLock().unlock(); } + LOG.debug("Finished snapshotting, commencing flushing stores"); + // Any failure from here on out will be catastrophic requiring server // restart so hlog content can be replayed and put back into the memstore. // Otherwise, the snapshot content while backed up in the hlog, it will not @@ -973,13 +995,28 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ flusher.flushCache(); } - internalPreFlashcacheCommit(); + Callable atomicWork = internalPreFlushcacheCommit(); + + LOG.debug("Caches flushed, doing commit now (which includes update scanners)"); /** - * Switch between memstore and the new store file + * Switch between memstore(snapshot) and the new store file */ - this.newScannerLock.writeLock().lock(); + if (atomicWork != null) { + LOG.debug("internalPreFlushcacheCommit gives us work to do, acquiring newScannerLock"); + newScannerLock.writeLock().lock(); + } + try { + // update this again to make sure we are 'fresh' + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + + if (atomicWork != null) { + atomicWork.call(); + } + + // Switch snapshot (in memstore) -> new hfile (thus causing + // all the store scanners to reset/reseek). for (StoreFlusher flusher : storeFlushers) { boolean needsCompaction = flusher.commit(); if (needsCompaction) { @@ -987,10 +1024,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } } } finally { - this.newScannerLock.writeLock().unlock(); + if (atomicWork != null) { + newScannerLock.writeLock().unlock(); + } } - // clear the stireFlushers list storeFlushers.clear(); // Set down the memstore size by amount of flush. this.memstoreSize.addAndGet(-currentMemStoreSize); @@ -1040,9 +1078,14 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * A hook for sub classed wishing to perform operations prior to the cache * flush commit stage. * + * If a subclass wishes that an atomic update of their work and the + * flush commit stage happens, they should return a callable. The new scanner + * lock will be acquired and released. + * @throws java.io.IOException allow children to throw exception */ - protected void internalPreFlashcacheCommit() throws IOException { + protected Callable internalPreFlushcacheCommit() throws IOException { + return null; } /** @@ -1080,9 +1123,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * ts. * * @param row row key - * @param family + * @param family column family to find on * @return map of values - * @throws IOException + * @throws IOException read exceptions */ public Result getClosestRowBefore(final byte [] row, final byte [] family) throws IOException { @@ -1099,11 +1142,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ if (key == null) { return null; } - // This will get all results for this store. TODO: Do we need to do this? 
Get get = new Get(key.getRow()); - List results = new ArrayList(); - store.get(get, null, results); - return new Result(results); + get.addFamily(family); + return get(get, null); } finally { splitsAndClosesLock.readLock().unlock(); } @@ -1117,7 +1158,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * * @param scan configured {@link Scan} * @return InternalScanner - * @throws IOException + * @throws IOException read exceptions */ public InternalScanner getScanner(Scan scan) throws IOException { @@ -1155,24 +1196,23 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // set() methods for client use. ////////////////////////////////////////////////////////////////////////////// /** - * @param delete - * @param lockid - * @param writeToWAL - * @throws IOException + * @param delete delete object + * @param lockid existing lock id, or null for grab a lock + * @param writeToWAL append to the write ahead lock or not + * @throws IOException read exceptions */ public void delete(Delete delete, Integer lockid, boolean writeToWAL) throws IOException { checkReadOnly(); checkResources(); Integer lid = null; - newScannerLock.writeLock().lock(); splitsAndClosesLock.readLock().lock(); try { byte [] row = delete.getRow(); // If we did not pass an existing row lock, obtain a new one lid = getLock(lockid, row); - //Check to see if this is a deleteRow insert + // Check to see if this is a deleteRow insert if(delete.getFamilyMap().isEmpty()){ for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){ // Don't eat the timestamp @@ -1193,7 +1233,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } finally { if(lockid == null) releaseRowLock(lid); splitsAndClosesLock.readLock().unlock(); - newScannerLock.writeLock().unlock(); } } @@ -1208,7 +1247,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ long now = System.currentTimeMillis(); byte [] byteNow = Bytes.toBytes(now); boolean flush = false; - this.updatesLock.readLock().lock(); + + updatesLock.readLock().lock(); + ReadWriteConsistencyControl.WriteEntry w = null; try { @@ -1225,21 +1266,21 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ if (kv.isLatestTimestamp() && kv.isDeleteType()) { byte[] qual = kv.getQualifier(); if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY; + Integer count = kvCount.get(qual); if (count == null) { - kvCount.put(qual, new Integer(1)); + kvCount.put(qual, 1); } else { - kvCount.put(qual, new Integer(count+1)); + kvCount.put(qual, count + 1); } count = kvCount.get(qual); - List result = new ArrayList(1); - Get g = new Get(kv.getRow()); - g.setMaxVersions(count); - NavigableSet qualifiers = - new TreeSet(Bytes.BYTES_COMPARATOR); - qualifiers.add(qual); - get(store, g, qualifiers, result); + Get get = new Get(kv.getRow()); + get.setMaxVersions(count); + get.addColumn(family, qual); + + List result = get(get); + if (result.size() < count) { // Nothing to delete kv.updateLatestStamp(byteNow); @@ -1284,11 +1325,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } } + // Now make changes to the memstore. + long size = 0; + w = rwcc.beginMemstoreInsert(); - // - // Now make changes to the memstore. 
- // for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); @@ -1296,13 +1337,17 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ Store store = getStore(family); for (KeyValue kv: kvs) { + kv.setMemstoreTS(w.getWriteNumber()); size = this.memstoreSize.addAndGet(store.delete(kv)); } } flush = isFlushSize(size); } finally { + if (w != null) rwcc.completeMemstoreInsert(w); + this.updatesLock.readLock().unlock(); } + if (flush) { // Request a cache flush. Do it outside update lock. requestFlush(); @@ -1350,8 +1395,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // read lock, resources may run out. For now, the thought is that this // will be extremely rare; we'll deal with it when it happens. checkResources(); - newScannerLock.writeLock().lock(); splitsAndClosesLock.readLock().lock(); + try { // We obtain a per-row lock, so other clients will block while one client // performs an update. The read lock is released by the client calling @@ -1361,6 +1406,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ byte [] row = put.getRow(); // If we did not pass an existing row lock, obtain a new one Integer lid = getLock(lockid, row); + byte [] now = Bytes.toBytes(System.currentTimeMillis()); try { // All edits for the given row (across all column families) must happen atomically. @@ -1370,7 +1416,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } } finally { splitsAndClosesLock.readLock().unlock(); - newScannerLock.writeLock().unlock(); } } @@ -1410,15 +1455,12 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ Integer lid = getLock(lockId, get.getRow()); List result = new ArrayList(); try { - //Getting data - for(Map.Entry> entry: - get.getFamilyMap().entrySet()) { - get(this.stores.get(entry.getKey()), get, entry.getValue(), result); - } + result = get(get); + boolean matches = false; if (result.size() == 0 && expectedValue.length == 0) { matches = true; - } else if(result.size() == 1) { + } else if (result.size() == 1) { //Compare the expected value with the actual value byte [] actualValue = result.get(0).getValue(); matches = Bytes.equals(expectedValue, actualValue); @@ -1534,6 +1576,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ /** * Add updates first to the hlog and then add values to memstore. * Warning: Assumption is caller has lock on passed in row. 
+ * @param family * @param edits Cell updates by column * @praram now * @throws IOException @@ -1558,6 +1601,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ byte[] byteNow = Bytes.toBytes(now); boolean flush = false; this.updatesLock.readLock().lock(); + ReadWriteConsistencyControl.WriteEntry w = null; try { WALEdit walEdit = new WALEdit(); @@ -1601,6 +1645,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ long size = 0; + w = rwcc.beginMemstoreInsert(); + // now make changes to the memstore for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); @@ -1608,11 +1654,14 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ Store store = getStore(family); for (KeyValue kv: edits) { + kv.setMemstoreTS(w.getWriteNumber()); size = this.memstoreSize.addAndGet(store.add(kv)); } } flush = isFlushSize(size); } finally { + if (w != null) rwcc.completeMemstoreInsert(w); + this.updatesLock.readLock().unlock(); } if (flush) { @@ -1854,9 +1903,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ private Filter filter; private RowFilterInterface oldFilter; private List results = new ArrayList(); + private int isScan; private int batch; RegionScanner(Scan scan, List additionalScanners) { + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + + //DebugPrint.println("HRegionScanner., threadpoint = " + ReadWriteConsistencyControl.getThreadReadPoint()); this.filter = scan.getFilter(); this.batch = scan.getBatch(); this.oldFilter = scan.getOldFilter(); @@ -1865,12 +1918,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } else { this.stopRow = scan.getStopRow(); } + this.isScan = scan.isGetScan() ? -1 : 0; List scanners = new ArrayList(); if (additionalScanners != null) { scanners.addAll(additionalScanners); } - for (Map.Entry> entry : + for (Map.Entry> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); scanners.add(store.getScanner(scan, entry.getValue())); @@ -1893,6 +1947,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ if (oldFilter != null) { oldFilter.reset(); } + + // Start the next row read and reset the thread point + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); } public boolean next(List outResults, int limit) throws IOException { @@ -1901,6 +1958,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closing=" + closing.get() + " or closed=" + closed.get()); } + + // This could be a new thread from the last time we called next(). 
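Both the delete path above and the put path that follows use the same write protocol against ReadWriteConsistencyControl, while readers (RegionScanner construction and every next() call) pin a per-thread read point. A hedged sketch of the two sides, using only methods introduced by this patch and assuming `rwcc`, `edits` and `store` from the surrounding code:

```java
// Writer side: tag every edit with a write number, then publish the batch.
ReadWriteConsistencyControl.WriteEntry w = rwcc.beginMemstoreInsert();
try {
  for (KeyValue kv : edits) {
    kv.setMemstoreTS(w.getWriteNumber());
    store.add(kv);                         // or store.delete(kv) on the delete path
  }
} finally {
  rwcc.completeMemstoreInsert(w);          // read point advances once earlier writes finish
}

// Reader side: fix a read point for this thread, then ignore newer edits.
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
// a KeyValue is visible to this reader iff kv.getMemstoreTS() <= readPoint
```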
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); results.clear(); boolean returnResult = nextInternal(limit); if (!returnResult && filterRow()) { @@ -1982,7 +2042,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ return currentRow == null || (this.stopRow != null && comparator.compareRows(this.stopRow, 0, this.stopRow.length, - currentRow, 0, currentRow.length) <= 0); + currentRow, 0, currentRow.length) <= isScan); } private boolean filterRow() { @@ -2502,10 +2562,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // HBASE-880 // /** - * @param get - * @param lockid + * @param get get object + * @param lockid existing lock id, or null for no previous lock * @return result - * @throws IOException + * @throws IOException read exceptions */ public Result get(final Get get, final Integer lockid) throws IOException { // Verify families are all valid @@ -2520,22 +2580,33 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } // Lock row Integer lid = getLock(lockid, get.getRow()); - List result = new ArrayList(); + List result = null; try { - for (Map.Entry> entry: - get.getFamilyMap().entrySet()) { - get(this.stores.get(entry.getKey()), get, entry.getValue(), result); - } + result = get(get); } finally { - if(lockid == null) releaseRowLock(lid); + if(lockid == null) + releaseRowLock(lid); } return new Result(result); } - private void get(final Store store, final Get get, - final NavigableSet qualifiers, List result) - throws IOException { - store.get(get, qualifiers, result); + /* + * Do a get based on the get parameter. + */ + private List get(final Get get) throws IOException { + Scan scan = new Scan(get); + + List results = new ArrayList(); + + InternalScanner scanner = null; + try { + scanner = getScanner(scan); + scanner.next(results); + } finally { + if (scanner != null) + scanner.close(); + } + return results; } /** @@ -2544,6 +2615,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * @param family * @param qualifier * @param amount + * @param writeToWAL * @return The new value. * @throws IOException */ @@ -2558,6 +2630,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ try { Store store = stores.get(family); + // TODO call the proper GET API // Get the old value: Get get = new Get(row); get.addColumn(family, qualifier); @@ -2623,7 +2696,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ public static final long FIXED_OVERHEAD = ClassSize.align( (5 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN + - (20 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT); + (21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) + diff --git src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java index de3df22..440f5a7 100644 --- src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java +++ src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java @@ -1,5 +1,5 @@ /** - * Copyright 2009 The Apache Software Foundation + * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -19,6 +19,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.hbase.KeyValue; + import java.util.Collection; import java.util.Comparator; import java.util.Iterator; @@ -28,8 +30,6 @@ import java.util.SortedSet; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; -import org.apache.hadoop.hbase.KeyValue; - /** * A {@link java.util.Set} of {@link KeyValue}s implemented on top of a * {@link java.util.concurrent.ConcurrentSkipListMap}. Works like a @@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.KeyValue; * has same attributes as ConcurrentSkipListSet: e.g. tolerant of concurrent * get and set and won't throw ConcurrentModificationException when iterating. */ -class KeyValueSkipListSet implements NavigableSet, Cloneable { +class KeyValueSkipListSet implements NavigableSet { private ConcurrentNavigableMap delegatee; KeyValueSkipListSet(final KeyValue.KVComparator c) { @@ -167,6 +167,7 @@ class KeyValueSkipListSet implements NavigableSet, Cloneable { } public boolean contains(Object o) { + //noinspection SuspiciousMethodCalls return this.delegatee.containsKey(o); } @@ -201,17 +202,4 @@ class KeyValueSkipListSet implements NavigableSet, Cloneable { public T[] toArray(T[] a) { throw new UnsupportedOperationException("Not implemented"); } - - @Override - public KeyValueSkipListSet clone() { - assert this.delegatee.getClass() == ConcurrentSkipListMap.class; - KeyValueSkipListSet clonedSet = null; - try { - clonedSet = (KeyValueSkipListSet) super.clone(); - } catch (CloneNotSupportedException e) { - throw new InternalError(e.getMessage()); - } - clonedSet.delegatee = ((ConcurrentSkipListMap) this.delegatee).clone(); - return clonedSet; - } } \ No newline at end of file diff --git src/java/org/apache/hadoop/hbase/regionserver/MemStore.java src/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 7afe297..ba251bd 100644 --- src/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ src/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -28,7 +28,9 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; +import java.util.Set; import java.util.SortedSet; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -110,7 +112,7 @@ public class MemStore implements HeapSize { /** * Creates a snapshot of the current memstore. - * Snapshot must be cleared by call to {@link #clearSnapshot(java.util.Map)} + * Snapshot must be cleared by call to {@link #clearSnapshot(java.util.SortedSet)} * To get the snapshot made by this method, use {@link #getSnapshot()} */ void snapshot() { @@ -140,7 +142,7 @@ public class MemStore implements HeapSize { * call to {@link #snapshot()} * @return Return snapshot. * @see {@link #snapshot()} - * @see {@link #clearSnapshot(java.util.Map)} + * @see {@link #clearSnapshot(java.util.SortedSet)} */ KeyValueSkipListSet getSnapshot() { return this.snapshot; @@ -187,7 +189,7 @@ public class MemStore implements HeapSize { return s; } - /** + /** * Write a delete * @param delete * @return approximate size of the passed key and value. @@ -195,69 +197,8 @@ public class MemStore implements HeapSize { long delete(final KeyValue delete) { long s = 0; this.lock.readLock().lock(); - //Have to find out what we want to do here, to find the fastest way of - //removing things that are under a delete. 
- //Actions that will take place here are: - //1. Insert a delete and remove all the affected entries already in memstore - //2. In the case of a Delete and the matching put is found then don't insert - // the delete - //TODO Would be nice with if we had an iterator for this, so we could remove - //things that needs to be removed while iterating and don't have to go - //back and do it afterwards try { - boolean notpresent = false; - List deletes = new ArrayList(); - SortedSet tail = this.kvset.tailSet(delete); - - //Parse the delete, so that it is only done once - byte [] deleteBuffer = delete.getBuffer(); - int deleteOffset = delete.getOffset(); - - int deleteKeyLen = Bytes.toInt(deleteBuffer, deleteOffset); - deleteOffset += Bytes.SIZEOF_INT + Bytes.SIZEOF_INT; - - short deleteRowLen = Bytes.toShort(deleteBuffer, deleteOffset); - deleteOffset += Bytes.SIZEOF_SHORT; - int deleteRowOffset = deleteOffset; - - deleteOffset += deleteRowLen; - - byte deleteFamLen = deleteBuffer[deleteOffset]; - deleteOffset += Bytes.SIZEOF_BYTE + deleteFamLen; - - int deleteQualifierOffset = deleteOffset; - int deleteQualifierLen = deleteKeyLen - deleteRowLen - deleteFamLen - - Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG - - Bytes.SIZEOF_BYTE; - - deleteOffset += deleteQualifierLen; - - int deleteTimestampOffset = deleteOffset; - deleteOffset += Bytes.SIZEOF_LONG; - byte deleteType = deleteBuffer[deleteOffset]; - - //Comparing with tail from memstore - for (KeyValue kv : tail) { - DeleteCode res = DeleteCompare.deleteCompare(kv, deleteBuffer, - deleteRowOffset, deleteRowLen, deleteQualifierOffset, - deleteQualifierLen, deleteTimestampOffset, deleteType, - comparator.getRawComparator()); - if (res == DeleteCode.DONE) { - break; - } else if (res == DeleteCode.DELETE) { - deletes.add(kv); - } // SKIP - } - - //Delete all the entries effected by the last added delete - for (KeyValue kv : deletes) { - notpresent = this.kvset.remove(kv); - s -= heapSizeChange(kv, notpresent); - } - - // Adding the delete to memstore. Add any value, as long as - // same instance each time. s += heapSizeChange(delete, this.kvset.add(delete)); } finally { this.lock.readLock().unlock(); @@ -265,7 +206,7 @@ public class MemStore implements HeapSize { this.size.addAndGet(s); return s; } - + /** * @param kv Find the row that comes after this one. If null, we return the * first. 
@@ -318,7 +259,7 @@ public class MemStore implements HeapSize { } /** - * @param state + * @param state column/delete tracking state */ void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) { this.lock.readLock().lock(); @@ -442,8 +383,7 @@ public class MemStore implements HeapSize { this.lock.readLock().lock(); try { KeyValueScanner [] scanners = new KeyValueScanner[1]; - scanners[0] = new MemStoreScanner(this.kvset.clone(), - this.snapshot.clone(), this.comparator); + scanners[0] = new MemStoreScanner(); return scanners; } finally { this.lock.readLock().unlock(); @@ -465,10 +405,8 @@ public class MemStore implements HeapSize { * @param matcher Column matcher * @param result List to add results to * @return true if done with store (early-out), false if not - * @throws IOException */ - public boolean get(QueryMatcher matcher, List result) - throws IOException { + public boolean get(QueryMatcher matcher, List result) { this.lock.readLock().lock(); try { if(internalGet(this.kvset, matcher, result) || matcher.isDone()) { @@ -485,11 +423,11 @@ public class MemStore implements HeapSize { * Gets from either the memstore or the snapshop, and returns a code * to let you know which is which. * - * @param matcher - * @param result + * @param matcher query matcher + * @param result puts results here * @return 1 == memstore, 2 == snapshot, 0 == none */ - int getWithCode(QueryMatcher matcher, List result) throws IOException { + int getWithCode(QueryMatcher matcher, List result) { this.lock.readLock().lock(); try { boolean fromMemstore = internalGet(this.kvset, matcher, result); @@ -517,18 +455,16 @@ public class MemStore implements HeapSize { void readLockUnlock() { this.lock.readLock().unlock(); } - + /** * * @param set memstore or snapshot * @param matcher query matcher * @param result list to add results to * @return true if done with store (early-out), false if not - * @throws IOException */ boolean internalGet(final NavigableSet set, - final QueryMatcher matcher, final List result) - throws IOException { + final QueryMatcher matcher, final List result) { if(set.isEmpty()) return false; // Seek to startKey SortedSet tail = set.tailSet(matcher.getStartKey()); @@ -550,11 +486,161 @@ public class MemStore implements HeapSize { } return false; } + + + /* + * MemStoreScanner implements the KeyValueScanner. + * It lets the caller scan the contents of a memstore -- both current + * map and snapshot. + * This behaves as if it were a real scanner but does not maintain position. + */ + protected class MemStoreScanner implements KeyValueScanner { + // Next row information for either kvset or snapshot + private KeyValue kvsetNextRow = null; + private KeyValue snapshotNextRow = null; + + // iterator based scanning. + Iterator kvsetIt; + Iterator snapshotIt; + + /* + Some notes... + + So memstorescanner is fixed at creation time. this includes pointers/iterators into + existing kvset/snapshot. during a snapshot creation, the kvset is null, and the + snapshot is moved. since kvset is null there is no point on reseeking on both, + we can save us the trouble. During the snapshot->hfile transition, the memstore + scanner is re-created by StoreScanner#updateReaders(). StoreScanner should + potentially do something smarter by adjusting the existing memstore scanner. + + But there is a greater problem here, that being once a scanner has progressed + during a snapshot scenario, we currently iterate past the kvset then 'finish' up. 
+ if a scan lasts a little while, there is a chance for new entries in kvset to + become available but we will never see them. This needs to be handled at the + StoreScanner level with coordination with MemStoreScanner. + + */ + + MemStoreScanner() { + super(); + + //DebugPrint.println(" MS new@" + hashCode()); + } + + protected KeyValue getNext(Iterator it) { + KeyValue ret = null; + long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); + //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint); + + while (ret == null && it.hasNext()) { + KeyValue v = it.next(); + if (v.getMemstoreTS() <= readPoint) { + // keep it. + ret = v; + } + } + return ret; + } + + public synchronized boolean seek(KeyValue key) { + if (key == null) { + close(); + return false; + } + + // kvset and snapshot will never be empty. + // if tailSet cant find anything, SS is empty (not null). + SortedSet kvTail = kvset.tailSet(key); + SortedSet snapshotTail = snapshot.tailSet(key); + + kvsetIt = kvTail.iterator(); + snapshotIt = snapshotTail.iterator(); + + kvsetNextRow = getNext(kvsetIt); + snapshotNextRow = getNext(snapshotIt); + //long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); + //DebugPrint.println( " MS@" + hashCode() + " kvset seek: " + kvsetNextRow + " with size = " + + // kvset.size() + " threadread = " + readPoint); + //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " + + // snapshot.size() + " threadread = " + readPoint); + + + KeyValue lowest = getLowest(); + + // has data := (lowest != null) + return lowest != null; + } + + public synchronized KeyValue peek() { + //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest()); + return getLowest(); + } + + + public synchronized KeyValue next() { + KeyValue theNext = getLowest(); + + if (theNext == null) { + return null; + } + + // Advance one of the iterators + if (theNext == kvsetNextRow) { + kvsetNextRow = getNext(kvsetIt); + } else { + snapshotNextRow = getNext(snapshotIt); + } + + //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint(); + //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " + + // getLowest() + " threadpoint=" + readpoint); + return theNext; + } + + protected KeyValue getLowest() { + return getLower(kvsetNextRow, + snapshotNextRow); + } + + /* + * Returns the lower of the two key values, or null if they are both null. + * This uses comparator.compare() to compare the KeyValue using the memstore + * comparator. + */ + protected KeyValue getLower(KeyValue first, KeyValue second) { + if (first == null && second == null) { + return null; + } + if (first != null && second != null) { + int compare = comparator.compare(first, second); + return (compare <= 0 ? first : second); + } + return (first != null ? first : second); + } + + public synchronized void close() { + this.kvsetNextRow = null; + this.snapshotNextRow = null; + + this.kvsetIt = null; + this.snapshotIt = null; + } + + public synchronized void changedMemStore() { + // its possible that the snapshot has been cleared, therefore we become + // 'invalid'. But the scanner containing us depends on us knowing our + // "current" key to reseek the scanner stack. Thus we cannot be reseeked + // when data has been pushed out of snapshot. 
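The heart of the MemStoreScanner above is a two-iterator merge (kvset and snapshot) that hides entries newer than the caller's read point. Extracted as a standalone sketch for clarity; the helper names mirror getNext()/getLower() but this is not the patch code itself:

```java
import java.util.Iterator;
import org.apache.hadoop.hbase.KeyValue;

final class MemStoreMergeSketch {
  // Advance an iterator to the next KeyValue visible at the given read point.
  static KeyValue nextVisible(Iterator<KeyValue> it, long readPoint) {
    while (it.hasNext()) {
      KeyValue kv = it.next();
      if (kv.getMemstoreTS() <= readPoint) {
        return kv;                 // committed before our read point: visible
      }
      // otherwise skip it: that write was still in flight when the read point was fixed
    }
    return null;
  }

  // The merged scanner always emits the smaller of the two candidates
  // under the memstore comparator.
  static KeyValue lower(KeyValue a, KeyValue b, KeyValue.KVComparator comparator) {
    if (a == null) return b;
    if (b == null) return a;
    return comparator.compare(a, b) <= 0 ? a : b;
  }
}
```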
+ if (!snapshot.isEmpty()) + seek(peek()); + } + } + public final static long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + (7 * ClassSize.REFERENCE)); - + public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG + ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST + @@ -568,11 +654,11 @@ public class MemStore implements HeapSize { * @return Size */ long heapSizeChange(final KeyValue kv, final boolean notpresent) { - return notpresent ? + return notpresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()): 0; } - + /** * Get the entire heap usage for this MemStore not including keys in the * snapshot. @@ -581,7 +667,7 @@ public class MemStore implements HeapSize { public long heapSize() { return size.get(); } - + /** * Get the heap usage of KVs in this MemStore. */ @@ -603,7 +689,7 @@ public class MemStore implements HeapSize { * enough. See hbase-900. Fills memstores then waits so user can heap * dump and bring up resultant hprof in something like jprofiler which * allows you get 'deep size' on objects. - * @param args + * @param args main args */ public static void main(String [] args) { RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); @@ -638,5 +724,4 @@ public class MemStore implements HeapSize { } LOG.info("Exiting."); } - } diff --git src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java deleted file mode 100644 index d342bab..0000000 --- src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.util.Iterator; -import java.util.SortedSet; -import java.util.TreeSet; - -/** - * MemStoreScanner implements the KeyValueScanner. - * It lets the caller scan the contents of a memstore -- both current - * map and snapshot. - *

- * The memstore scanner keeps its own reference to the main and snapshot - * key/value sets. Keeping those references allows the scanner to be indifferent - * to memstore flushes. Calling the {@link #close()} method ensures that the - * references to those classes are null'd allowing the GC to pick them up. - */ -class MemStoreScanner implements KeyValueScanner { - private static final Log LOG = LogFactory.getLog(MemStoreScanner.class); - - private static final - SortedSet EMPTY_SET = new TreeSet(); - private static final Iterator EMPTY_ITERATOR = - new Iterator() { - - @Override - public boolean hasNext() { - return false; - } - @Override - public KeyValue next() { - return null; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - - - private SortedSet kvsetRef; - private SortedSet snapshotRef; - private KeyValue.KVComparator comparatorRef; - private Iterator kvsetIterator; - private Iterator snapshotIterator; - - private KeyValue currentKvsetKV; - private KeyValue currentSnapshotKV; - private KeyValue nextKV; - - /** - * Create a new memstore scanner. - * - * @param kvset the main key value set - * @param snapshot the snapshot set - * @param comparator the comparator to use - */ - MemStoreScanner(KeyValueSkipListSet kvset, - KeyValueSkipListSet snapshot, KeyValue.KVComparator comparator) { - super(); - this.kvsetRef = kvset; - this.snapshotRef = snapshot != null ? snapshot : EMPTY_SET; - this.comparatorRef = comparator; - this.kvsetIterator = kvsetRef.iterator(); - this.snapshotIterator = snapshotRef.iterator(); - this.nextKV = currentKvsetKV = currentSnapshotKV = null; - } - - private void fill() { - if (nextKV == null) { - if (currentSnapshotKV == null && snapshotIterator.hasNext()) { - currentSnapshotKV = snapshotIterator.next(); - } - - if (currentKvsetKV == null && kvsetIterator.hasNext()) { - currentKvsetKV = kvsetIterator.next(); - } - - if (currentSnapshotKV != null && currentKvsetKV != null) { - int cmp = comparatorRef.compare(currentSnapshotKV, currentKvsetKV); - if (cmp <= 0) { - nextKV = currentSnapshotKV; - currentSnapshotKV = null; - } else { - nextKV = currentKvsetKV; - currentKvsetKV = null; - } - } else if (currentSnapshotKV != null) { - nextKV = currentSnapshotKV; - currentSnapshotKV = null; - } else { - nextKV = currentKvsetKV; - currentKvsetKV = null; - } - } - } - - @Override - public synchronized boolean seek(KeyValue key) { - if (key == null) { - close(); - return false; - } - SortedSet kvsetTail = kvsetRef.tailSet(key); - SortedSet snapshotTail = snapshotRef != null ? 
- snapshotRef.tailSet(key) : EMPTY_SET; - - kvsetIterator = kvsetTail.iterator(); - snapshotIterator = snapshotTail.iterator(); - - currentKvsetKV = null; - currentSnapshotKV = null; - nextKV = null; - - return kvsetIterator.hasNext() || snapshotIterator.hasNext(); - } - - @Override - public synchronized KeyValue peek() { - fill(); - return nextKV; - } - - @Override - public synchronized KeyValue next() { - fill(); - KeyValue next = nextKV; - nextKV = null; - return next; - } - - public synchronized void close() { - this.kvsetRef = EMPTY_SET; - this.snapshotRef = EMPTY_SET; - this.kvsetIterator = EMPTY_ITERATOR; - this.snapshotIterator = EMPTY_ITERATOR; - this.currentKvsetKV = null; - this.currentSnapshotKV = null; - this.nextKV = null; - } -} diff --git src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java new file mode 100644 index 0000000..b1f1368 --- /dev/null +++ src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java @@ -0,0 +1,106 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.util.LinkedList; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Manages the read/write consistency within memstore. This provides + * an interface for readers to determine what entries to ignore, and + * a mechanism for writers to obtain new write numbers, then "commit" + * the new writes for readers to read (thus forming atomic transactions). + */ +public class ReadWriteConsistencyControl { + private final AtomicLong memstoreRead = new AtomicLong(); + private final AtomicLong memstoreWrite = new AtomicLong(); + // This is the pending queue of writes. + private final LinkedList writeQueue = + new LinkedList(); + + private static final ThreadLocal perThreadReadPoint = + new ThreadLocal(); + + public static long getThreadReadPoint() { + return perThreadReadPoint.get(); + } + + public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) { + perThreadReadPoint.set(rwcc.memstoreReadPoint()); + return getThreadReadPoint(); + } + + public WriteEntry beginMemstoreInsert() { + synchronized (writeQueue) { + long nextWriteNumber = memstoreWrite.incrementAndGet(); + WriteEntry e = new WriteEntry(nextWriteNumber); + writeQueue.add(e); + return e; + } + } + public void completeMemstoreInsert(WriteEntry e) { + synchronized (writeQueue) { + e.markCompleted(); + + long nextReadValue = -1; + boolean ranOnce=false; + while (!writeQueue.isEmpty()) { + ranOnce=true; + WriteEntry queueFirst = writeQueue.getFirst(); + + if (nextReadValue > 0) { + if (nextReadValue+1 != queueFirst.getWriteNumber()) { + throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " + + nextReadValue + " next: " + queueFirst.getWriteNumber()); + } + } + + if (queueFirst.isCompleted()) { + nextReadValue = queueFirst.getWriteNumber(); + writeQueue.removeFirst(); + } else { + break; + } + } + + if (!ranOnce) { + throw new RuntimeException("never was a first"); + } + + if (nextReadValue > 0) { + memstoreRead.set(nextReadValue); + } + } + + // Spin until any other concurrent puts have finished. This makes sure that + // if we move on to construct a scanner, we'll get read-your-own-writes + // consistency. We anticipate that since puts to the memstore are very fast, + // this will be on the order of microseconds - so spinning should be faster + // than a condition variable. 
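To make the queue-and-spin logic above concrete, here is a hedged two-writer example: the read point only advances past a write number once every earlier write has also completed, so a reader can never observe the second write without the first. (Each writer must be its own thread; completing out of order on a single thread would spin forever.)

```java
import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;

public class RwccOrderingSketch {
  public static void main(String[] args) throws Exception {
    final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();

    Runnable writer = new Runnable() {
      public void run() {
        ReadWriteConsistencyControl.WriteEntry w = rwcc.beginMemstoreInsert();
        // ... tag KeyValues with w.getWriteNumber() and add them to the memstore ...
        rwcc.completeMemstoreInsert(w);  // returns only once all earlier writes are complete
      }
    };

    Thread t1 = new Thread(writer), t2 = new Thread(writer);
    t1.start(); t2.start();
    t1.join(); t2.join();

    // Both writes are published; a reader resetting its read point now sees both.
    System.out.println(rwcc.memstoreReadPoint());   // prints 2
  }
}
```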
+ int spun = 0; + while (memstoreRead.get() < e.getWriteNumber()) { + spun++; + } + // Could potentially expose spun as a metric + } + + public long memstoreReadPoint() { + return memstoreRead.get(); + } + + + public static class WriteEntry { + private long writeNumber; + private boolean completed = false; + WriteEntry(long writeNumber) { + this.writeNumber = writeNumber; + } + void markCompleted() { + this.completed = true; + } + boolean isCompleted() { + return this.completed; + } + long getWriteNumber() { + return this.writeNumber; + } + } +} diff --git src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java index a42289d..4a71876 100644 --- src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java @@ -55,7 +55,11 @@ public class ScanQueryMatcher extends QueryMatcher { this.rowComparator = rowComparator; this.deletes = new ScanDeleteTracker(); this.startKey = KeyValue.createFirstOnRow(scan.getStartRow()); - this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow()); + if (scan.isGetScan()) { + this.stopKey = KeyValue.createLastOnRow(scan.getStopRow()); + } else { + this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow()); + } this.filter = scan.getFilter(); this.oldFilter = scan.getOldFilter(); diff --git src/java/org/apache/hadoop/hbase/regionserver/Store.java src/java/org/apache/hadoop/hbase/regionserver/Store.java index 2a6c35f..8d07a90 100644 --- src/java/org/apache/hadoop/hbase/regionserver/Store.java +++ src/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -509,7 +509,7 @@ public class Store implements HConstants, HeapSize { /** * Snapshot this stores memstore. Call before running - * {@link #flushCache(long)} so it has some work to do. + * {@link #flushCache(long, java.util.SortedSet)} so it has some work to do. */ void snapshot() { this.memstore.snapshot(); @@ -614,9 +614,12 @@ public class Store implements HConstants, HeapSize { this.lock.writeLock().lock(); try { this.storefiles.put(Long.valueOf(logCacheFlushId), sf); + + this.memstore.clearSnapshot(set); + // Tell listeners of the change in readers. notifyChangedReadersObservers(); - this.memstore.clearSnapshot(set); + return this.storefiles.size() >= this.compactionThreshold; } finally { this.lock.writeLock().unlock(); @@ -644,10 +647,8 @@ public class Store implements HConstants, HeapSize { * @param o Observer no longer interested in changes in set of Readers. */ void deleteChangedReaderObserver(ChangedReadersObserver o) { - if(this.changedReaderObservers.size() > 0) { - if (!this.changedReaderObservers.remove(o)) { - LOG.warn("Not in set" + o); - } + if (!this.changedReaderObservers.remove(o)) { + LOG.warn("Not in set" + o); } } @@ -866,7 +867,6 @@ public class Store implements HConstants, HeapSize { /** * Do a minor/major compaction. Uses the scan infrastructure to make it easy. * - * @param writer output writer * @param filesToCompact which files to compact * @param majorCompaction true to major compact (prune all deletes, max versions, etc) * @param maxId Readers maximum sequence id. @@ -999,6 +999,10 @@ public class Store implements HConstants, HeapSize { Long orderVal = Long.valueOf(result.getMaxSequenceId()); this.storefiles.put(orderVal, result); } + + // WARN ugly hack here, but necessary sadly. + ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC()); + // Tell observers that list of StoreFiles has changed. 
notifyChangedReadersObservers(); // Finally, delete old store files. @@ -1489,7 +1493,12 @@ public class Store implements HConstants, HeapSize { } /** - * Increments the value for the given row/family/qualifier + * Increments the value for the given row/family/qualifier. + * + * This function will always be seen as atomic by other readers + * because it only puts a single KV to memstore. Thus no + * read/write control necessary. + * * @param row * @param f * @param qualifier diff --git src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 9e38c37..528f274 100644 --- src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.NavigableSet; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,10 +45,15 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb private boolean cacheBlocks; // Used to indicate that the scanner has closed (see HBASE-1107) - private final AtomicBoolean closing = new AtomicBoolean(false); + private boolean closing = false; + private final boolean isGet; /** * Opens a scanner across memstore, snapshot, and all StoreFiles. + * + * @param store who we scan + * @param scan the spec + * @param columns which columns we are scanning */ StoreScanner(Store store, Scan scan, final NavigableSet columns) { this.store = store; @@ -58,9 +62,11 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb columns, store.ttl, store.comparator.getRawComparator(), store.versionsToReturn(scan.getMaxVersions())); + this.isGet = scan.isGetScan(); List scanners = getScanners(); // Seek all scanners to the initial key + // TODO if scan.isGetScan, use bloomfilters to skip seeking for(KeyValueScanner scanner : scanners) { scanner.seek(matcher.getStartKey()); } @@ -76,10 +82,14 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb * Used for major compactions.

* * Opens a scanner across specified StoreFiles. + * @param store who we scan + * @param scan the spec + * @param scanners ancilliary scanners */ StoreScanner(Store store, Scan scan, KeyValueScanner [] scanners) { this.store = store; this.cacheBlocks = false; + this.isGet = false; matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), null, store.ttl, store.comparator.getRawComparator(), store.versionsToReturn(scan.getMaxVersions())); @@ -99,6 +109,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb final NavigableSet columns, final KeyValueScanner [] scanners) { this.store = null; + this.isGet = false; this.cacheBlocks = scan.getCacheBlocks(); this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl, comparator.getRawComparator(), scan.getMaxVersions()); @@ -132,7 +143,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb } public synchronized void close() { - this.closing.set(true); + this.closing = true; // under test, we dont have a this.store if (this.store != null) this.store.deleteChangedReaderObserver(this); @@ -145,11 +156,12 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb /** * Get the next row of values from this Store. - * @param result + * @param outResult * @param limit * @return true if there are more rows, false if scanner is done */ public synchronized boolean next(List outResult, int limit) throws IOException { + //DebugPrint.println("SS.next"); KeyValue peeked = this.heap.peek(); if (peeked == null) { close(); @@ -160,6 +172,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb List results = new ArrayList(); LOOP: while((kv = this.heap.peek()) != null) { QueryMatcher.MatchCode qcode = matcher.match(kv); + //DebugPrint.println("SS peek kv = " + kv + " with qcode = " + qcode); switch(qcode) { case INCLUDE: KeyValue next = this.heap.next(); @@ -227,8 +240,8 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb LOG.warn("StoreFile " + sf + " has null Reader"); continue; } - // Get a scanner that does not use pread. - s.add(r.getScanner(this.cacheBlocks, false)); + // If isGet, use pread, else false, dont use pread + s.add(r.getScanner(this.cacheBlocks, isGet)); } List scanners = new ArrayList(s.size()+1); @@ -240,16 +253,20 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb // Implementation of ChangedReadersObserver public synchronized void updateReaders() throws IOException { - if (this.closing.get()) return; + if (this.closing) return; KeyValue topKey = this.peek(); if (topKey == null) return; + List scanners = getScanners(); - // Seek all scanners to the initial key for(KeyValueScanner scanner : scanners) { scanner.seek(topKey); } + // close the previous scanners: + this.heap.close(); // bubble thru and close all scanners. + this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP + // Combine all seeked scanners with a heap heap = new KeyValueHeap( scanners.toArray(new KeyValueScanner[scanners.size()]), store.comparator); diff --git src/java/org/apache/hadoop/hbase/util/FSUtils.java src/java/org/apache/hadoop/hbase/util/FSUtils.java index 1bddbf5..09bac3d 100644 --- src/java/org/apache/hadoop/hbase/util/FSUtils.java +++ src/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -367,6 +367,7 @@ public class FSUtils { return true; } + // TODO move this method OUT of FSUtils. 
No dependencies to HMaster /** * Expects to find -ROOT- directory. * @param fs diff --git src/test/org/apache/hadoop/hbase/TestKeyValue.java src/test/org/apache/hadoop/hbase/TestKeyValue.java index 1b74ae1..6f56769 100644 --- src/test/org/apache/hadoop/hbase/TestKeyValue.java +++ src/test/org/apache/hadoop/hbase/TestKeyValue.java @@ -276,4 +276,49 @@ public class TestKeyValue extends TestCase { // TODO actually write this test! } + + private final byte[] rowA = Bytes.toBytes("rowA"); + private final byte[] rowB = Bytes.toBytes("rowB"); + + private final byte[] family = Bytes.toBytes("family"); + private final byte[] qualA = Bytes.toBytes("qfA"); + + private void assertKVLess(KeyValue.KVComparator c, + KeyValue less, + KeyValue greater) { + int cmp = c.compare(less,greater); + assertTrue(cmp < 0); + cmp = c.compare(greater,less); + assertTrue(cmp > 0); + } + + public void testFirstLastOnRow() { + final KVComparator c = KeyValue.COMPARATOR; + long ts = 1; + + // These are listed in sort order (ie: every one should be less + // than the one on the next line). + final KeyValue firstOnRowA = KeyValue.createFirstOnRow(rowA); + final KeyValue kvA_1 = new KeyValue(rowA, null, null, ts, Type.Put); + final KeyValue kvA_2 = new KeyValue(rowA, family, qualA, ts, Type.Put); + + final KeyValue lastOnRowA = KeyValue.createLastOnRow(rowA); + final KeyValue firstOnRowB = KeyValue.createFirstOnRow(rowB); + final KeyValue kvB = new KeyValue(rowB, family, qualA, ts, Type.Put); + + assertKVLess(c, firstOnRowA, firstOnRowB); + assertKVLess(c, firstOnRowA, kvA_1); + assertKVLess(c, firstOnRowA, kvA_2); + assertKVLess(c, kvA_1, kvA_2); + assertKVLess(c, kvA_2, firstOnRowB); + assertKVLess(c, kvA_1, firstOnRowB); + + assertKVLess(c, lastOnRowA, firstOnRowB); + assertKVLess(c, firstOnRowB, kvB); + assertKVLess(c, lastOnRowA, kvB); + + assertKVLess(c, kvA_2, lastOnRowA); + assertKVLess(c, kvA_1, lastOnRowA); + assertKVLess(c, firstOnRowA, lastOnRowA); + } } diff --git src/test/org/apache/hadoop/hbase/client/TestClient.java src/test/org/apache/hadoop/hbase/client/TestClient.java index f4403e7..8ad1898 100644 --- src/test/org/apache/hadoop/hbase/client/TestClient.java +++ src/test/org/apache/hadoop/hbase/client/TestClient.java @@ -67,7 +67,7 @@ public class TestClient extends HBaseClusterTestCase { super(); } - /** + /* * Test from client side of an involved filter against a multi family that * involves deletes. * @@ -196,7 +196,7 @@ public class TestClient extends HBaseClusterTestCase { } } - /** + /* * Test filters when multiple regions. It does counts. Needs eye-balling of * logs to ensure that we're not scanning more regions that we're supposed to. * Related to the TestFilterAcrossRegions over in the o.a.h.h.filter package. @@ -253,7 +253,7 @@ public class TestClient extends HBaseClusterTestCase { assertEquals(rowCount - endKeyCount, countGreater); } - /** + /* * Load table with rows from 'aaa' to 'zzz'. * @param t * @return Count of rows loaded. @@ -418,7 +418,7 @@ public class TestClient extends HBaseClusterTestCase { scanner.close(); } - /** + /* * Test simple table and non-existent row cases. */ public void testSimpleMissing() throws Exception { @@ -531,7 +531,7 @@ public class TestClient extends HBaseClusterTestCase { assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); } - /** + /* * Test basic puts, gets, scans, and deletes for a single row * in a multiple family table. 
*/ @@ -1438,7 +1438,7 @@ public class TestClient extends HBaseClusterTestCase { ht.put(put); delete = new Delete(ROW); - delete.deleteColumn(FAMILIES[0], QUALIFIER); + delete.deleteColumn(FAMILIES[0], QUALIFIER); // ts[4] ht.delete(delete); get = new Get(ROW); @@ -1473,23 +1473,24 @@ public class TestClient extends HBaseClusterTestCase { // But alas, this is not to be. We can't put them back in either case. put = new Put(ROW); - put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); - put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); + put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); // 1000 + put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); // 5000 ht.put(put); - // The Get returns the latest value but then does not return the - // oldest, which was never deleted, ts[1]. - + + // It used to be due to the internal implementation of Get, that + // the Get() call would return ts[4] UNLIKE the Scan below. With + // the switch to using Scan for Get this is no longer the case. get = new Get(ROW); get.addFamily(FAMILIES[0]); get.setMaxVersions(Integer.MAX_VALUE); result = ht.get(get); assertNResult(result, ROW, FAMILIES[0], QUALIFIER, - new long [] {ts[2], ts[3], ts[4]}, - new byte[][] {VALUES[2], VALUES[3], VALUES[4]}, + new long [] {ts[1], ts[2], ts[3]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3]}, 0, 2); - // The Scanner returns the previous values, the expected-unexpected behavior + // The Scanner returns the previous values, the expected-naive-unexpected behavior scan = new Scan(ROW); scan.addFamily(FAMILIES[0]); @@ -1553,7 +1554,7 @@ public class TestClient extends HBaseClusterTestCase { result = ht.get(get); assertTrue("Expected 2 keys but received " + result.size(), result.size() == 2); - assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER, + assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER, new long [] {ts[0], ts[1]}, new byte[][] {VALUES[0], VALUES[1]}, 0, 1); @@ -1591,9 +1592,8 @@ public class TestClient extends HBaseClusterTestCase { get.addFamily(FAMILIES[2]); get.setMaxVersions(Integer.MAX_VALUE); result = ht.get(get); - assertTrue("Expected 1 key but received " + result.size() + ": " + result, - result.size() == 1); - assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, + assertEquals(1, result.size()); + assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, new long [] {ts[2]}, new byte[][] {VALUES[2]}, 0, 0); @@ -1603,9 +1603,8 @@ public class TestClient extends HBaseClusterTestCase { scan.addFamily(FAMILIES[2]); scan.setMaxVersions(Integer.MAX_VALUE); result = getSingleScanResult(ht, scan); - assertTrue("Expected 1 key but received " + result.size(), - result.size() == 1); - assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, + assertEquals(1, result.size()); + assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, new long [] {ts[2]}, new byte[][] {VALUES[2]}, 0, 0); @@ -1691,7 +1690,7 @@ public class TestClient extends HBaseClusterTestCase { } } - /** + /* * Baseline "scalability" test. 
* * Tests one hundred families, one million columns, one million versions @@ -1738,7 +1737,7 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * Explicitly test JIRAs related to HBASE-880 / Client API */ public void testJIRAs() throws Exception { @@ -1754,7 +1753,7 @@ public class TestClient extends HBaseClusterTestCase { // JIRA Testers // - /** + /* * HBASE-867 * If millions of columns in a column family, hbase scanner won't come up * @@ -1844,7 +1843,7 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * HBASE-861 * get with timestamp will return a value if there is a version with an * earlier timestamp @@ -1907,7 +1906,7 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * HBASE-33 * Add a HTable get/obtainScanner method that retrieves all versions of a * particular column and row between two timestamps @@ -1956,7 +1955,7 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * HBASE-1014 * commit(BatchUpdate) method should return timestamp */ @@ -1980,7 +1979,7 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * HBASE-1182 * Scan for columns > some timestamp */ @@ -2025,7 +2024,7 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * HBASE-52 * Add a means of scanning over all versions */ @@ -2423,7 +2422,7 @@ public class TestClient extends HBaseClusterTestCase { - /** + /* * Verify a single column using gets. * Expects family and qualifier arrays to be valid for at least * the range: idx-2 < idx < idx+2 @@ -2480,7 +2479,7 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * Verify a single column using scanners. * Expects family and qualifier arrays to be valid for at least * the range: idx-2 to idx+2 @@ -2542,11 +2541,11 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * Verify we do not read any values by accident around a single column * Same requirements as getVerifySingleColumn */ - private void getVerifySingleEmpty(HTable ht, + private void getVerifySingleEmpty(HTable ht, byte [][] ROWS, int ROWIDX, byte [][] FAMILIES, int FAMILYIDX, byte [][] QUALIFIERS, int QUALIFIERIDX) @@ -2668,12 +2667,11 @@ public class TestClient extends HBaseClusterTestCase { "Got row [" + Bytes.toString(result.getRow()) +"]", equals(row, result.getRow())); int expectedResults = end - start + 1; - assertTrue("Expected " + expectedResults + " keys but result contains " - + result.size(), result.size() == expectedResults); - + assertEquals(expectedResults, result.size()); + KeyValue [] keys = result.sorted(); - for(int i=0;i kvs = new ArrayList(); kvs.add(new KeyValue(row1, fam4, null, null)); @@ -1439,6 +1465,41 @@ public class TestHRegion extends HBaseTestCase { assertICV(row, fam1, qual1, value+amount); } + public void testIncrementColumnValue_BumpSnapshot() throws IOException { + initHRegion(tableName, getName(), fam1); + + long value = 42L; + long incr = 44L; + + // first put something in kvset, then snapshot it. 
+ Put put = new Put(row); + put.add(fam1, qual1, Bytes.toBytes(value)); + region.put(put); + + // get the store in question: + Store s = region.getStore(fam1); + s.snapshot(); //bam + + // now increment: + long newVal = region.incrementColumnValue(row, fam1, qual1, + incr, false); + + assertEquals(value+incr, newVal); + + // get both versions: + Get get = new Get(row); + get.setMaxVersions(); + get.addColumn(fam1,qual1); + + Result r = region.get(get, null); + assertEquals(2, r.size()); + KeyValue first = r.raw()[0]; + KeyValue second = r.raw()[1]; + + assertTrue("ICV failed to upgrade timestamp", + first.getTimestamp() != second.getTimestamp()); + } + public void testIncrementColumnValue_ConcurrentFlush() throws IOException { initHRegion(tableName, getName(), fam1); @@ -1652,7 +1713,7 @@ public class TestHRegion extends HBaseTestCase { assertEquals(expected.get(i), actual.get(i)); } } - + ////////////////////////////////////////////////////////////////////////////// // Split test ////////////////////////////////////////////////////////////////////////////// @@ -1935,9 +1996,9 @@ public class TestHRegion extends HBaseTestCase { FlushThread flushThread = new FlushThread(); flushThread.start(); - Scan scan = new Scan(); - scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, - new BinaryComparator(Bytes.toBytes("row0")))); + Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1")); +// scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, +// new BinaryComparator(Bytes.toBytes("row0")))); int expectedCount = numFamilies * numQualifiers; List res = new ArrayList(); @@ -1950,7 +2011,7 @@ public class TestHRegion extends HBaseTestCase { } if (i != 0 && i % flushInterval == 0) { - //System.out.println("scan iteration = " + i); + //System.out.println("flush scan iteration = " + i); flushThread.flush(); } @@ -1959,9 +2020,10 @@ public class TestHRegion extends HBaseTestCase { InternalScanner scanner = region.getScanner(scan); while (scanner.next(res)) ; if (!res.isEmpty() || !previousEmpty || i > compactInterval) { - Assert.assertEquals("i=" + i, expectedCount, res.size()); + assertEquals("i=" + i, expectedCount, res.size()); long timestamp = res.get(0).getTimestamp(); - Assert.assertTrue(timestamp >= prevTimestamp); + assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp, + timestamp >= prevTimestamp); prevTimestamp = timestamp; } } @@ -2011,15 +2073,16 @@ public class TestHRegion extends HBaseTestCase { for (int r = 0; r < numRows; r++) { byte[] row = Bytes.toBytes("row" + r); Put put = new Put(row); - for (int f = 0; f < families.length; f++) { - for (int q = 0; q < qualifiers.length; q++) { - put.add(families[f], qualifiers[q], (long) val, - Bytes.toBytes(val)); + for (byte[] family : families) { + for (byte[] qualifier : qualifiers) { + put.add(family, qualifier, (long) val, + Bytes.toBytes(val)); } } +// System.out.println("Putting of kvsetsize=" + put.size()); region.put(put); - if (val > 0 && val % 47 == 0){ - //System.out.println("put iteration = " + val); + if (val > 0 && val % 47 == 0) { + System.out.println("put iteration = " + val); Delete delete = new Delete(row, (long)val-30, null); region.delete(delete, null, true); } diff --git src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java index 1e602e7..28a01a2 100644 --- src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -25,6 +25,7 @@ 
import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; import junit.framework.TestCase; @@ -46,10 +47,12 @@ public class TestMemStore extends TestCase { private static final byte [] FAMILY = Bytes.toBytes("column"); private static final byte [] CONTENTS_BASIC = Bytes.toBytes("contents:basic"); private static final String CONTENTSTR = "contentstr"; + private ReadWriteConsistencyControl rwcc; @Override public void setUp() throws Exception { super.setUp(); + this.rwcc = new ReadWriteConsistencyControl(); this.memstore = new MemStore(); } @@ -75,6 +78,7 @@ public class TestMemStore extends TestCase { KeyValueScanner [] memstorescanners = this.memstore.getScanners(); Scan scan = new Scan(); List result = new ArrayList(); + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, this.memstore.comparator, null, memstorescanners); int count = 0; @@ -93,6 +97,8 @@ public class TestMemStore extends TestCase { for (int i = 0; i < memstorescanners.length; i++) { memstorescanners[0].close(); } + + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); memstorescanners = this.memstore.getScanners(); // Now assert can count same number even if a snapshot mid-scan. s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, @@ -137,9 +143,9 @@ public class TestMemStore extends TestCase { if (count == snapshotIndex) { this.memstore.snapshot(); this.memstore.clearSnapshot(this.memstore.getSnapshot()); - // Added more rows into kvset. + // Added more rows into kvset. But the scanner wont see these rows. addRows(this.memstore, ts); - LOG.info("Snapshotted, cleared it and then added values"); + LOG.info("Snapshotted, cleared it and then added values (which wont be seen)"); } result.clear(); } @@ -149,6 +155,181 @@ public class TestMemStore extends TestCase { assertEquals(rowCount, count); } + /** + * A simple test which verifies the 3 possible states when scanning across snapshot. 
+ */ + public void testScanAcrossSnapshot2() { + // we are going to the scanning across snapshot with two kvs + // kv1 should always be returned before kv2 + final byte[] one = Bytes.toBytes(1); + final byte[] two = Bytes.toBytes(2); + final byte[] f = Bytes.toBytes("f"); + final byte[] q = Bytes.toBytes("q"); + final byte[] v = Bytes.toBytes(3); + + final KeyValue kv1 = new KeyValue(one, f, q, v); + final KeyValue kv2 = new KeyValue(two, f, q, v); + + // use case 1: both kvs in kvset + this.memstore.add(kv1.clone()); + this.memstore.add(kv2.clone()); + verifyScanAcrossSnapshot2(kv1, kv2); + + // use case 2: both kvs in snapshot + this.memstore.snapshot(); + verifyScanAcrossSnapshot2(kv1, kv2); + + // use case 3: first in snapshot second in kvset + this.memstore = new MemStore(); + this.memstore.add(kv1.clone()); + this.memstore.snapshot(); + this.memstore.add(kv2.clone()); + verifyScanAcrossSnapshot2(kv1, kv2); + } + + private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) { + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + KeyValueScanner[] memstorescanners = this.memstore.getScanners(); + assertEquals(1, memstorescanners.length); + final KeyValueScanner scanner = memstorescanners[0]; + scanner.seek(KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW)); + assertEquals(kv1, scanner.next()); + assertEquals(kv2, scanner.next()); + assertNull(scanner.next()); + } + + private void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected) { + scanner.seek(KeyValue.createFirstOnRow(new byte[]{})); + for (KeyValue kv : expected) { + assertTrue(0 == + KeyValue.COMPARATOR.compare(kv, + scanner.next())); + } + assertNull(scanner.peek()); + } + + public void testMemstoreConcurrentControl() { + final byte[] row = Bytes.toBytes(1); + final byte[] f = Bytes.toBytes("family"); + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] v = Bytes.toBytes("value"); + + ReadWriteConsistencyControl.WriteEntry w = + rwcc.beginMemstoreInsert(); + + KeyValue kv1 = new KeyValue(row, f, q1, v); + kv1.setMemstoreTS(w.getWriteNumber()); + memstore.add(kv1); + + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + KeyValueScanner[] s = this.memstore.getScanners(); + assertScannerResults(s[0], new KeyValue[]{}); + + rwcc.completeMemstoreInsert(w); + + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + s = this.memstore.getScanners(); + assertScannerResults(s[0], new KeyValue[]{kv1}); + + w = rwcc.beginMemstoreInsert(); + KeyValue kv2 = new KeyValue(row, f, q2, v); + kv2.setMemstoreTS(w.getWriteNumber()); + memstore.add(kv2); + + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + s = this.memstore.getScanners(); + assertScannerResults(s[0], new KeyValue[]{kv1}); + + rwcc.completeMemstoreInsert(w); + + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + s = this.memstore.getScanners(); + assertScannerResults(s[0], new KeyValue[]{kv1, kv2}); + } + + private static class ReadOwnWritesTester extends Thread { + final int id; + static final int NUM_TRIES = 1000; + + final byte[] row; + + final byte[] f = Bytes.toBytes("family"); + final byte[] q1 = Bytes.toBytes("q1"); + + final ReadWriteConsistencyControl rwcc; + final MemStore memstore; + + AtomicReference caughtException; + + + public ReadOwnWritesTester(int id, + MemStore memstore, + ReadWriteConsistencyControl rwcc, + AtomicReference caughtException) + { + this.id = id; + this.rwcc = rwcc; + this.memstore = memstore; + this.caughtException = caughtException; + row = 
Bytes.toBytes(id); + } + + public void run() { + try { + internalRun(); + } catch (Throwable t) { + caughtException.compareAndSet(null, t); + } + } + + private void internalRun() { + for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { + ReadWriteConsistencyControl.WriteEntry w = + rwcc.beginMemstoreInsert(); + + // Insert the sequence value (i) + byte[] v = Bytes.toBytes(i); + + KeyValue kv = new KeyValue(row, f, q1, i, v); + kv.setMemstoreTS(w.getWriteNumber()); + memstore.add(kv); + rwcc.completeMemstoreInsert(w); + + // Assert that we can read back + + KeyValueScanner s = this.memstore.getScanners()[0]; + s.seek(kv); + + KeyValue ret = s.next(); + assertNotNull("Didnt find own write at all", ret); + assertEquals("Didnt read own writes", + kv.getTimestamp(), ret.getTimestamp()); + } + } + } + + public void no_testReadOwnWritesUnderConcurrency() throws Throwable { + + int NUM_THREADS = 8; + + ReadOwnWritesTester threads[] = new ReadOwnWritesTester[NUM_THREADS]; + AtomicReference caught = new AtomicReference(); + + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new ReadOwnWritesTester(i, memstore, rwcc, caught); + threads[i].start(); + } + + for (int i = 0; i < NUM_THREADS; i++) { + threads[i].join(); + } + + if (caught.get() != null) { + throw caught.get(); + } + } + /** * Test memstore snapshots * @throws IOException @@ -442,9 +623,10 @@ public class TestMemStore extends TestCase { List expected = new ArrayList(); expected.add(put3); expected.add(del2); + expected.add(put2); expected.add(put1); - - assertEquals(3, memstore.kvset.size()); + + assertEquals(4, memstore.kvset.size()); int i = 0; for(KeyValue kv : memstore.kvset) { assertEquals(expected.get(i++), kv); @@ -476,8 +658,11 @@ public class TestMemStore extends TestCase { List expected = new ArrayList(); expected.add(put3); expected.add(del2); + expected.add(put2); + expected.add(put1); + - assertEquals(2, memstore.kvset.size()); + assertEquals(4, memstore.kvset.size()); int i = 0; for (KeyValue kv: memstore.kvset) { assertEquals(expected.get(i++), kv); @@ -510,9 +695,14 @@ public class TestMemStore extends TestCase { List expected = new ArrayList(); expected.add(del); + expected.add(put1); + expected.add(put2); expected.add(put4); + expected.add(put3); + + - assertEquals(2, memstore.kvset.size()); + assertEquals(5, memstore.kvset.size()); int i = 0; for (KeyValue kv: memstore.kvset) { assertEquals(expected.get(i++), kv); @@ -528,7 +718,7 @@ public class TestMemStore extends TestCase { memstore.add(new KeyValue(row, fam, qf, ts, val)); KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val); memstore.delete(delete); - assertEquals(1, memstore.kvset.size()); + assertEquals(2, memstore.kvset.size()); assertEquals(delete, memstore.kvset.first()); } @@ -541,7 +731,7 @@ public class TestMemStore extends TestCase { "row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care"); memstore.delete(delete); - assertEquals(1, memstore.kvset.size()); + assertEquals(2, memstore.kvset.size()); assertEquals(delete, memstore.kvset.first()); } public void testRetainsDeleteColumn() throws IOException { @@ -553,7 +743,7 @@ public class TestMemStore extends TestCase { KeyValue.Type.DeleteColumn, "dont-care"); memstore.delete(delete); - assertEquals(1, memstore.kvset.size()); + assertEquals(2, memstore.kvset.size()); assertEquals(delete, memstore.kvset.first()); } public void testRetainsDeleteFamily() throws IOException { @@ -565,7 +755,7 @@ public class TestMemStore extends TestCase { 
KeyValue.Type.DeleteFamily, "dont-care"); memstore.delete(delete); - assertEquals(1, memstore.kvset.size()); + assertEquals(2, memstore.kvset.size()); assertEquals(delete, memstore.kvset.first()); } @@ -573,13 +763,13 @@ public class TestMemStore extends TestCase { ////////////////////////////////////////////////////////////////////////////// // Helpers ////////////////////////////////////////////////////////////////////////////// - private byte [] makeQualifier(final int i1, final int i2){ + private static byte [] makeQualifier(final int i1, final int i2){ return Bytes.toBytes(Integer.toString(i1) + ";" + Integer.toString(i2)); } /** - * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT} + * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT} * @param hmc Instance to add rows to. * @return How many rows we added. * @throws IOException @@ -589,7 +779,7 @@ public class TestMemStore extends TestCase { } /** - * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT} + * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT} * @param hmc Instance to add rows to. * @return How many rows we added. * @throws IOException @@ -643,4 +833,57 @@ public class TestMemStore extends TestCase { return new KeyValue(row, Bytes.toBytes("test_col:"), HConstants.LATEST_TIMESTAMP, value); } + private static void addRows(int count, final MemStore mem) { + long nanos = System.nanoTime(); + + for (int i = 0 ; i < count ; i++) { + if (i % 1000 == 0) { + + System.out.println(i + " Took for 1k usec: " + (System.nanoTime() - nanos)/1000); + nanos = System.nanoTime(); + } + long timestamp = System.currentTimeMillis(); + + for (int ii = 0; ii < QUALIFIER_COUNT ; ii++) { + byte [] row = Bytes.toBytes(i); + byte [] qf = makeQualifier(i, ii); + mem.add(new KeyValue(row, FAMILY, qf, timestamp, qf)); + } + } + } + + + static void doScan(MemStore ms, int iteration) { + long nanos = System.nanoTime(); + KeyValueScanner [] ss = ms.getScanners(); + KeyValueScanner s = ss[0]; + s.seek(KeyValue.createFirstOnRow(new byte[]{})); + + System.out.println(iteration + " create/seek took: " + (System.nanoTime() - nanos)/1000); + int cnt=0; + while(s.next() != null) ++cnt; + + System.out.println(iteration + " took usec: " + (System.nanoTime() - nanos)/1000 + " for: " + cnt); + + } + + public static void main(String [] args) { + ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); + MemStore ms = new MemStore(); + + long n1 = System.nanoTime(); + addRows(25000, ms); + System.out.println("Took for insert: " + (System.nanoTime()-n1)/1000); + + + System.out.println("foo"); + + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + + for (int i = 0 ; i < 50 ; i++) + doScan(ms, i); + + } + + } diff --git src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java new file mode 100644 index 0000000..78fe59c --- /dev/null +++ src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java @@ -0,0 +1,109 @@ +package org.apache.hadoop.hbase.regionserver; + +import junit.framework.TestCase; + +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class TestReadWriteConsistencyControl extends TestCase { + static class Writer implements Runnable { + final AtomicBoolean finished; + final ReadWriteConsistencyControl rwcc; + final AtomicBoolean status; + + Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, 
AtomicBoolean status) { + this.finished = finished; + this.rwcc = rwcc; + this.status = status; + } + private Random rnd = new Random(); + public boolean failed = false; + + public void run() { + while (!finished.get()) { + ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert(); +// System.out.println("Begin write: " + e.getWriteNumber()); + // 10 usec - 500usec (including 0) + int sleepTime = rnd.nextInt(500); + // 500 * 1000 = 500,000ns = 500 usec + // 1 * 100 = 100ns = 1usec + try { + if (sleepTime > 0) + Thread.sleep(0, sleepTime * 1000); + } catch (InterruptedException e1) { + } + try { + rwcc.completeMemstoreInsert(e); + } catch (RuntimeException ex) { + // got failure + System.out.println(ex.toString()); + ex.printStackTrace(); + status.set(false); + return; + // Report failure if possible. + } + } + } + } + + public void testParallelism() throws Exception { + final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); + + final AtomicBoolean finished = new AtomicBoolean(false); + + // fail flag for the reader thread + final AtomicBoolean readerFailed = new AtomicBoolean(false); + final AtomicLong failedAt = new AtomicLong(); + Runnable reader = new Runnable() { + public void run() { + long prev = rwcc.memstoreReadPoint(); + while (!finished.get()) { + long newPrev = rwcc.memstoreReadPoint(); + if (newPrev < prev) { + // serious problem. + System.out.println("Reader got out of order, prev: " + + prev + " next was: " + newPrev); + readerFailed.set(true); + // might as well give up + failedAt.set(newPrev); + return; + } + } + } + }; + + // writer thread parallelism. + int n = 20; + Thread [] writers = new Thread[n]; + AtomicBoolean [] statuses = new AtomicBoolean[n]; + Thread readThread = new Thread(reader); + + for (int i = 0 ; i < n ; ++i ) { + statuses[i] = new AtomicBoolean(true); + writers[i] = new Thread(new Writer(finished, rwcc, statuses[i])); + writers[i].start(); + } + readThread.start(); + + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException ex) { + } + + finished.set(true); + + readThread.join(); + for (int i = 0; i < n; ++i) { + writers[i].join(); + } + + // check failure. + assertFalse(readerFailed.get()); + for (int i = 0; i < n; ++i) { + assertTrue(statuses[i].get()); + } + + + } +} diff --git src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index f1ec15b..76ab7b5 100644 --- src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -1,5 +1,5 @@ /* - * Copyright 2009 The Apache Software Foundation + * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -20,24 +20,23 @@ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.NavigableSet; -import java.util.TreeSet; - import junit.framework.TestCase; - import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; + public class TestStoreScanner extends TestCase { private final String CF_STR = "cf"; final byte [] CF = Bytes.toBytes(CF_STR); - /** + /* * Test utility for building a NavigableSet for scanners. * @param strCols * @return @@ -128,7 +127,7 @@ public class TestStoreScanner extends TestCase { assertEquals(kvs[0], results.get(0)); } - /** + /* * Test test shows exactly how the matcher's return codes confuses the StoreScanner * and prevent it from doing the right thing. Seeking once, then nexting twice * should return R1, then R2, but in this case it doesnt. @@ -189,7 +188,7 @@ public class TestStoreScanner extends TestCase { assertEquals(0, results.size()); } - /** + /* * Test the case where there is a delete row 'in front of' the next row, the scanner * will move to the next row. */ @@ -408,7 +407,7 @@ public class TestStoreScanner extends TestCase { assertEquals(false, scan.next(results)); } - /** + /* * Test expiration of KeyValues in combination with a configured TTL for * a column family (as should be triggered in a major compaction). */