diff --git src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java
index 263a01c..2a9cf8f 100644
--- src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java
+++ src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hbase.client.idx.exp.Expression;
 import org.apache.hadoop.hbase.regionserver.idx.support.sets.IntSet;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.metrics.util.MBeanUtil;
 import org.apache.hadoop.util.Progressable;
@@ -41,6 +42,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -106,7 +108,13 @@ public class IdxRegion extends HRegion {
   public void initialize(Path initialFiles, Progressable reporter)
   throws IOException {
     super.initialize(initialFiles, reporter);
-    rebuildIndexes();
+    Callable<Void> work = rebuildIndexes();
+    try {
+      work.call();
+    } catch (Exception e) {
+      // rebuildIndexes() can only throw an IOException, so this cast is safe
+      throw (IOException) e;
+    }
     JmxHelper.registerMBean(
       IdxRegionMBeanImpl.generateObjectName(getRegionInfo()),
       IdxRegionMBeanImpl.newIdxRegionMBeanImpl(this));
@@ -118,15 +126,21 @@ public class IdxRegion extends HRegion {
   /**
    * Rebuilds the index.
    */
   @Override
-  protected void internalPreFlashcacheCommit() throws IOException {
-    rebuildIndexes();
-    super.internalPreFlashcacheCommit();
+  protected Callable<Void> internalPreFlushcacheCommit() throws IOException {
+    return rebuildIndexes();
   }

-  private void rebuildIndexes() throws IOException {
-    long time = indexManager.rebuildIndexes();
+  private Callable<Void> rebuildIndexes() throws IOException {
+    // Need to scan the memstore, so pin this thread's read point first
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
+    Pair<Long, Callable<Void>> aPair = indexManager.rebuildIndexes();
+
+    long time = aPair.getFirst();
     buildTimes[currentBuildTimesIndex] = time;
     currentBuildTimesIndex = (currentBuildTimesIndex + 1) % buildTimes.length;
+
+    return aPair.getSecond();
   }
@@ -151,23 +165,7 @@ public class IdxRegion extends HRegion {
       totalNonIndexedScans.incrementAndGet();
       return super.instantiateInternalScanner(scan, additionalScanners);
     } else {
-      totalIndexedScans.incrementAndGet();
-      // Grab a new search context
-      IdxSearchContext searchContext = indexManager.newSearchContext();
-      // use the expression evaluator to determine the final set of ints
-      IntSet matchedExpression = null;
-      try {
-        matchedExpression = expressionEvaluator.evaluate(
-          searchContext, expression
-        );
-      } catch (RuntimeException e) {
-        throw new DoNotRetryIOException(e.getMessage(), e);
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("%s rows matched the index expression",
-          matchedExpression.size()));
-      }
-      return new IdxRegionScanner(scan, searchContext, matchedExpression);
+      return new IdxRegionScanner(scan);
     }
   }
@@ -281,19 +279,45 @@ public class IdxRegion extends HRegion {
     return indexManager.getIndexHeapSize(columnName);
   }

+  private IdxRegionScanner lastScanner = null;
+
   class IdxRegionScanner extends RegionScanner {
     private final KeyProvider keyProvider;
     private KeyValue lastKeyValue;

-    IdxRegionScanner(Scan scan, IdxSearchContext idxSearchContext,
-      IntSet matchedExpression) throws 
IOException { + IdxRegionScanner(Scan scan) throws IOException { super(scan); + //DebugPrint.println("IdxRegionScanner."); + + Expression expression = IdxScan.getExpression(scan); + totalIndexedScans.incrementAndGet(); + + IdxSearchContext idxSearchContext = indexManager.newSearchContext(); + + // use the expression evaluator to determine the final set of ints + IntSet matchedExpression; + try { + matchedExpression = expressionEvaluator.evaluate( + idxSearchContext, expression + ); + } catch (RuntimeException e) { + throw new DoNotRetryIOException(e.getMessage(), e); + } + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("%s rows matched the index expression", + matchedExpression.size())); + } + numberOfOngoingIndexedScans.incrementAndGet(); keyProvider = new KeyProvider(idxSearchContext, matchedExpression, scan); } + @Override public boolean next(List outResults) throws IOException { + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + + //DebugPrint.println("IdxScanner.next"); // Seek to the next key value seekNext(); boolean result = super.next(outResults); @@ -304,6 +328,8 @@ public class IdxRegion extends HRegion { lastKeyValue = outResults.get(0); } + lastScanner = this; + return result; } @@ -342,9 +368,12 @@ public class IdxRegion extends HRegion { if (comparisonResult > 0) { break; } + //DebugPrint.println("seekNext: " + keyValue + " / " + lastKeyValue); } } while (true); + //DebugPrint.println("IdxScanner seek to: " + keyValue); + // seek the store heap to the next key // (this is what makes the scanner faster) getStoreHeap().seek(keyValue); @@ -354,6 +383,7 @@ public class IdxRegion extends HRegion { public void close() { numberOfOngoingIndexedScans.decrementAndGet(); keyProvider.close(); + super.close(); } } @@ -510,4 +540,5 @@ public class IdxRegion extends HRegion { return FIXED_OVERHEAD + super.heapSize() + indexManager.heapSize() + expressionEvaluator.heapSize(); } + } diff --git src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegionIndexManager.java src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegionIndexManager.java index 399a2df..7fa6503 100644 --- src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegionIndexManager.java +++ src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegionIndexManager.java @@ -43,8 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.Callable; /** * Manages the indexes for a single region. @@ -64,6 +63,7 @@ public class IdxRegionIndexManager implements HeapSize { * The wrapping region. */ private final IdxRegion region; + /** * The index map. Each pair holds the column and qualifier. */ @@ -78,7 +78,6 @@ public class IdxRegionIndexManager implements HeapSize { */ private long heapSize; - private final ReadWriteLock indexSwitchLock; private static final double INDEX_SIZE_GROWTH_FACTOR = 1.1; private static final double BYTES_IN_MB = 1024D * 1024D; @@ -89,10 +88,17 @@ public class IdxRegionIndexManager implements HeapSize { */ public IdxRegionIndexManager(IdxRegion region) { this.region = region; - indexSwitchLock = new ReentrantReadWriteLock(); heapSize = FIXED_SIZE; } + public ObjectArrayList getKeys() { + return keys; + } + + public Map, IdxIndex> getIndexMap() { + return indexMap; + } + /** * Creates and populates all indexes. 
Brute-force scan fetching everything
   * into memory, creating indexes out of that.
@@ -100,7 +106,7 @@
    * @return total time in millis to rebuild the indexes
    * @throws IOException in case scan throws
    */
-  public long rebuildIndexes() throws IOException {
+  public Pair<Long, Callable<Void>> rebuildIndexes() throws IOException {
     long startMillis = System.currentTimeMillis();
     if (LOG.isInfoEnabled()) {
       LOG.info(String.format("Initializing index manager for region: %s", region.toString()));
     }
@@ -110,31 +116,42 @@
     // if the region is closing/closed then a fillIndex method will throw a
     // NotServingRegion exception when an attempt to obtain a scanner is made
     // NOTE: when the region is being created isClosing() returns true
+    Callable<Void> work = null;
     if (!(region.isClosing() || region.isClosed()) && !builderTable.isEmpty()) {
       try {
-        ObjectArrayList<KeyValue> newKeys = fillIndex(builderTable);
-        Map<Pair<byte[], byte[]>, IdxIndex> newIndexMap = finalizeIndex(builderTable, newKeys);
-        switchIndex(newKeys, newIndexMap);
+        final ObjectArrayList<KeyValue> newKeys = fillIndex(builderTable);
+        final Map<Pair<byte[], byte[]>, IdxIndex> newIndexMap = finalizeIndex(builderTable, newKeys);
+        work = new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            switchIndex(newKeys, newIndexMap);
+            return null;
+          }
+        };
       } catch (NotServingRegionException e) {
         // the not serving exception may also be thrown during the scan if
         // the region was closed during the scan
         LOG.warn("Aborted index initialization", e);
       }
     } else {
-      switchIndex(new ObjectArrayList<KeyValue>(), Collections.<Pair<byte[], byte[]>, IdxIndex>emptyMap());
+      work = new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          switchIndex(new ObjectArrayList<KeyValue>(), Collections.<Pair<byte[], byte[]>, IdxIndex>emptyMap());
+          return null;
+        }
+      };
     }
-    return System.currentTimeMillis() - startMillis;
+    return new Pair<Long, Callable<Void>>(System.currentTimeMillis() - startMillis,
+      work);
   }

-  private void switchIndex(ObjectArrayList<KeyValue> newKeys,
-    Map<Pair<byte[], byte[]>, IdxIndex> newIndexMap) {
-    indexSwitchLock.writeLock().lock();
-    try {
-      this.keys = newKeys;
-      this.indexMap = newIndexMap;
-    } finally {
-      indexSwitchLock.writeLock().unlock();
-    }
+  private void switchIndex(final ObjectArrayList<KeyValue> newKeys,
+    final Map<Pair<byte[], byte[]>, IdxIndex> newIndexMap) {
+    // There is no lock here because switchIndex is called in the context of a newScannerLock
+    // and once scanners are created they keep a ref to the old index (which never changes).
+    this.keys = newKeys;
+    this.indexMap = newIndexMap;
   }

   /**
@@ -264,12 +281,7 @@
    * @return the new search context.
    */
   public IdxSearchContext newSearchContext() {
-    indexSwitchLock.readLock().lock();
-    try {
-      return new IdxSearchContext(keys, indexMap);
-    } finally {
-      indexSwitchLock.readLock().unlock();
-    }
+    return new IdxSearchContext(keys, indexMap);
   }

   @Override
@@ -283,12 +295,7 @@
    * @return the number of keys. 
*/ public int getNumberOfKeys() { - indexSwitchLock.readLock().lock(); - try { - return keys.size(); - } finally { - indexSwitchLock.readLock().unlock(); - } + return keys.size(); } /** @@ -302,13 +309,7 @@ public class IdxRegionIndexManager implements HeapSize { if (familyAndQualifier != null && familyAndQualifier.length == 2) { Pair fqPair = Pair.of(Bytes.toBytes(familyAndQualifier[0]), Bytes.toBytes(familyAndQualifier[1])); - indexSwitchLock.readLock().lock(); - IdxIndex idx = null; - try { - idx = indexMap.get(fqPair); - } finally { - indexSwitchLock.readLock().unlock(); - } + IdxIndex idx = indexMap.get(fqPair); if (idx != null) { return idx.heapSize(); } diff --git src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxSearchContext.java src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxSearchContext.java index b403b29..f43e0b2 100644 --- src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxSearchContext.java +++ src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxSearchContext.java @@ -49,6 +49,8 @@ public class IdxSearchContext { Map, IdxIndex> indexMap) { this.keys = keys; this.indexMap = indexMap; + //DebugPrint.println("IdxSearchContext w/ keys/indexMap: " + + // keys.hashCode() + " / " + indexMap.hashCode()); } /** diff --git src/contrib/indexed/src/test/org/apache/hadoop/hbase/TestIdxHBaseCluster.java src/contrib/indexed/src/test/org/apache/hadoop/hbase/TestIdxHBaseCluster.java index 9749177..59d908c 100644 --- src/contrib/indexed/src/test/org/apache/hadoop/hbase/TestIdxHBaseCluster.java +++ src/contrib/indexed/src/test/org/apache/hadoop/hbase/TestIdxHBaseCluster.java @@ -33,11 +33,14 @@ import org.apache.hadoop.hbase.client.idx.exp.Comparison; import org.apache.hadoop.hbase.client.idx.exp.Expression; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.regionserver.DebugPrint; import org.apache.hadoop.hbase.regionserver.IdxRegion; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -102,13 +105,16 @@ public class TestIdxHBaseCluster extends TestHBaseCluster { int count = 0; int finalCount = maxRows / 10; int printCount = 0; + //List prevList = null; while (count < finalCount) { ResultScanner scanner = table.getScanner(idxScan); int nextCount = 0; + //List resultList = new ArrayList(); for (Result res : scanner) { nextCount++; Assert.assertTrue(Arrays.equals(res.getValue(family, qualifier), value)); + //resultList.add(res); } if (nextCount > printCount + 1000) { System.out.printf("++ found %d matching rows\n", nextCount); @@ -119,9 +125,19 @@ public class TestIdxHBaseCluster extends TestHBaseCluster { boolean condition = nextCount >= count && nextCount <= finalCount; if (!condition) { System.out.println("-------- " + infoString); + // Useful debugging harness. 
+ //System.out.println("DEBUGPRINT:"); + //DebugPrint.dumpToFile("/tmp/debug.txt"); + //System.out.println(DebugPrint.out.toString()); + //System.out.println("----- MY RESULT"); + //for ( Result r : resultList ) System.out.println(r); + //System.out.println("---- PREV RESULTZ: " ); + //for ( Result r : prevList ) System.out.println(r); } Assert.assertTrue(infoString, condition); count = nextCount; + //prevList = resultList; + //DebugPrint.reset(); } service.shutdown(); } diff --git src/java/org/apache/hadoop/hbase/KeyValue.java src/java/org/apache/hadoop/hbase/KeyValue.java index 49a643e..a5665cf 100644 --- src/java/org/apache/hadoop/hbase/KeyValue.java +++ src/java/org/apache/hadoop/hbase/KeyValue.java @@ -200,6 +200,23 @@ public class KeyValue implements Writable, HeapSize { private int offset = 0; private int length = 0; + /** Here be dragons **/ + + // used to achieve atomic operations in the memstore. + public long getMemstoreTS() { + return memstoreTS; + } + + public void setMemstoreTS(long memstoreTS) { + this.memstoreTS = memstoreTS; + } + + // default value is 0, aka DNC + private long memstoreTS = 0; + + /** Dragon time over, return to normal business */ + + /** Writable Constructor -- DO NOT USE */ public KeyValue() {} @@ -1503,6 +1520,21 @@ public class KeyValue implements Writable, HeapSize { } /** + * Creates a KeyValue that is last on the specified row id. That is, + * every other possible KeyValue for the given row would compareTo() + * less than the result of this call. + * @param row row key + * @return Last possible KeyValue on passed row + */ + public static KeyValue createLastOnRow(final byte[] row) { + return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum); + } + + /** + * Create a KeyValue that is smaller than all other possible KeyValues + * for the given row. That is any (valid) KeyValue on 'row' would sort + * _after_ the result. + * * @param row - row key (arbitrary byte array) * @return First possible KeyValue on passed row */ @@ -1511,6 +1543,8 @@ public class KeyValue implements Writable, HeapSize { } /** + * Creates a KeyValue that is smaller than all other KeyValues that + * are older than the passed timestamp. * @param row - row key (arbitrary byte array) * @param ts - timestamp * @return First possible key on passed row and timestamp. @@ -1522,8 +1556,11 @@ public class KeyValue implements Writable, HeapSize { /** * @param row - row key (arbitrary byte array) + * @param c column - {@link #parseColumn(byte[])} is called to split + * the column. * @param ts - timestamp * @return First possible key on passed row, column and timestamp + * @deprecated */ public static KeyValue createFirstOnRow(final byte [] row, final byte [] c, final long ts) { @@ -1532,14 +1569,17 @@ public class KeyValue implements Writable, HeapSize { } /** + * Create a KeyValue for the specified row, family and qualifier that would be + * smaller than all other possible KeyValues that have the same row,family,qualifier. + * Used for seeking. * @param row - row key (arbitrary byte array) - * @param f - family name - * @param q - column qualifier + * @param family - family name + * @param qualifier - column qualifier * @return First possible key on passed row, and column. 
*/ - public static KeyValue createFirstOnRow(final byte [] row, final byte [] f, - final byte [] q) { - return new KeyValue(row, f, q, HConstants.LATEST_TIMESTAMP, Type.Maximum); + public static KeyValue createFirstOnRow(final byte [] row, final byte [] family, + final byte [] qualifier) { + return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum); } /** @@ -1706,9 +1746,6 @@ public class KeyValue implements Writable, HeapSize { return compare; } - // if row matches, and no column in the 'left' AND put type is 'minimum', - // then return that left is larger than right. - // Compare column family. Start compare past row and family length. int lcolumnoffset = Bytes.SIZEOF_SHORT + lrowlength + 1 + loffset; int rcolumnoffset = Bytes.SIZEOF_SHORT + rrowlength + 1 + roffset; @@ -1717,17 +1754,25 @@ public class KeyValue implements Writable, HeapSize { int rcolumnlength = rlength - TIMESTAMP_TYPE_SIZE - (rcolumnoffset - roffset); + // if row matches, and no column in the 'left' AND put type is 'minimum', + // then return that left is larger than right. + // This supports 'last key on a row' - the magic is if there is no column in the // left operand, and the left operand has a type of '0' - magical value, // then we say the left is bigger. This will let us seek to the last key in // a row. byte ltype = left[loffset + (llength - 1)]; + byte rtype = right[roffset + (rlength - 1)]; if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) { return 1; // left is bigger. } + if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) { + return -1; + } + // TODO the family and qualifier should be compared separately compare = Bytes.compareTo(left, lcolumnoffset, lcolumnlength, right, rcolumnoffset, rcolumnlength); if (compare != 0) { @@ -1749,9 +1794,6 @@ public class KeyValue implements Writable, HeapSize { if (!this.ignoreType) { // Compare types. Let the delete types sort ahead of puts; i.e. types // of higher numbers sort before those of lesser numbers - - // ltype is defined above - byte rtype = right[roffset + (rlength - 1)]; return (0xff & rtype) - (0xff & ltype); } return 0; @@ -1791,7 +1833,8 @@ public class KeyValue implements Writable, HeapSize { public long heapSize() { return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE + ClassSize.align(ClassSize.ARRAY + length) + - (2 * Bytes.SIZEOF_INT)); + (2 * Bytes.SIZEOF_INT) + + Bytes.SIZEOF_LONG); } // Writable diff --git src/java/org/apache/hadoop/hbase/client/Scan.java src/java/org/apache/hadoop/hbase/client/Scan.java index 0972ce8..a03ceff 100644 --- src/java/org/apache/hadoop/hbase/client/Scan.java +++ src/java/org/apache/hadoop/hbase/client/Scan.java @@ -163,10 +163,29 @@ public class Scan implements Writable { } /** + * Builds a scan object with the same specs as get. + * @param get get to model scan after + */ + public Scan(Get get) { + this.startRow = get.getRow(); + this.stopRow = get.getRow(); + this.filter = get.getFilter(); + this.maxVersions = get.getMaxVersions(); + this.tr = get.getTimeRange(); + this.familyMap = get.getFamilyMap(); + } + + public boolean isGetScan() { + return this.startRow != null && this.startRow.length > 0 && + Bytes.equals(this.startRow, this.stopRow); + } + + /** * Get all columns from the specified family. *
<p>
* Overrides previous calls to addColumn for this family. * @param family family name + * @return this */ public Scan addFamily(byte [] family) { familyMap.remove(family); @@ -180,6 +199,7 @@ public class Scan implements Writable { * Overrides previous calls to addFamily for this family. * @param family family name * @param qualifier column qualifier + * @return this */ public Scan addColumn(byte [] family, byte [] qualifier) { NavigableSet set = familyMap.get(family); diff --git src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java index e1dbddc..463824a 100644 --- src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java +++ src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java @@ -178,9 +178,9 @@ public class SingleColumnValueFilter implements Filter { // byte array copy? int compareResult = this.comparator.compareTo(Arrays.copyOfRange(data, offset, offset + length)); - if (LOG.isDebugEnabled()) { - LOG.debug("compareResult=" + compareResult + " " + Bytes.toString(data, offset, length)); - } +// if (LOG.isDebugEnabled()) { +// LOG.debug("compareResult=" + compareResult + " " + Bytes.toString(data, offset, length)); +// } switch (this.compareOp) { case LESS: return compareResult <= 0; diff --git src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java new file mode 100644 index 0000000..8f9daf2 --- /dev/null +++ src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java @@ -0,0 +1,50 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.io.FileWriter; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +public class DebugPrint { + +private static final AtomicBoolean enabled = new AtomicBoolean(false); + private static final Object sync = new Object(); + public static StringBuilder out = new StringBuilder(); + + static public void enable() { + enabled.set(true); + } + static public void disable() { + enabled.set(false); + } + + static public void reset() { + synchronized (sync) { + enable(); // someone wants us enabled basically. + + out = new StringBuilder(); + } + } + static public void dumpToFile(String file) throws IOException { + FileWriter f = new FileWriter(file); + synchronized (sync) { + f.write(out.toString()); + } + f.close(); + } + + public static void println(String m) { + if (!enabled.get()) { + System.out.println(m); + return; + } + + synchronized (sync) { + String threadName = Thread.currentThread().getName(); + out.append("<"); + out.append(threadName); + out.append("> "); + out.append(m); + out.append("\n"); + } + } +} diff --git src/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3d38d4a..75a3ff9 100644 --- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1,5 +1,5 @@ /** - * Copyright 2009 The Apache Software Foundation + * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -62,6 +62,7 @@ package org.apache.hadoop.hbase.regionserver; import java.util.NavigableSet; import java.util.TreeMap; import java.util.TreeSet; + import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -167,6 +168,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ /** * Set flags that make this region read-only. + * + * @param onOff flip value for region r/o setting */ synchronized void setReadOnly(final boolean onOff) { this.writesEnabled = !onOff; @@ -182,7 +185,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } } - private volatile WriteState writestate = new WriteState(); + private final WriteState writestate = new WriteState(); final long memstoreFlushSize; private volatile long lastFlushTime; @@ -201,7 +204,15 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ private final Object splitLock = new Object(); private long minSequenceId; private boolean splitRequest; - + + + protected final ReadWriteConsistencyControl rwcc = + new ReadWriteConsistencyControl(); + + ReadWriteConsistencyControl getRWCC() { + return rwcc; + } + /** * Name of the region info file that resides just under the region directory. */ @@ -210,6 +221,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ /** * REGIONINFO_FILE as byte array. */ + @SuppressWarnings({"UnusedDeclaration"}) public final static byte [] REGIONINFO_FILE_BYTES = Bytes.toBytes(REGIONINFO_FILE); @@ -287,9 +299,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * Initialize this region and get it ready to roll. * Called after construction. * - * @param initialFiles - * @param reporter - * @throws IOException + * @param initialFiles path + * @param reporter progressable + * @throws IOException e */ public void initialize(Path initialFiles, final Progressable reporter) throws IOException { @@ -438,7 +450,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * HStores make use of. It's a list of all HStoreFile objects. Returns empty * vector if already closed and null if judged that it should not close. * - * @throws IOException + * @throws IOException e */ public List close() throws IOException { return close(false); @@ -456,7 +468,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * HStores make use of. It's a list of HStoreFile objects. Can be null if * we are not to close at this time or we are already closed. 
* - * @throws IOException + * @throws IOException e */ public List close(final boolean abort) throws IOException { if (isClosed()) { @@ -531,6 +543,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } /** @return region id */ + @SuppressWarnings({"UnusedDeclaration"}) public long getRegionId() { return this.regionInfo.getRegionId(); } @@ -571,6 +584,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } /** @return the last time the region was flushed */ + @SuppressWarnings({"UnusedDeclaration"}) public long getLastFlushTime() { return this.lastFlushTime; } @@ -672,8 +686,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ HRegion.newHRegion(basedir, log, fs, conf, regionBInfo, null); moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir()); - HRegion regions[] = new HRegion [] {regionA, regionB}; - return regions; + return new HRegion [] {regionA, regionB}; } } @@ -747,7 +760,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * server does them sequentially and not in parallel. * * @return mid key if split is needed - * @throws IOException + * @throws IOException e */ public byte [] compactStores() throws IOException { boolean majorCompaction = this.forceMajorCompaction; @@ -768,7 +781,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * * @param majorCompaction True to force a major compaction regardless of thresholds * @return split row if split is needed - * @throws IOException + * @throws IOException e */ byte [] compactStores(final boolean majorCompaction) throws IOException { @@ -837,7 +850,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * * @return true if cache was flushed * - * @throws IOException + * @throws IOException general io exceptions * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. */ @@ -903,7 +916,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * * @return true if the region needs compacting * - * @throws IOException + * @throws IOException general io exceptions * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. */ @@ -931,18 +944,28 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // during the flush long sequenceId = -1L; long completeSequenceId = -1L; + + // we have to take a write lock during snapshot, or else a write could + // end up in both snapshot and memstore (makes it difficult to do atomic + // rows then) this.updatesLock.writeLock().lock(); - // Get current size of memstores. final long currentMemStoreSize = this.memstoreSize.get(); - List storeFlushers = new ArrayList(); + List storeFlushers = new ArrayList(stores.size()); try { sequenceId = log.startCacheFlush(); completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId); - // create the store flushers + for (Store s : stores.values()) { storeFlushers.add(s.getStoreFlusher(completeSequenceId)); } + // This thread is going to cause a whole bunch of scanners to reseek. + // They are depending + // on a thread-local to know where to read from. + // The reason why we set it up high is so that each HRegionScanner only + // has a single read point for all its sub-StoreScanners. 
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + // prepare flush (take a snapshot) for (StoreFlusher flusher: storeFlushers) { flusher.prepare(); @@ -951,6 +974,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ this.updatesLock.writeLock().unlock(); } + LOG.debug("Finished snapshotting, commencing flushing stores"); + // Any failure from here on out will be catastrophic requiring server // restart so hlog content can be replayed and put back into the memstore. // Otherwise, the snapshot content while backed up in the hlog, it will not @@ -964,13 +989,24 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ flusher.flushCache(); } - internalPreFlashcacheCommit(); + Callable atomicWork = internalPreFlushcacheCommit(); + LOG.debug("Caches flushed, doing commit now (which includes update scanners)"); /** * Switch between memstore and the new store file */ - this.newScannerLock.writeLock().lock(); + if (atomicWork != null) { + LOG.debug("internalPreFlushcacheCommit gives us work to do, acquiring new scanner lock"); + newScannerLock.writeLock().lock(); + } try { + // update this again to make sure we are 'fresh' + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + + if (atomicWork != null) + atomicWork.call(); + + // now switch things out for (StoreFlusher flusher : storeFlushers) { boolean needsCompaction = flusher.commit(); if (needsCompaction) { @@ -978,10 +1014,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } } } finally { - this.newScannerLock.writeLock().unlock(); + if (atomicWork != null) { + newScannerLock.writeLock().unlock(); + } } - // clear the stireFlushers list storeFlushers.clear(); // Set down the memstore size by amount of flush. this.memstoreSize.addAndGet(-currentMemStoreSize); @@ -1030,9 +1067,14 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * A hook for sub classed wishing to perform operations prior to the cache * flush commit stage. * + * If a subclass wishes that an atomic update of their work and the + * flush commit stage happens, they should return a callable. The new scanner + * lock will be acquired and released. + * * @throws java.io.IOException allow children to throw exception */ - protected void internalPreFlashcacheCommit() throws IOException { + protected Callable internalPreFlushcacheCommit() throws IOException { + return null; } /** @@ -1070,9 +1112,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * ts. * * @param row row key - * @param family + * @param family column family to find on * @return map of values - * @throws IOException + * @throws IOException read exceptions */ public Result getClosestRowBefore(final byte [] row, final byte [] family) throws IOException { @@ -1091,9 +1133,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } // This will get all results for this store. TODO: Do we need to do this? 
Get get = new Get(key.getRow()); - List results = new ArrayList(); - store.get(get, null, results); - return new Result(results); + get.addFamily(family); + return get(get, null); } finally { splitsAndClosesLock.readLock().unlock(); } @@ -1107,7 +1148,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * * @param scan configured {@link Scan} * @return InternalScanner - * @throws IOException + * @throws IOException read exceptions */ public InternalScanner getScanner(Scan scan) throws IOException { @@ -1145,103 +1186,136 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // set() methods for client use. ////////////////////////////////////////////////////////////////////////////// /** - * @param delete - * @param lockid - * @param writeToWAL - * @throws IOException + * @param delete delete object + * @param lockid existing lock id, or null for grab a lock + * @param writeToWAL append to the write ahead lock or not + * @throws IOException read exceptions */ public void delete(Delete delete, Integer lockid, boolean writeToWAL) throws IOException { checkReadOnly(); checkResources(); Integer lid = null; - newScannerLock.writeLock().lock(); + splitsAndClosesLock.readLock().lock(); try { byte [] row = delete.getRow(); // If we did not pass an existing row lock, obtain a new one lid = getLock(lockid, row); - //Check to see if this is a deleteRow insert - if(delete.getFamilyMap().isEmpty()){ - for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){ + // Check to see if this is a deleteRow insert + if (delete.getFamilyMap().isEmpty()) { + for (byte [] family : regionInfo.getTableDesc().getFamiliesKeys()) { // Don't eat the timestamp delete.deleteFamily(family, delete.getTimeStamp()); } } else { - for(byte [] family : delete.getFamilyMap().keySet()) { - if(family == null) { + for (byte [] family : delete.getFamilyMap().keySet()) { + if (family == null) { throw new NoSuchColumnFamilyException("Empty family is invalid"); } checkFamily(family); } } - - for(Map.Entry> e: delete.getFamilyMap().entrySet()) { - byte [] family = e.getKey(); - delete(family, e.getValue(), writeToWAL); - } + + delete(delete.getFamilyMap(), writeToWAL); } finally { - if(lockid == null) releaseRowLock(lid); + if (lockid == null) releaseRowLock(lid); splitsAndClosesLock.readLock().unlock(); - newScannerLock.writeLock().unlock(); } } - + + // simple wrapper for testing + void delete(byte[] family, List edits, boolean writeToWAL) throws IOException { + Map> familyMap = new TreeMap>(); + familyMap.put(family, edits); + delete(familyMap, writeToWAL); + } /** - * @param family - * @param kvs + * @param familyMap * @param writeToWAL * @throws IOException */ - public void delete(byte [] family, List kvs, boolean writeToWAL) + public void delete(Map> familyMap, boolean writeToWAL) throws IOException { long now = System.currentTimeMillis(); byte [] byteNow = Bytes.toBytes(now); boolean flush = false; - this.updatesLock.readLock().lock(); + ReadWriteConsistencyControl.WriteEntry w = null; + updatesLock.readLock().lock(); try { if (writeToWAL) { - this.log.append(regionInfo.getRegionName(), - regionInfo.getTableDesc().getName(), kvs, - (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now); + for (Map.Entry> e : familyMap.entrySet()) { + List kvs = e.getValue(); + + this.log.append(regionInfo.getRegionName(), + regionInfo.getTableDesc().getName(), kvs, + (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now); + } } + + + w = rwcc.beginMemstoreInsert(); long size = 0; - 
Store store = getStore(family); - for (KeyValue kv: kvs) { - // Check if time is LATEST, change to time of most recent addition if so - // This is expensive. - if (kv.isLatestTimestamp() && kv.isDeleteType()) { - List result = new ArrayList(1); - Get g = new Get(kv.getRow()); - NavigableSet qualifiers = - new TreeSet(Bytes.BYTES_COMPARATOR); - byte [] q = kv.getQualifier(); - if(q == null) q = HConstants.EMPTY_BYTE_ARRAY; - qualifiers.add(q); - get(store, g, qualifiers, result); - if (result.isEmpty()) { - // Nothing to delete - continue; - } - if (result.size() > 1) { - throw new RuntimeException("Unexpected size: " + result.size()); + for (Map.Entry> e: familyMap.entrySet()) { + byte [] family = e.getKey(); + List kvs = e.getValue(); + + Store store = getStore(family); + Map kvCount = new TreeMap(Bytes.BYTES_COMPARATOR); + for (KeyValue kv: kvs) { + // Check if time is LATEST, change to time of most recent addition if so + // This is expensive. + if (kv.isLatestTimestamp() && kv.isDeleteType()) { + byte [] qual = kv.getQualifier(); + if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY; + + Integer count = kvCount.get(qual); + if (count == null) { + kvCount.put(qual, 1); + } else { + System.out.println("Deleting more than 1: " + (count+1)); + kvCount.put(qual, count + 1); + } + count = kvCount.get(qual); + + Get get = new Get(kv.getRow()); + get.addColumn(family, qual); + get.setMaxVersions(count); + + List result = get(get); + + if (result.size() < count) { + System.out.println("Result.size = " + result.size() + + " which is less than count = " + count); + System.out.println(result); + // Nothing to delete + kv.updateLatestStamp(byteNow); + continue; + } + if (result.size() > count) { + throw new RuntimeException("Unexpected size: " + result.size()); + } + KeyValue getkv = result.get(count - 1); + Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), + getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG); + } else { + kv.updateLatestStamp(byteNow); } - KeyValue getkv = result.get(0); - Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), - getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG); - } else { - kv.updateLatestStamp(byteNow); - } + kv.setMemstoreTS(w.getWriteNumber()); - size = this.memstoreSize.addAndGet(store.delete(kv)); + size = this.memstoreSize.addAndGet(store.delete(kv)); + } } flush = isFlushSize(size); } finally { - this.updatesLock.readLock().unlock(); + if (w != null) rwcc.completeMemstoreInsert(w); + + updatesLock.readLock().unlock(); } + if (flush) { // Request a cache flush. Do it outside update lock. requestFlush(); @@ -1289,42 +1363,26 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // read lock, resources may run out. For now, the thought is that this // will be extremely rare; we'll deal with it when it happens. checkResources(); - newScannerLock.writeLock().lock(); splitsAndClosesLock.readLock().lock(); + try { - // We obtain a per-row lock, so other clients will block while one client - // performs an update. The read lock is released by the client calling - // #commit or #abort or if the HRegionServer lease on the lock expires. - // See HRegionServer#RegionListener for how the expire on HRegionServer - // invokes a HRegion#abort. 
byte [] row = put.getRow(); - // If we did not pass an existing row lock, obtain a new one Integer lid = getLock(lockid, row); - byte [] now = Bytes.toBytes(System.currentTimeMillis()); + try { for (Map.Entry> entry: put.getFamilyMap().entrySet()) { - byte [] family = entry.getKey(); - checkFamily(family); - List puts = entry.getValue(); - if (updateKeys(puts, now)) { - put(family, puts, writeToWAL); - } + checkFamily(entry.getKey()); } + put(put.getFamilyMap(), writeToWAL); } finally { if(lockid == null) releaseRowLock(lid); } } finally { splitsAndClosesLock.readLock().unlock(); - newScannerLock.writeLock().unlock(); } } - - //TODO, Think that gets/puts and deletes should be refactored a bit so that - //the getting of the lock happens before, so that you would just pass it into - //the methods. So in the case of checkAndPut you could just do lockRow, - //get, put, unlockRow or something /** * * @param row @@ -1339,10 +1397,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ */ public boolean checkAndPut(byte [] row, byte [] family, byte [] qualifier, byte [] expectedValue, Put put, Integer lockId, boolean writeToWAL) - throws IOException{ + throws IOException{ checkReadOnly(); - //TODO, add check for value length or maybe even better move this to the - //client if this becomes a global setting checkResources(); splitsAndClosesLock.readLock().lock(); try { @@ -1350,41 +1406,29 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ checkFamily(family); get.addColumn(family, qualifier); - byte [] now = Bytes.toBytes(System.currentTimeMillis()); - // Lock row Integer lid = getLock(lockId, get.getRow()); - List result = new ArrayList(); + List result; + try { - //Getting data - for(Map.Entry> entry: - get.getFamilyMap().entrySet()) { - get(this.stores.get(entry.getKey()), get, entry.getValue(), result); - } + result = get(get); + boolean matches = false; if (result.size() == 0 && expectedValue.length == 0) { matches = true; - } else if(result.size() == 1) { + } else if (result.size() == 1) { //Compare the expected value with the actual value byte [] actualValue = result.get(0).getValue(); matches = Bytes.equals(expectedValue, actualValue); } //If matches put the new put - if(matches) { - for(Map.Entry> entry : - put.getFamilyMap().entrySet()) { - byte [] fam = entry.getKey(); - checkFamily(fam); - List puts = entry.getValue(); - if(updateKeys(puts, now)) { - put(fam, puts, writeToWAL); - } - } - return true; + if (matches) { + put(put.getFamilyMap(), writeToWAL); + return true; } return false; } finally { - if(lockId == null) releaseRowLock(lid); + if (lockId == null) releaseRowLock(lid); } } finally { splitsAndClosesLock.readLock().unlock(); @@ -1481,50 +1525,58 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } /** - * Add updates first to the hlog and then add values to memstore. - * Warning: Assumption is caller has lock on passed in row. - * @param edits Cell updates by column - * @praram now - * @throws IOException - */ - private void put(final byte [] family, final List edits) - throws IOException { - this.put(family, edits, true); - } - - /** * Add updates first to the hlog (if writeToWal) and then add values to memstore. * Warning: Assumption is caller has lock on passed in row. 
- * @param family - * @param edits + * @param familyMap * @param writeToWAL if true, then we should write to the log * @throws IOException */ - private void put(final byte [] family, final List edits, + private void put(final Map> familyMap, boolean writeToWAL) throws IOException { - if (edits == null || edits.isEmpty()) { - return; - } + + long now = System.currentTimeMillis(); + byte [] byteNow = Bytes.toBytes(now); boolean flush = false; + + ReadWriteConsistencyControl.WriteEntry w = null; this.updatesLock.readLock().lock(); try { - if (writeToWAL) { - long now = System.currentTimeMillis(); - this.log.append(regionInfo.getRegionName(), - regionInfo.getTableDesc().getName(), edits, - (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now); + + for (Map.Entry> e : familyMap.entrySet()) { + List edits = e.getValue(); + + checkFamily(e.getKey()); + updateKeys(edits, byteNow); + + if (writeToWAL) { + this.log.append(regionInfo.getRegionName(), + regionInfo.getTableDesc().getName(), edits, + (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now); + } } + long size = 0; - Store store = getStore(family); - for (KeyValue kv: edits) { - size = this.memstoreSize.addAndGet(store.add(kv)); + w = rwcc.beginMemstoreInsert(); + + for (Map.Entry> e : familyMap.entrySet()) { + byte [] family= e.getKey(); + List edits = e.getValue(); + + Store store = getStore(family); + for (KeyValue kv: edits) { + kv.setMemstoreTS(w.getWriteNumber()); + size = this.memstoreSize.addAndGet(store.add(kv)); + } } + flush = isFlushSize(size); } finally { + if (w != null) rwcc.completeMemstoreInsert(w); + this.updatesLock.readLock().unlock(); } + if (flush) { - // Request a cache flush. Do it outside update lock. requestFlush(); } } @@ -1741,8 +1793,12 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ private Filter filter; private RowFilterInterface oldFilter; private List results = new ArrayList(); + private int isScan; RegionScanner(Scan scan, List additionalScanners) { + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); +// DebugPrint.println("HRegionScanner., threadpoint = " + ReadWriteConsistencyControl.getThreadReadPoint()); + this.filter = scan.getFilter(); this.oldFilter = scan.getOldFilter(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { @@ -1750,12 +1806,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } else { this.stopRow = scan.getStopRow(); } + this.isScan = scan.isGetScan() ? -1 : 0; List scanners = new ArrayList(); if (additionalScanners != null) { scanners.addAll(additionalScanners); } - for (Map.Entry> entry : + for (Map.Entry> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); scanners.add(store.getScanner(scan, entry.getValue())); @@ -1778,6 +1835,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ if (oldFilter != null) { oldFilter.reset(); } + + // Start the next row read and reset the thread point + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); } public boolean next(List outResults) throws IOException { @@ -1786,6 +1846,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closing=" + closing.get() + " or closed=" + closed.get()); } + + // This could be a new thread from the last time we called next(). 
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); results.clear(); boolean returnResult = nextInternal(); if (!returnResult && filterRow()) { @@ -1859,7 +1922,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ return currentRow == null || (this.stopRow != null && comparator.compareRows(this.stopRow, 0, this.stopRow.length, - currentRow, 0, currentRow.length) <= 0); + currentRow, 0, currentRow.length) <= isScan); } private boolean filterRow() { @@ -2020,7 +2083,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ List edits = new ArrayList(); edits.add(new KeyValue(row, CATALOG_FAMILY, REGIONINFO_QUALIFIER, System.currentTimeMillis(), Writables.getBytes(r.getRegionInfo()))); - meta.put(HConstants.CATALOG_FAMILY, edits); + Map> familyMap = new TreeMap>(); + familyMap.put(HConstants.CATALOG_FAMILY, edits); + meta.put(familyMap, true); } finally { meta.releaseRowLock(lid); } @@ -2379,10 +2444,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // HBASE-880 // /** - * @param get - * @param lockid + * @param get get object + * @param lockid existing lock id, or null for no previous lock * @return result - * @throws IOException + * @throws IOException read exceptions */ public Result get(final Get get, final Integer lockid) throws IOException { // Verify families are all valid @@ -2397,22 +2462,33 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } // Lock row Integer lid = getLock(lockid, get.getRow()); - List result = new ArrayList(); + List result = null; try { - for (Map.Entry> entry: - get.getFamilyMap().entrySet()) { - get(this.stores.get(entry.getKey()), get, entry.getValue(), result); - } + result = get(get); } finally { - if(lockid == null) releaseRowLock(lid); + if(lockid == null) + releaseRowLock(lid); } return new Result(result); } - private void get(final Store store, final Get get, - final NavigableSet qualifiers, List result) - throws IOException { - store.get(get, qualifiers, result); + /* + * Do a get based on the get parameter. + */ + private List get(final Get get) throws IOException { + Scan scan = new Scan(get); + + List results = new ArrayList(); + + InternalScanner scanner = null; + try { + scanner = getScanner(scan); + scanner.next(results); + } finally { + if (scanner != null) + scanner.close(); + } + return results; } /** @@ -2421,6 +2497,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * @param family * @param qualifier * @param amount + * @param writeToWAL * @return The new value. 
* @throws IOException */ @@ -2435,6 +2512,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ try { Store store = stores.get(family); + // TODO call the proper GET API // Get the old value: Get get = new Get(row); get.addColumn(family, qualifier); @@ -2500,7 +2578,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ public static final long FIXED_OVERHEAD = ClassSize.align( (5 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN + - (19 * ClassSize.REFERENCE) + ClassSize.OBJECT); + (20 * ClassSize.REFERENCE) + ClassSize.OBJECT); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) + diff --git src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java index de3df22..440f5a7 100644 --- src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java +++ src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java @@ -1,5 +1,5 @@ /** - * Copyright 2009 The Apache Software Foundation + * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,6 +19,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.hbase.KeyValue; + import java.util.Collection; import java.util.Comparator; import java.util.Iterator; @@ -28,8 +30,6 @@ import java.util.SortedSet; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; -import org.apache.hadoop.hbase.KeyValue; - /** * A {@link java.util.Set} of {@link KeyValue}s implemented on top of a * {@link java.util.concurrent.ConcurrentSkipListMap}. Works like a @@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.KeyValue; * has same attributes as ConcurrentSkipListSet: e.g. tolerant of concurrent * get and set and won't throw ConcurrentModificationException when iterating. 
*/ -class KeyValueSkipListSet implements NavigableSet, Cloneable { +class KeyValueSkipListSet implements NavigableSet { private ConcurrentNavigableMap delegatee; KeyValueSkipListSet(final KeyValue.KVComparator c) { @@ -167,6 +167,7 @@ class KeyValueSkipListSet implements NavigableSet, Cloneable { } public boolean contains(Object o) { + //noinspection SuspiciousMethodCalls return this.delegatee.containsKey(o); } @@ -201,17 +202,4 @@ class KeyValueSkipListSet implements NavigableSet, Cloneable { public T[] toArray(T[] a) { throw new UnsupportedOperationException("Not implemented"); } - - @Override - public KeyValueSkipListSet clone() { - assert this.delegatee.getClass() == ConcurrentSkipListMap.class; - KeyValueSkipListSet clonedSet = null; - try { - clonedSet = (KeyValueSkipListSet) super.clone(); - } catch (CloneNotSupportedException e) { - throw new InternalError(e.getMessage()); - } - clonedSet.delegatee = ((ConcurrentSkipListMap) this.delegatee).clone(); - return clonedSet; - } } \ No newline at end of file diff --git src/java/org/apache/hadoop/hbase/regionserver/MemStore.java src/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 7afe297..3ca35ae 100644 --- src/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ src/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -28,7 +28,9 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; +import java.util.Set; import java.util.SortedSet; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -77,7 +79,6 @@ public class MemStore implements HeapSize { // Used to track own heapSize final AtomicLong size; - /** * Default constructor. Used for tests. */ @@ -110,7 +111,7 @@ public class MemStore implements HeapSize { /** * Creates a snapshot of the current memstore. - * Snapshot must be cleared by call to {@link #clearSnapshot(java.util.Map)} + * Snapshot must be cleared by call to {@link #clearSnapshot(java.util.SortedSet)} * To get the snapshot made by this method, use {@link #getSnapshot()} */ void snapshot() { @@ -140,7 +141,7 @@ public class MemStore implements HeapSize { * call to {@link #snapshot()} * @return Return snapshot. * @see {@link #snapshot()} - * @see {@link #clearSnapshot(java.util.Map)} + * @see {@link #clearSnapshot(java.util.SortedSet)} */ KeyValueSkipListSet getSnapshot() { return this.snapshot; @@ -187,7 +188,7 @@ public class MemStore implements HeapSize { return s; } - /** + /** * Write a delete * @param delete * @return approximate size of the passed key and value. @@ -195,69 +196,8 @@ public class MemStore implements HeapSize { long delete(final KeyValue delete) { long s = 0; this.lock.readLock().lock(); - //Have to find out what we want to do here, to find the fastest way of - //removing things that are under a delete. - //Actions that will take place here are: - //1. Insert a delete and remove all the affected entries already in memstore - //2. 
In the case of a Delete and the matching put is found then don't insert - // the delete - //TODO Would be nice with if we had an iterator for this, so we could remove - //things that needs to be removed while iterating and don't have to go - //back and do it afterwards try { - boolean notpresent = false; - List deletes = new ArrayList(); - SortedSet tail = this.kvset.tailSet(delete); - - //Parse the delete, so that it is only done once - byte [] deleteBuffer = delete.getBuffer(); - int deleteOffset = delete.getOffset(); - - int deleteKeyLen = Bytes.toInt(deleteBuffer, deleteOffset); - deleteOffset += Bytes.SIZEOF_INT + Bytes.SIZEOF_INT; - - short deleteRowLen = Bytes.toShort(deleteBuffer, deleteOffset); - deleteOffset += Bytes.SIZEOF_SHORT; - int deleteRowOffset = deleteOffset; - - deleteOffset += deleteRowLen; - - byte deleteFamLen = deleteBuffer[deleteOffset]; - deleteOffset += Bytes.SIZEOF_BYTE + deleteFamLen; - - int deleteQualifierOffset = deleteOffset; - int deleteQualifierLen = deleteKeyLen - deleteRowLen - deleteFamLen - - Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG - - Bytes.SIZEOF_BYTE; - - deleteOffset += deleteQualifierLen; - - int deleteTimestampOffset = deleteOffset; - deleteOffset += Bytes.SIZEOF_LONG; - byte deleteType = deleteBuffer[deleteOffset]; - - //Comparing with tail from memstore - for (KeyValue kv : tail) { - DeleteCode res = DeleteCompare.deleteCompare(kv, deleteBuffer, - deleteRowOffset, deleteRowLen, deleteQualifierOffset, - deleteQualifierLen, deleteTimestampOffset, deleteType, - comparator.getRawComparator()); - if (res == DeleteCode.DONE) { - break; - } else if (res == DeleteCode.DELETE) { - deletes.add(kv); - } // SKIP - } - - //Delete all the entries effected by the last added delete - for (KeyValue kv : deletes) { - notpresent = this.kvset.remove(kv); - s -= heapSizeChange(kv, notpresent); - } - - // Adding the delete to memstore. Add any value, as long as - // same instance each time. s += heapSizeChange(delete, this.kvset.add(delete)); } finally { this.lock.readLock().unlock(); @@ -265,7 +205,7 @@ public class MemStore implements HeapSize { this.size.addAndGet(s); return s; } - + /** * @param kv Find the row that comes after this one. If null, we return the * first. @@ -318,7 +258,7 @@ public class MemStore implements HeapSize { } /** - * @param state + * @param state column/delete tracking state */ void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) { this.lock.readLock().lock(); @@ -442,8 +382,7 @@ public class MemStore implements HeapSize { this.lock.readLock().lock(); try { KeyValueScanner [] scanners = new KeyValueScanner[1]; - scanners[0] = new MemStoreScanner(this.kvset.clone(), - this.snapshot.clone(), this.comparator); + scanners[0] = new MemStoreScanner(); return scanners; } finally { this.lock.readLock().unlock(); @@ -465,10 +404,8 @@ public class MemStore implements HeapSize { * @param matcher Column matcher * @param result List to add results to * @return true if done with store (early-out), false if not - * @throws IOException */ - public boolean get(QueryMatcher matcher, List result) - throws IOException { + public boolean get(QueryMatcher matcher, List result) { this.lock.readLock().lock(); try { if(internalGet(this.kvset, matcher, result) || matcher.isDone()) { @@ -485,11 +422,11 @@ public class MemStore implements HeapSize { * Gets from either the memstore or the snapshop, and returns a code * to let you know which is which. 
* - * @param matcher - * @param result + * @param matcher query matcher + * @param result puts results here * @return 1 == memstore, 2 == snapshot, 0 == none */ - int getWithCode(QueryMatcher matcher, List result) throws IOException { + int getWithCode(QueryMatcher matcher, List result) { this.lock.readLock().lock(); try { boolean fromMemstore = internalGet(this.kvset, matcher, result); @@ -517,18 +454,16 @@ public class MemStore implements HeapSize { void readLockUnlock() { this.lock.readLock().unlock(); } - + /** * * @param set memstore or snapshot * @param matcher query matcher * @param result list to add results to * @return true if done with store (early-out), false if not - * @throws IOException */ boolean internalGet(final NavigableSet set, - final QueryMatcher matcher, final List result) - throws IOException { + final QueryMatcher matcher, final List result) { if(set.isEmpty()) return false; // Seek to startKey SortedSet tail = set.tailSet(matcher.getStartKey()); @@ -550,11 +485,151 @@ public class MemStore implements HeapSize { } return false; } + + + /* + * MemStoreScanner implements the KeyValueScanner. + * It lets the caller scan the contents of a memstore -- both current + * map and snapshot. + * This behaves as if it were a real scanner but does not maintain position. + */ + protected class MemStoreScanner implements KeyValueScanner { + // Next row information for either kvset or snapshot + private KeyValue kvsetNextRow = null; + private KeyValue snapshotNextRow = null; + + // iterator based scanning. + Iterator kvsetIt; + Iterator snapshotIt; + + /* + Some notes... + + So memstorescanner is fixed at creation time. this includes pointers/iterators into + existing kvset/snapshot. during a snapshot creation, the kvset is null, and the + snapshot is moved. since kvset is null there is no point on reseeking on both, + we can save us the trouble. During the snapshot->hfile transition, the memstore + scanner is re-created by StoreScanner#updateReaders(). StoreScanner should + potentially do something smarter by adjusting the existing memstore scanner. + + But there is a greater problem here, that being once a scanner has progressed + during a snapshot scenario, we currently iterate past the kvset then 'finish' up. + if a scan lasts a little while, there is a chance for new entries in kvset to + become available but we will never see them. This needs to be handled at the + StoreScanner level with coordination with MemStoreScanner. + + */ + + + MemStoreScanner() { + super(); + + //DebugPrint.println(" MS new@" + hashCode()); + } + + protected KeyValue getNext(Iterator it) { + KeyValue ret = null; + long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); + //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint); + + while (ret == null && it.hasNext()) { + KeyValue v = it.next(); + if (v.getMemstoreTS() <= readPoint) { + ret = v; + } + } + return ret; + } + + public synchronized boolean seek(KeyValue key) { + if (key == null) { + close(); + return false; + } + + // kvset and snapshot will never be empty. + // if tailSet cant find anything, SS is empty (not null). 
+ SortedSet kvTail = kvset.tailSet(key); + SortedSet snapshotTail = snapshot.tailSet(key); + + kvsetIt = kvTail.iterator(); + snapshotIt = snapshotTail.iterator(); + + kvsetNextRow = getNext(kvsetIt); + snapshotNextRow = getNext(snapshotIt); + long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); + + //DebugPrint.println( " MS@" + hashCode() + " kvset seek: " + kvsetNextRow + " with size = " + + // kvset.size() + " threadread = " + readPoint); + //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " + + // snapshot.size() + " threadread = " + readPoint); + + KeyValue lowest = getLowest(); + // has data := (lowest != null) + return lowest != null; + } + + public synchronized KeyValue peek() { + //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest()); + return getLowest(); + } + + + public synchronized KeyValue next() { + KeyValue theNext = getLowest(); + + if (theNext == null) { + return null; + } + + // Advance one of the iterators + if (theNext == kvsetNextRow) { + kvsetNextRow = getNext(kvsetIt); + } else { + snapshotNextRow = getNext(snapshotIt); + } + //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint(); + //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " + + // getLowest() + " threadpoint=" + readpoint); + + return theNext; + } + + protected KeyValue getLowest() { + return getLower(kvsetNextRow, + snapshotNextRow); + } + + /* + * Returns the lower of the two key values, or null if they are both null. + * This uses comparator.compare() to compare the KeyValue using the memstore + * comparator. + */ + protected KeyValue getLower(KeyValue first, KeyValue second) { + if (first == null && second == null) { + return null; + } + if (first != null && second != null) { + int compare = comparator.compare(first, second); + return (compare <= 0 ? first : second); + } + return (first != null ? first : second); + } + + public synchronized void close() { + // Accelerate the GC a bit perhaps? + this.kvsetIt = null; + this.snapshotIt = null; + + this.kvsetNextRow = null; + this.snapshotNextRow = null; + } + } public final static long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + (7 * ClassSize.REFERENCE)); - + public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG + ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST + @@ -568,11 +643,11 @@ public class MemStore implements HeapSize { * @return Size */ long heapSizeChange(final KeyValue kv, final boolean notpresent) { - return notpresent ? + return notpresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()): 0; } - + /** * Get the entire heap usage for this MemStore not including keys in the * snapshot. @@ -581,7 +656,7 @@ public class MemStore implements HeapSize { public long heapSize() { return size.get(); } - + /** * Get the heap usage of KVs in this MemStore. */ @@ -603,7 +678,7 @@ public class MemStore implements HeapSize { * enough. See hbase-900. Fills memstores then waits so user can heap * dump and bring up resultant hprof in something like jprofiler which * allows you get 'deep size' on objects. 
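
For readers following the new MemStoreScanner above: it is a plain two-way merge of the kvset and snapshot iterators. Each side keeps one buffered "next" entry, getNext() refills that buffer while skipping KeyValues whose memstoreTS is above the caller's read point, and getLowest()/getLower() hand out whichever buffered entry sorts first. A stripped-down, self-contained sketch of just that merge (longs instead of KeyValues, read-point filtering elided) -- illustrative only, not part of the patch:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class TwoWayMergeSketch {
      private final Iterator<Long> kvsetIt, snapshotIt;
      private Long kvsetNext, snapshotNext;   // mirrors kvsetNextRow / snapshotNextRow

      TwoWayMergeSketch(List<Long> kvset, List<Long> snapshot) {
        this.kvsetIt = kvset.iterator();
        this.snapshotIt = snapshot.iterator();
        this.kvsetNext = advance(kvsetIt);
        this.snapshotNext = advance(snapshotIt);
      }

      // In MemStoreScanner.getNext() this step also skips entries whose
      // memstoreTS exceeds ReadWriteConsistencyControl.getThreadReadPoint().
      private static Long advance(Iterator<Long> it) {
        return it.hasNext() ? it.next() : null;
      }

      Long next() {
        if (kvsetNext == null && snapshotNext == null) return null;
        boolean takeKvset = snapshotNext == null
            || (kvsetNext != null && kvsetNext.compareTo(snapshotNext) <= 0);
        Long lowest = takeKvset ? kvsetNext : snapshotNext;   // getLower()
        if (takeKvset) kvsetNext = advance(kvsetIt);
        else snapshotNext = advance(snapshotIt);
        return lowest;
      }

      public static void main(String[] args) {
        TwoWayMergeSketch m =
            new TwoWayMergeSketch(Arrays.asList(1L, 4L, 6L), Arrays.asList(2L, 3L, 5L));
        for (Long v = m.next(); v != null; v = m.next()) {
          System.out.println(v);   // prints 1 through 6 in order
        }
      }
    }
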
- * @param args + * @param args main args */ public static void main(String [] args) { RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); @@ -638,5 +713,4 @@ public class MemStore implements HeapSize { } LOG.info("Exiting."); } - } diff --git src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java deleted file mode 100644 index d342bab..0000000 --- src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.util.Iterator; -import java.util.SortedSet; -import java.util.TreeSet; - -/** - * MemStoreScanner implements the KeyValueScanner. - * It lets the caller scan the contents of a memstore -- both current - * map and snapshot. - *

- * The memstore scanner keeps its own reference to the main and snapshot - * key/value sets. Keeping those references allows the scanner to be indifferent - * to memstore flushes. Calling the {@link #close()} method ensures that the - * references to those classes are null'd allowing the GC to pick them up. - */ -class MemStoreScanner implements KeyValueScanner { - private static final Log LOG = LogFactory.getLog(MemStoreScanner.class); - - private static final - SortedSet EMPTY_SET = new TreeSet(); - private static final Iterator EMPTY_ITERATOR = - new Iterator() { - - @Override - public boolean hasNext() { - return false; - } - @Override - public KeyValue next() { - return null; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - - - private SortedSet kvsetRef; - private SortedSet snapshotRef; - private KeyValue.KVComparator comparatorRef; - private Iterator kvsetIterator; - private Iterator snapshotIterator; - - private KeyValue currentKvsetKV; - private KeyValue currentSnapshotKV; - private KeyValue nextKV; - - /** - * Create a new memstore scanner. - * - * @param kvset the main key value set - * @param snapshot the snapshot set - * @param comparator the comparator to use - */ - MemStoreScanner(KeyValueSkipListSet kvset, - KeyValueSkipListSet snapshot, KeyValue.KVComparator comparator) { - super(); - this.kvsetRef = kvset; - this.snapshotRef = snapshot != null ? snapshot : EMPTY_SET; - this.comparatorRef = comparator; - this.kvsetIterator = kvsetRef.iterator(); - this.snapshotIterator = snapshotRef.iterator(); - this.nextKV = currentKvsetKV = currentSnapshotKV = null; - } - - private void fill() { - if (nextKV == null) { - if (currentSnapshotKV == null && snapshotIterator.hasNext()) { - currentSnapshotKV = snapshotIterator.next(); - } - - if (currentKvsetKV == null && kvsetIterator.hasNext()) { - currentKvsetKV = kvsetIterator.next(); - } - - if (currentSnapshotKV != null && currentKvsetKV != null) { - int cmp = comparatorRef.compare(currentSnapshotKV, currentKvsetKV); - if (cmp <= 0) { - nextKV = currentSnapshotKV; - currentSnapshotKV = null; - } else { - nextKV = currentKvsetKV; - currentKvsetKV = null; - } - } else if (currentSnapshotKV != null) { - nextKV = currentSnapshotKV; - currentSnapshotKV = null; - } else { - nextKV = currentKvsetKV; - currentKvsetKV = null; - } - } - } - - @Override - public synchronized boolean seek(KeyValue key) { - if (key == null) { - close(); - return false; - } - SortedSet kvsetTail = kvsetRef.tailSet(key); - SortedSet snapshotTail = snapshotRef != null ? 
- snapshotRef.tailSet(key) : EMPTY_SET; - - kvsetIterator = kvsetTail.iterator(); - snapshotIterator = snapshotTail.iterator(); - - currentKvsetKV = null; - currentSnapshotKV = null; - nextKV = null; - - return kvsetIterator.hasNext() || snapshotIterator.hasNext(); - } - - @Override - public synchronized KeyValue peek() { - fill(); - return nextKV; - } - - @Override - public synchronized KeyValue next() { - fill(); - KeyValue next = nextKV; - nextKV = null; - return next; - } - - public synchronized void close() { - this.kvsetRef = EMPTY_SET; - this.snapshotRef = EMPTY_SET; - this.kvsetIterator = EMPTY_ITERATOR; - this.snapshotIterator = EMPTY_ITERATOR; - this.currentKvsetKV = null; - this.currentSnapshotKV = null; - this.nextKV = null; - } -} diff --git src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java new file mode 100644 index 0000000..b1f1368 --- /dev/null +++ src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java @@ -0,0 +1,106 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.util.LinkedList; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Manages the read/write consistency within memstore. This provides + * an interface for readers to determine what entries to ignore, and + * a mechanism for writers to obtain new write numbers, then "commit" + * the new writes for readers to read (thus forming atomic transactions). + */ +public class ReadWriteConsistencyControl { + private final AtomicLong memstoreRead = new AtomicLong(); + private final AtomicLong memstoreWrite = new AtomicLong(); + // This is the pending queue of writes. + private final LinkedList writeQueue = + new LinkedList(); + + private static final ThreadLocal perThreadReadPoint = + new ThreadLocal(); + + public static long getThreadReadPoint() { + return perThreadReadPoint.get(); + } + + public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) { + perThreadReadPoint.set(rwcc.memstoreReadPoint()); + return getThreadReadPoint(); + } + + public WriteEntry beginMemstoreInsert() { + synchronized (writeQueue) { + long nextWriteNumber = memstoreWrite.incrementAndGet(); + WriteEntry e = new WriteEntry(nextWriteNumber); + writeQueue.add(e); + return e; + } + } + public void completeMemstoreInsert(WriteEntry e) { + synchronized (writeQueue) { + e.markCompleted(); + + long nextReadValue = -1; + boolean ranOnce=false; + while (!writeQueue.isEmpty()) { + ranOnce=true; + WriteEntry queueFirst = writeQueue.getFirst(); + + if (nextReadValue > 0) { + if (nextReadValue+1 != queueFirst.getWriteNumber()) { + throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " + + nextReadValue + " next: " + queueFirst.getWriteNumber()); + } + } + + if (queueFirst.isCompleted()) { + nextReadValue = queueFirst.getWriteNumber(); + writeQueue.removeFirst(); + } else { + break; + } + } + + if (!ranOnce) { + throw new RuntimeException("never was a first"); + } + + if (nextReadValue > 0) { + memstoreRead.set(nextReadValue); + } + } + + // Spin until any other concurrent puts have finished. This makes sure that + // if we move on to construct a scanner, we'll get read-your-own-writes + // consistency. We anticipate that since puts to the memstore are very fast, + // this will be on the order of microseconds - so spinning should be faster + // than a condition variable. 
+ int spun = 0; + while (memstoreRead.get() < e.getWriteNumber()) { + spun++; + } + // Could potentially expose spun as a metric + } + + public long memstoreReadPoint() { + return memstoreRead.get(); + } + + + public static class WriteEntry { + private long writeNumber; + private boolean completed = false; + WriteEntry(long writeNumber) { + this.writeNumber = writeNumber; + } + void markCompleted() { + this.completed = true; + } + boolean isCompleted() { + return this.completed; + } + long getWriteNumber() { + return this.writeNumber; + } + } +} diff --git src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java index a42289d..4a71876 100644 --- src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java @@ -55,7 +55,11 @@ public class ScanQueryMatcher extends QueryMatcher { this.rowComparator = rowComparator; this.deletes = new ScanDeleteTracker(); this.startKey = KeyValue.createFirstOnRow(scan.getStartRow()); - this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow()); + if (scan.isGetScan()) { + this.stopKey = KeyValue.createLastOnRow(scan.getStopRow()); + } else { + this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow()); + } this.filter = scan.getFilter(); this.oldFilter = scan.getOldFilter(); diff --git src/java/org/apache/hadoop/hbase/regionserver/Store.java src/java/org/apache/hadoop/hbase/regionserver/Store.java index 4e2ed99..8ff49ee 100644 --- src/java/org/apache/hadoop/hbase/regionserver/Store.java +++ src/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -495,7 +495,7 @@ public class Store implements HConstants, HeapSize { /** * Snapshot this stores memstore. Call before running - * {@link #flushCache(long)} so it has some work to do. + * {@link #flushCache(long, java.util.SortedSet)} so it has some work to do. */ void snapshot() { this.memstore.snapshot(); @@ -600,9 +600,12 @@ public class Store implements HConstants, HeapSize { this.lock.writeLock().lock(); try { this.storefiles.put(Long.valueOf(logCacheFlushId), sf); + + this.memstore.clearSnapshot(set); + // Tell listeners of the change in readers. notifyChangedReadersObservers(); - this.memstore.clearSnapshot(set); + return this.storefiles.size() >= this.compactionThreshold; } finally { this.lock.writeLock().unlock(); @@ -630,10 +633,8 @@ public class Store implements HConstants, HeapSize { * @param o Observer no longer interested in changes in set of Readers. */ void deleteChangedReaderObserver(ChangedReadersObserver o) { - if(this.changedReaderObservers.size() > 0) { - if (!this.changedReaderObservers.remove(o)) { - LOG.warn("Not in set" + o); - } + if (!this.changedReaderObservers.remove(o)) { + LOG.warn("Not in set" + o); } } @@ -852,7 +853,6 @@ public class Store implements HConstants, HeapSize { /** * Do a minor/major compaction. Uses the scan infrastructure to make it easy. * - * @param writer output writer * @param filesToCompact which files to compact * @param majorCompaction true to major compact (prune all deletes, max versions, etc) * @param maxId Readers maximum sequence id. @@ -985,6 +985,10 @@ public class Store implements HConstants, HeapSize { Long orderVal = Long.valueOf(result.getMaxSequenceId()); this.storefiles.put(orderVal, result); } + + // WARN ugly hack here, but necessary sadly. + ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC()); + // Tell observers that list of StoreFiles has changed. 
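
The resetThreadReadPoint() call added to Store above is the reader half of the new ReadWriteConsistencyControl protocol. Roughly how the two halves fit together, mirroring the testMemstoreConcurrentControl test added to TestMemStore further down in this patch (row/family/qualifier names are illustrative only):

    ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
    MemStore memstore = new MemStore();

    // Writer: reserve a write number, stamp the new KeyValue with it, commit.
    ReadWriteConsistencyControl.WriteEntry w = rwcc.beginMemstoreInsert();
    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
        Bytes.toBytes("q1"), Bytes.toBytes("value"));
    kv.setMemstoreTS(w.getWriteNumber());
    memstore.add(kv);
    // completeMemstoreInsert() spins until the global read point has advanced
    // past this write, which is what gives read-your-own-writes consistency.
    rwcc.completeMemstoreInsert(w);

    // Reader: pin this thread's read point before creating scanners; any
    // KeyValue whose memstoreTS is above that point is invisible to the scan.
    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
    KeyValueScanner scanner = memstore.getScanners()[0];
    scanner.seek(KeyValue.createFirstOnRow(Bytes.toBytes("row")));
    KeyValue seen = scanner.next();   // returns kv now that the write has committed
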
notifyChangedReadersObservers(); // Finally, delete old store files. @@ -1475,7 +1479,12 @@ public class Store implements HConstants, HeapSize { } /** - * Increments the value for the given row/family/qualifier + * Increments the value for the given row/family/qualifier. + * + * This function will always be seen as atomic by other readers + * because it only puts a single KV to memstore. Thus no + * read/write control necessary. + * * @param row * @param f * @param qualifier diff --git src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 4a6ce1b..72e0286 100644 --- src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.NavigableSet; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,21 +45,29 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb private boolean cacheBlocks; // Used to indicate that the scanner has closed (see HBASE-1107) - private final AtomicBoolean closing = new AtomicBoolean(false); + private boolean closing = false; + private final boolean isGet; /** * Opens a scanner across memstore, snapshot, and all StoreFiles. + * + * @param store who we scan + * @param scan the spec + * @param columns which columns we are scanning */ StoreScanner(Store store, Scan scan, final NavigableSet columns) { + //DebugPrint.println("SS new"); this.store = store; this.cacheBlocks = scan.getCacheBlocks(); matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), columns, store.ttl, store.comparator.getRawComparator(), store.versionsToReturn(scan.getMaxVersions())); + this.isGet = scan.isGetScan(); List scanners = getScanners(); // Seek all scanners to the initial key + // TODO if scan.isGetScan, use bloomfilters to skip seeking for(KeyValueScanner scanner : scanners) { scanner.seek(matcher.getStartKey()); } @@ -70,16 +77,21 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb scanners.toArray(new KeyValueScanner[scanners.size()]), store.comparator); this.store.addChangedReaderObserver(this); + } /** * Used for major compactions.

* * Opens a scanner across specified StoreFiles. + * @param store who we scan + * @param scan the spec + * @param scanners ancilliary scanners */ StoreScanner(Store store, Scan scan, KeyValueScanner [] scanners) { this.store = store; this.cacheBlocks = false; + this.isGet = false; matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), null, store.ttl, store.comparator.getRawComparator(), store.versionsToReturn(scan.getMaxVersions())); @@ -99,6 +111,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb final NavigableSet columns, final KeyValueScanner [] scanners) { this.store = null; + this.isGet = false; this.cacheBlocks = scan.getCacheBlocks(); this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl, comparator.getRawComparator(), scan.getMaxVersions()); @@ -132,7 +145,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb } public synchronized void close() { - this.closing.set(true); + this.closing = true; // under test, we dont have a this.store if (this.store != null) this.store.deleteChangedReaderObserver(this); @@ -145,10 +158,11 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb /** * Get the next row of values from this Store. - * @param result + * @param outResult * @return true if there are more rows, false if scanner is done */ public synchronized boolean next(List outResult) throws IOException { + //DebugPrint.println("SS.next"); KeyValue peeked = this.heap.peek(); if (peeked == null) { close(); @@ -159,6 +173,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb List results = new ArrayList(); while((kv = this.heap.peek()) != null) { QueryMatcher.MatchCode qcode = matcher.match(kv); + //DebugPrint.println("SS peek kv = " + kv + " with qcode = " + qcode); switch(qcode) { case INCLUDE: KeyValue next = this.heap.next(); @@ -219,8 +234,8 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb LOG.warn("StoreFile " + sf + " has null Reader"); continue; } - // Get a scanner that does not use pread. - s.add(r.getScanner(this.cacheBlocks, false)); + // If isGet, use pread, else false, dont use pread + s.add(r.getScanner(this.cacheBlocks, isGet)); } List scanners = new ArrayList(s.size()+1); @@ -232,12 +247,20 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb // Implementation of ChangedReadersObserver public synchronized void updateReaders() throws IOException { - if (this.closing.get()) return; + if (this.closing) return; KeyValue topKey = this.peek(); if (topKey == null) return; + + //DebugPrint.println("SS updateReaders, topKey = " + topKey); + + // TODO perhaps do something more elegant than new-memstore scanner like adjust with a seek? + + // close the previous scanners: + this.heap.close(); // bubble thru and close all scanners. + this.heap = null; // the re-seeks could be slow, free up memory ASAP. + List scanners = getScanners(); - // Seek all scanners to the initial key for(KeyValueScanner scanner : scanners) { scanner.seek(topKey); } diff --git src/java/org/apache/hadoop/hbase/util/FSUtils.java src/java/org/apache/hadoop/hbase/util/FSUtils.java index 1bddbf5..09bac3d 100644 --- src/java/org/apache/hadoop/hbase/util/FSUtils.java +++ src/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -367,6 +367,7 @@ public class FSUtils { return true; } + // TODO move this method OUT of FSUtils. No dependencies to HMaster /** * Expects to find -ROOT- directory. 
* @param fs diff --git src/test/org/apache/hadoop/hbase/TestKeyValue.java src/test/org/apache/hadoop/hbase/TestKeyValue.java index 1b74ae1..6f56769 100644 --- src/test/org/apache/hadoop/hbase/TestKeyValue.java +++ src/test/org/apache/hadoop/hbase/TestKeyValue.java @@ -276,4 +276,49 @@ public class TestKeyValue extends TestCase { // TODO actually write this test! } + + private final byte[] rowA = Bytes.toBytes("rowA"); + private final byte[] rowB = Bytes.toBytes("rowB"); + + private final byte[] family = Bytes.toBytes("family"); + private final byte[] qualA = Bytes.toBytes("qfA"); + + private void assertKVLess(KeyValue.KVComparator c, + KeyValue less, + KeyValue greater) { + int cmp = c.compare(less,greater); + assertTrue(cmp < 0); + cmp = c.compare(greater,less); + assertTrue(cmp > 0); + } + + public void testFirstLastOnRow() { + final KVComparator c = KeyValue.COMPARATOR; + long ts = 1; + + // These are listed in sort order (ie: every one should be less + // than the one on the next line). + final KeyValue firstOnRowA = KeyValue.createFirstOnRow(rowA); + final KeyValue kvA_1 = new KeyValue(rowA, null, null, ts, Type.Put); + final KeyValue kvA_2 = new KeyValue(rowA, family, qualA, ts, Type.Put); + + final KeyValue lastOnRowA = KeyValue.createLastOnRow(rowA); + final KeyValue firstOnRowB = KeyValue.createFirstOnRow(rowB); + final KeyValue kvB = new KeyValue(rowB, family, qualA, ts, Type.Put); + + assertKVLess(c, firstOnRowA, firstOnRowB); + assertKVLess(c, firstOnRowA, kvA_1); + assertKVLess(c, firstOnRowA, kvA_2); + assertKVLess(c, kvA_1, kvA_2); + assertKVLess(c, kvA_2, firstOnRowB); + assertKVLess(c, kvA_1, firstOnRowB); + + assertKVLess(c, lastOnRowA, firstOnRowB); + assertKVLess(c, firstOnRowB, kvB); + assertKVLess(c, lastOnRowA, kvB); + + assertKVLess(c, kvA_2, lastOnRowA); + assertKVLess(c, kvA_1, lastOnRowA); + assertKVLess(c, firstOnRowA, lastOnRowA); + } } diff --git src/test/org/apache/hadoop/hbase/client/TestClient.java src/test/org/apache/hadoop/hbase/client/TestClient.java index d8b58bc..e75c4c2 100644 --- src/test/org/apache/hadoop/hbase/client/TestClient.java +++ src/test/org/apache/hadoop/hbase/client/TestClient.java @@ -67,7 +67,7 @@ public class TestClient extends HBaseClusterTestCase { super(); } - /** + /* * Test from client side of an involved filter against a multi family that * involves deletes. * @@ -196,7 +196,7 @@ public class TestClient extends HBaseClusterTestCase { } } - /** + /* * Test filters when multiple regions. It does counts. Needs eye-balling of * logs to ensure that we're not scanning more regions that we're supposed to. * Related to the TestFilterAcrossRegions over in the o.a.h.h.filter package. @@ -253,7 +253,7 @@ public class TestClient extends HBaseClusterTestCase { assertEquals(rowCount - endKeyCount, countGreater); } - /** + /* * Load table with rows from 'aaa' to 'zzz'. * @param t * @return Count of rows loaded. @@ -418,7 +418,7 @@ public class TestClient extends HBaseClusterTestCase { scanner.close(); } - /** + /* * Test simple table and non-existent row cases. */ public void testSimpleMissing() throws Exception { @@ -531,7 +531,7 @@ public class TestClient extends HBaseClusterTestCase { assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); } - /** + /* * Test basic puts, gets, scans, and deletes for a single row * in a multiple family table. 
*/ @@ -1438,7 +1438,7 @@ public class TestClient extends HBaseClusterTestCase { ht.put(put); delete = new Delete(ROW); - delete.deleteColumn(FAMILIES[0], QUALIFIER); + delete.deleteColumn(FAMILIES[0], QUALIFIER); // ts[4] ht.delete(delete); get = new Get(ROW); @@ -1473,20 +1473,21 @@ public class TestClient extends HBaseClusterTestCase { // But alas, this is not to be. We can't put them back in either case. put = new Put(ROW); - put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); - put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); + put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); // 1000 + put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); // 5000 ht.put(put); - // The Get returns the latest value but then does not return the - // oldest, which was never deleted, ts[1]. - + + // It used to be due to the internal implementation of Get, that + // the Get() call would return ts[4] UNLIKE the Scan below. With + // the switch to using Scan for Get this is no longer the case. get = new Get(ROW); get.addFamily(FAMILIES[0]); get.setMaxVersions(Integer.MAX_VALUE); result = ht.get(get); assertNResult(result, ROW, FAMILIES[0], QUALIFIER, - new long [] {ts[2], ts[3], ts[4]}, - new byte[][] {VALUES[2], VALUES[3], VALUES[4]}, + new long [] {ts[1], ts[2], ts[3]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3]}, 0, 2); // The Scanner returns the previous values, the expected-unexpected behavior @@ -1544,7 +1545,7 @@ public class TestClient extends HBaseClusterTestCase { result = ht.get(get); assertTrue("Expected 2 keys but received " + result.size(), result.size() == 2); - assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER, + assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER, new long [] {ts[0], ts[1]}, new byte[][] {VALUES[0], VALUES[1]}, 0, 1); @@ -1582,9 +1583,9 @@ public class TestClient extends HBaseClusterTestCase { get.addFamily(FAMILIES[2]); get.setMaxVersions(Integer.MAX_VALUE); result = ht.get(get); - assertTrue("Expected 1 key but received " + result.size(), - result.size() == 1); - assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, + System.out.println(result); + assertEquals(1, result.size()); + assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, new long [] {ts[2]}, new byte[][] {VALUES[2]}, 0, 0); @@ -1594,9 +1595,8 @@ public class TestClient extends HBaseClusterTestCase { scan.addFamily(FAMILIES[2]); scan.setMaxVersions(Integer.MAX_VALUE); result = getSingleScanResult(ht, scan); - assertTrue("Expected 1 key but received " + result.size(), - result.size() == 1); - assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, + assertEquals(1, result.size()); + assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, new long [] {ts[2]}, new byte[][] {VALUES[2]}, 0, 0); @@ -1682,7 +1682,7 @@ public class TestClient extends HBaseClusterTestCase { } } - /** + /* * Baseline "scalability" test. 
* * Tests one hundred families, one million columns, one million versions @@ -1729,7 +1729,7 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * Explicitly test JIRAs related to HBASE-880 / Client API */ public void testJIRAs() throws Exception { @@ -1745,7 +1745,7 @@ public class TestClient extends HBaseClusterTestCase { // JIRA Testers // - /** + /* * HBASE-867 * If millions of columns in a column family, hbase scanner won't come up * @@ -1835,7 +1835,7 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * HBASE-861 * get with timestamp will return a value if there is a version with an * earlier timestamp @@ -1898,7 +1898,7 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * HBASE-33 * Add a HTable get/obtainScanner method that retrieves all versions of a * particular column and row between two timestamps @@ -1947,7 +1947,7 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * HBASE-1014 * commit(BatchUpdate) method should return timestamp */ @@ -1971,7 +1971,7 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * HBASE-1182 * Scan for columns > some timestamp */ @@ -2016,7 +2016,7 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * HBASE-52 * Add a means of scanning over all versions */ @@ -2414,7 +2414,7 @@ public class TestClient extends HBaseClusterTestCase { - /** + /* * Verify a single column using gets. * Expects family and qualifier arrays to be valid for at least * the range: idx-2 < idx < idx+2 @@ -2471,7 +2471,7 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * Verify a single column using scanners. * Expects family and qualifier arrays to be valid for at least * the range: idx-2 to idx+2 @@ -2533,11 +2533,11 @@ public class TestClient extends HBaseClusterTestCase { } - /** + /* * Verify we do not read any values by accident around a single column * Same requirements as getVerifySingleColumn */ - private void getVerifySingleEmpty(HTable ht, + private void getVerifySingleEmpty(HTable ht, byte [][] ROWS, int ROWIDX, byte [][] FAMILIES, int FAMILYIDX, byte [][] QUALIFIERS, int QUALIFIERIDX) @@ -2659,12 +2659,11 @@ public class TestClient extends HBaseClusterTestCase { "Got row [" + Bytes.toString(result.getRow()) +"]", equals(row, result.getRow())); int expectedResults = end - start + 1; - assertTrue("Expected " + expectedResults + " keys but result contains " - + result.size(), result.size() == expectedResults); - + assertEquals(expectedResults, result.size()); + KeyValue [] keys = result.sorted(); - for(int i=0;i kvs = new ArrayList(); kvs.add(new KeyValue(row1, fam4, null, null)); @@ -1431,6 +1456,41 @@ public class TestHRegion extends HBaseTestCase { assertICV(row, fam1, qual1, value+amount); } + public void testIncrementColumnValue_BumpSnapshot() throws IOException { + initHRegion(tableName, getName(), fam1); + + long value = 42L; + long incr = 44L; + + // first put something in kvset, then snapshot it. 
+ Put put = new Put(row); + put.add(fam1, qual1, Bytes.toBytes(value)); + region.put(put); + + // get the store in question: + Store s = region.getStore(fam1); + s.snapshot(); //bam + + // now increment: + long newVal = region.incrementColumnValue(row, fam1, qual1, + incr, false); + + assertEquals(value+incr, newVal); + + // get both versions: + Get get = new Get(row); + get.setMaxVersions(); + get.addColumn(fam1,qual1); + + Result r = region.get(get, null); + assertEquals(2, r.size()); + KeyValue first = r.raw()[0]; + KeyValue second = r.raw()[1]; + + assertTrue("ICV failed to upgrade timestamp", + first.getTimestamp() != second.getTimestamp()); + } + public void testIncrementColumnValue_ConcurrentFlush() throws IOException { initHRegion(tableName, getName(), fam1); @@ -1644,7 +1704,7 @@ public class TestHRegion extends HBaseTestCase { assertEquals(expected.get(i), actual.get(i)); } } - + ////////////////////////////////////////////////////////////////////////////// // Split test ////////////////////////////////////////////////////////////////////////////// @@ -1942,7 +2002,7 @@ public class TestHRegion extends HBaseTestCase { } if (i != 0 && i % flushInterval == 0) { - //System.out.println("scan iteration = " + i); + //System.out.println("flush scan iteration = " + i); flushThread.flush(); } @@ -1951,9 +2011,10 @@ public class TestHRegion extends HBaseTestCase { InternalScanner scanner = region.getScanner(scan); while (scanner.next(res)) ; if (!res.isEmpty() || !previousEmpty || i > compactInterval) { - Assert.assertEquals("i=" + i, expectedCount, res.size()); + assertEquals("i=" + i, expectedCount, res.size()); long timestamp = res.get(0).getTimestamp(); - Assert.assertTrue(timestamp >= prevTimestamp); + assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp, + timestamp >= prevTimestamp); prevTimestamp = timestamp; } } @@ -2003,15 +2064,16 @@ public class TestHRegion extends HBaseTestCase { for (int r = 0; r < numRows; r++) { byte[] row = Bytes.toBytes("row" + r); Put put = new Put(row); - for (int f = 0; f < families.length; f++) { - for (int q = 0; q < qualifiers.length; q++) { - put.add(families[f], qualifiers[q], (long) val, - Bytes.toBytes(val)); + for (byte[] family : families) { + for (byte[] qualifier : qualifiers) { + put.add(family, qualifier, (long) val, + Bytes.toBytes(val)); } } +// System.out.println("Putting of kvsetsize=" + put.size()); region.put(put); - if (val > 0 && val % 47 == 0){ - //System.out.println("put iteration = " + val); + if (val > 0 && val % 47 == 0) { + System.out.println("put iteration = " + val); Delete delete = new Delete(row, (long)val-30, null); region.delete(delete, null, true); } diff --git src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java index 1e602e7..28a01a2 100644 --- src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; import junit.framework.TestCase; @@ -46,10 +47,12 @@ public class TestMemStore extends TestCase { private static final byte [] FAMILY = Bytes.toBytes("column"); private static final byte [] CONTENTS_BASIC = Bytes.toBytes("contents:basic"); private static final String CONTENTSTR = "contentstr"; + private ReadWriteConsistencyControl rwcc; @Override public void setUp() throws 
Exception { super.setUp(); + this.rwcc = new ReadWriteConsistencyControl(); this.memstore = new MemStore(); } @@ -75,6 +78,7 @@ public class TestMemStore extends TestCase { KeyValueScanner [] memstorescanners = this.memstore.getScanners(); Scan scan = new Scan(); List result = new ArrayList(); + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, this.memstore.comparator, null, memstorescanners); int count = 0; @@ -93,6 +97,8 @@ public class TestMemStore extends TestCase { for (int i = 0; i < memstorescanners.length; i++) { memstorescanners[0].close(); } + + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); memstorescanners = this.memstore.getScanners(); // Now assert can count same number even if a snapshot mid-scan. s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, @@ -137,9 +143,9 @@ public class TestMemStore extends TestCase { if (count == snapshotIndex) { this.memstore.snapshot(); this.memstore.clearSnapshot(this.memstore.getSnapshot()); - // Added more rows into kvset. + // Added more rows into kvset. But the scanner wont see these rows. addRows(this.memstore, ts); - LOG.info("Snapshotted, cleared it and then added values"); + LOG.info("Snapshotted, cleared it and then added values (which wont be seen)"); } result.clear(); } @@ -149,6 +155,181 @@ public class TestMemStore extends TestCase { assertEquals(rowCount, count); } + /** + * A simple test which verifies the 3 possible states when scanning across snapshot. + */ + public void testScanAcrossSnapshot2() { + // we are going to the scanning across snapshot with two kvs + // kv1 should always be returned before kv2 + final byte[] one = Bytes.toBytes(1); + final byte[] two = Bytes.toBytes(2); + final byte[] f = Bytes.toBytes("f"); + final byte[] q = Bytes.toBytes("q"); + final byte[] v = Bytes.toBytes(3); + + final KeyValue kv1 = new KeyValue(one, f, q, v); + final KeyValue kv2 = new KeyValue(two, f, q, v); + + // use case 1: both kvs in kvset + this.memstore.add(kv1.clone()); + this.memstore.add(kv2.clone()); + verifyScanAcrossSnapshot2(kv1, kv2); + + // use case 2: both kvs in snapshot + this.memstore.snapshot(); + verifyScanAcrossSnapshot2(kv1, kv2); + + // use case 3: first in snapshot second in kvset + this.memstore = new MemStore(); + this.memstore.add(kv1.clone()); + this.memstore.snapshot(); + this.memstore.add(kv2.clone()); + verifyScanAcrossSnapshot2(kv1, kv2); + } + + private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) { + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + KeyValueScanner[] memstorescanners = this.memstore.getScanners(); + assertEquals(1, memstorescanners.length); + final KeyValueScanner scanner = memstorescanners[0]; + scanner.seek(KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW)); + assertEquals(kv1, scanner.next()); + assertEquals(kv2, scanner.next()); + assertNull(scanner.next()); + } + + private void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected) { + scanner.seek(KeyValue.createFirstOnRow(new byte[]{})); + for (KeyValue kv : expected) { + assertTrue(0 == + KeyValue.COMPARATOR.compare(kv, + scanner.next())); + } + assertNull(scanner.peek()); + } + + public void testMemstoreConcurrentControl() { + final byte[] row = Bytes.toBytes(1); + final byte[] f = Bytes.toBytes("family"); + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] v = Bytes.toBytes("value"); + + ReadWriteConsistencyControl.WriteEntry w = + 
rwcc.beginMemstoreInsert(); + + KeyValue kv1 = new KeyValue(row, f, q1, v); + kv1.setMemstoreTS(w.getWriteNumber()); + memstore.add(kv1); + + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + KeyValueScanner[] s = this.memstore.getScanners(); + assertScannerResults(s[0], new KeyValue[]{}); + + rwcc.completeMemstoreInsert(w); + + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + s = this.memstore.getScanners(); + assertScannerResults(s[0], new KeyValue[]{kv1}); + + w = rwcc.beginMemstoreInsert(); + KeyValue kv2 = new KeyValue(row, f, q2, v); + kv2.setMemstoreTS(w.getWriteNumber()); + memstore.add(kv2); + + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + s = this.memstore.getScanners(); + assertScannerResults(s[0], new KeyValue[]{kv1}); + + rwcc.completeMemstoreInsert(w); + + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + s = this.memstore.getScanners(); + assertScannerResults(s[0], new KeyValue[]{kv1, kv2}); + } + + private static class ReadOwnWritesTester extends Thread { + final int id; + static final int NUM_TRIES = 1000; + + final byte[] row; + + final byte[] f = Bytes.toBytes("family"); + final byte[] q1 = Bytes.toBytes("q1"); + + final ReadWriteConsistencyControl rwcc; + final MemStore memstore; + + AtomicReference caughtException; + + + public ReadOwnWritesTester(int id, + MemStore memstore, + ReadWriteConsistencyControl rwcc, + AtomicReference caughtException) + { + this.id = id; + this.rwcc = rwcc; + this.memstore = memstore; + this.caughtException = caughtException; + row = Bytes.toBytes(id); + } + + public void run() { + try { + internalRun(); + } catch (Throwable t) { + caughtException.compareAndSet(null, t); + } + } + + private void internalRun() { + for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { + ReadWriteConsistencyControl.WriteEntry w = + rwcc.beginMemstoreInsert(); + + // Insert the sequence value (i) + byte[] v = Bytes.toBytes(i); + + KeyValue kv = new KeyValue(row, f, q1, i, v); + kv.setMemstoreTS(w.getWriteNumber()); + memstore.add(kv); + rwcc.completeMemstoreInsert(w); + + // Assert that we can read back + + KeyValueScanner s = this.memstore.getScanners()[0]; + s.seek(kv); + + KeyValue ret = s.next(); + assertNotNull("Didnt find own write at all", ret); + assertEquals("Didnt read own writes", + kv.getTimestamp(), ret.getTimestamp()); + } + } + } + + public void no_testReadOwnWritesUnderConcurrency() throws Throwable { + + int NUM_THREADS = 8; + + ReadOwnWritesTester threads[] = new ReadOwnWritesTester[NUM_THREADS]; + AtomicReference caught = new AtomicReference(); + + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new ReadOwnWritesTester(i, memstore, rwcc, caught); + threads[i].start(); + } + + for (int i = 0; i < NUM_THREADS; i++) { + threads[i].join(); + } + + if (caught.get() != null) { + throw caught.get(); + } + } + /** * Test memstore snapshots * @throws IOException @@ -442,9 +623,10 @@ public class TestMemStore extends TestCase { List expected = new ArrayList(); expected.add(put3); expected.add(del2); + expected.add(put2); expected.add(put1); - - assertEquals(3, memstore.kvset.size()); + + assertEquals(4, memstore.kvset.size()); int i = 0; for(KeyValue kv : memstore.kvset) { assertEquals(expected.get(i++), kv); @@ -476,8 +658,11 @@ public class TestMemStore extends TestCase { List expected = new ArrayList(); expected.add(put3); expected.add(del2); + expected.add(put2); + expected.add(put1); + - assertEquals(2, memstore.kvset.size()); + assertEquals(4, memstore.kvset.size()); int i = 0; for 
(KeyValue kv: memstore.kvset) { assertEquals(expected.get(i++), kv); @@ -510,9 +695,14 @@ public class TestMemStore extends TestCase { List expected = new ArrayList(); expected.add(del); + expected.add(put1); + expected.add(put2); expected.add(put4); + expected.add(put3); + + - assertEquals(2, memstore.kvset.size()); + assertEquals(5, memstore.kvset.size()); int i = 0; for (KeyValue kv: memstore.kvset) { assertEquals(expected.get(i++), kv); @@ -528,7 +718,7 @@ public class TestMemStore extends TestCase { memstore.add(new KeyValue(row, fam, qf, ts, val)); KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val); memstore.delete(delete); - assertEquals(1, memstore.kvset.size()); + assertEquals(2, memstore.kvset.size()); assertEquals(delete, memstore.kvset.first()); } @@ -541,7 +731,7 @@ public class TestMemStore extends TestCase { "row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care"); memstore.delete(delete); - assertEquals(1, memstore.kvset.size()); + assertEquals(2, memstore.kvset.size()); assertEquals(delete, memstore.kvset.first()); } public void testRetainsDeleteColumn() throws IOException { @@ -553,7 +743,7 @@ public class TestMemStore extends TestCase { KeyValue.Type.DeleteColumn, "dont-care"); memstore.delete(delete); - assertEquals(1, memstore.kvset.size()); + assertEquals(2, memstore.kvset.size()); assertEquals(delete, memstore.kvset.first()); } public void testRetainsDeleteFamily() throws IOException { @@ -565,7 +755,7 @@ public class TestMemStore extends TestCase { KeyValue.Type.DeleteFamily, "dont-care"); memstore.delete(delete); - assertEquals(1, memstore.kvset.size()); + assertEquals(2, memstore.kvset.size()); assertEquals(delete, memstore.kvset.first()); } @@ -573,13 +763,13 @@ public class TestMemStore extends TestCase { ////////////////////////////////////////////////////////////////////////////// // Helpers ////////////////////////////////////////////////////////////////////////////// - private byte [] makeQualifier(final int i1, final int i2){ + private static byte [] makeQualifier(final int i1, final int i2){ return Bytes.toBytes(Integer.toString(i1) + ";" + Integer.toString(i2)); } /** - * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT} + * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT} * @param hmc Instance to add rows to. * @return How many rows we added. * @throws IOException @@ -589,7 +779,7 @@ public class TestMemStore extends TestCase { } /** - * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT} + * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT} * @param hmc Instance to add rows to. * @return How many rows we added. 
* @throws IOException @@ -643,4 +833,57 @@ public class TestMemStore extends TestCase { return new KeyValue(row, Bytes.toBytes("test_col:"), HConstants.LATEST_TIMESTAMP, value); } + private static void addRows(int count, final MemStore mem) { + long nanos = System.nanoTime(); + + for (int i = 0 ; i < count ; i++) { + if (i % 1000 == 0) { + + System.out.println(i + " Took for 1k usec: " + (System.nanoTime() - nanos)/1000); + nanos = System.nanoTime(); + } + long timestamp = System.currentTimeMillis(); + + for (int ii = 0; ii < QUALIFIER_COUNT ; ii++) { + byte [] row = Bytes.toBytes(i); + byte [] qf = makeQualifier(i, ii); + mem.add(new KeyValue(row, FAMILY, qf, timestamp, qf)); + } + } + } + + + static void doScan(MemStore ms, int iteration) { + long nanos = System.nanoTime(); + KeyValueScanner [] ss = ms.getScanners(); + KeyValueScanner s = ss[0]; + s.seek(KeyValue.createFirstOnRow(new byte[]{})); + + System.out.println(iteration + " create/seek took: " + (System.nanoTime() - nanos)/1000); + int cnt=0; + while(s.next() != null) ++cnt; + + System.out.println(iteration + " took usec: " + (System.nanoTime() - nanos)/1000 + " for: " + cnt); + + } + + public static void main(String [] args) { + ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); + MemStore ms = new MemStore(); + + long n1 = System.nanoTime(); + addRows(25000, ms); + System.out.println("Took for insert: " + (System.nanoTime()-n1)/1000); + + + System.out.println("foo"); + + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + + for (int i = 0 ; i < 50 ; i++) + doScan(ms, i); + + } + + } diff --git src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java new file mode 100644 index 0000000..78fe59c --- /dev/null +++ src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java @@ -0,0 +1,109 @@ +package org.apache.hadoop.hbase.regionserver; + +import junit.framework.TestCase; + +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class TestReadWriteConsistencyControl extends TestCase { + static class Writer implements Runnable { + final AtomicBoolean finished; + final ReadWriteConsistencyControl rwcc; + final AtomicBoolean status; + + Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) { + this.finished = finished; + this.rwcc = rwcc; + this.status = status; + } + private Random rnd = new Random(); + public boolean failed = false; + + public void run() { + while (!finished.get()) { + ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert(); +// System.out.println("Begin write: " + e.getWriteNumber()); + // 10 usec - 500usec (including 0) + int sleepTime = rnd.nextInt(500); + // 500 * 1000 = 500,000ns = 500 usec + // 1 * 100 = 100ns = 1usec + try { + if (sleepTime > 0) + Thread.sleep(0, sleepTime * 1000); + } catch (InterruptedException e1) { + } + try { + rwcc.completeMemstoreInsert(e); + } catch (RuntimeException ex) { + // got failure + System.out.println(ex.toString()); + ex.printStackTrace(); + status.set(false); + return; + // Report failure if possible. 
+ } + } + } + } + + public void testParallelism() throws Exception { + final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); + + final AtomicBoolean finished = new AtomicBoolean(false); + + // fail flag for the reader thread + final AtomicBoolean readerFailed = new AtomicBoolean(false); + final AtomicLong failedAt = new AtomicLong(); + Runnable reader = new Runnable() { + public void run() { + long prev = rwcc.memstoreReadPoint(); + while (!finished.get()) { + long newPrev = rwcc.memstoreReadPoint(); + if (newPrev < prev) { + // serious problem. + System.out.println("Reader got out of order, prev: " + + prev + " next was: " + newPrev); + readerFailed.set(true); + // might as well give up + failedAt.set(newPrev); + return; + } + } + } + }; + + // writer thread parallelism. + int n = 20; + Thread [] writers = new Thread[n]; + AtomicBoolean [] statuses = new AtomicBoolean[n]; + Thread readThread = new Thread(reader); + + for (int i = 0 ; i < n ; ++i ) { + statuses[i] = new AtomicBoolean(true); + writers[i] = new Thread(new Writer(finished, rwcc, statuses[i])); + writers[i].start(); + } + readThread.start(); + + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException ex) { + } + + finished.set(true); + + readThread.join(); + for (int i = 0; i < n; ++i) { + writers[i].join(); + } + + // check failure. + assertFalse(readerFailed.get()); + for (int i = 0; i < n; ++i) { + assertTrue(statuses[i].get()); + } + + + } +} diff --git src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index f1ec15b..76ab7b5 100644 --- src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -1,5 +1,5 @@ /* - * Copyright 2009 The Apache Software Foundation + * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,24 +20,23 @@ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.NavigableSet; -import java.util.TreeSet; - import junit.framework.TestCase; - import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; + public class TestStoreScanner extends TestCase { private final String CF_STR = "cf"; final byte [] CF = Bytes.toBytes(CF_STR); - /** + /* * Test utility for building a NavigableSet for scanners. * @param strCols * @return @@ -128,7 +127,7 @@ public class TestStoreScanner extends TestCase { assertEquals(kvs[0], results.get(0)); } - /** + /* * Test test shows exactly how the matcher's return codes confuses the StoreScanner * and prevent it from doing the right thing. Seeking once, then nexting twice * should return R1, then R2, but in this case it doesnt. @@ -189,7 +188,7 @@ public class TestStoreScanner extends TestCase { assertEquals(0, results.size()); } - /** + /* * Test the case where there is a delete row 'in front of' the next row, the scanner * will move to the next row. 
*/ @@ -408,7 +407,7 @@ public class TestStoreScanner extends TestCase { assertEquals(false, scan.next(results)); } - /** + /* * Test expiration of KeyValues in combination with a configured TTL for * a column family (as should be triggered in a major compaction). */