Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1179409) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl; +import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.regionserver.StoreFile; @@ -1558,7 +1558,7 @@ */ public static List getFromStoreFile(Store store, Get get) throws IOException { - ReadWriteConsistencyControl.resetThreadReadPoint(); + MultiVersionConsistencyControl.resetThreadReadPoint(); Scan scan = new Scan(get); InternalScanner scanner = (InternalScanner) store.getScanner(scan, scan.getFamilyMap().get(store.getFamily().getName())); Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java (revision 1179409) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java (working copy) @@ -1,128 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import junit.framework.TestCase; - -import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -public class TestReadWriteConsistencyControl extends TestCase { - static class Writer implements Runnable { - final AtomicBoolean finished; - final ReadWriteConsistencyControl rwcc; - final AtomicBoolean status; - - Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) { - this.finished = finished; - this.rwcc = rwcc; - this.status = status; - } - private Random rnd = new Random(); - public boolean failed = false; - - public void run() { - while (!finished.get()) { - ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert(); -// System.out.println("Begin write: " + e.getWriteNumber()); - // 10 usec - 500usec (including 0) - int sleepTime = rnd.nextInt(500); - // 500 * 1000 = 500,000ns = 500 usec - // 1 * 100 = 100ns = 1usec - try { - if (sleepTime > 0) - Thread.sleep(0, sleepTime * 1000); - } catch (InterruptedException e1) { - } - try { - rwcc.completeMemstoreInsert(e); - } catch (RuntimeException ex) { - // got failure - System.out.println(ex.toString()); - ex.printStackTrace(); - status.set(false); - return; - // Report failure if possible. 
- } - } - } - } - - public void testParallelism() throws Exception { - final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); - - final AtomicBoolean finished = new AtomicBoolean(false); - - // fail flag for the reader thread - final AtomicBoolean readerFailed = new AtomicBoolean(false); - final AtomicLong failedAt = new AtomicLong(); - Runnable reader = new Runnable() { - public void run() { - long prev = rwcc.memstoreReadPoint(); - while (!finished.get()) { - long newPrev = rwcc.memstoreReadPoint(); - if (newPrev < prev) { - // serious problem. - System.out.println("Reader got out of order, prev: " + - prev + " next was: " + newPrev); - readerFailed.set(true); - // might as well give up - failedAt.set(newPrev); - return; - } - } - } - }; - - // writer thread parallelism. - int n = 20; - Thread [] writers = new Thread[n]; - AtomicBoolean [] statuses = new AtomicBoolean[n]; - Thread readThread = new Thread(reader); - - for (int i = 0 ; i < n ; ++i ) { - statuses[i] = new AtomicBoolean(true); - writers[i] = new Thread(new Writer(finished, rwcc, statuses[i])); - writers[i].start(); - } - readThread.start(); - - try { - Thread.sleep(10 * 1000); - } catch (InterruptedException ex) { - } - - finished.set(true); - - readThread.join(); - for (int i = 0; i < n; ++i) { - writers[i].join(); - } - - // check failure. 
- assertFalse(readerFailed.get()); - for (int i = 0; i < n; ++i) { - assertTrue(statuses[i].get()); - } - - - } -} Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java (revision 0) @@ -0,0 +1,128 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import junit.framework.TestCase; + +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class TestMultiVersionConsistencyControl extends TestCase { + static class Writer implements Runnable { + final AtomicBoolean finished; + final MultiVersionConsistencyControl mvcc; + final AtomicBoolean status; + + Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status) { + this.finished = finished; + this.mvcc = mvcc; + this.status = status; + } + private Random rnd = new Random(); + public boolean failed = false; + + public void run() { + while (!finished.get()) { + MultiVersionConsistencyControl.WriteEntry e = mvcc.beginMemstoreInsert(); +// System.out.println("Begin write: " + e.getWriteNumber()); + // 10 usec - 500usec (including 0) + int sleepTime = rnd.nextInt(500); + // 500 * 1000 = 500,000ns = 500 usec + // 1 * 100 = 100ns = 1usec + try { + if (sleepTime > 0) + Thread.sleep(0, sleepTime * 1000); + } catch (InterruptedException e1) { + } + try { + mvcc.completeMemstoreInsert(e); + } catch (RuntimeException ex) { + // got failure + System.out.println(ex.toString()); + ex.printStackTrace(); + status.set(false); + return; + // Report failure if possible. + } + } + } + } + + public void testParallelism() throws Exception { + final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl(); + + final AtomicBoolean finished = new AtomicBoolean(false); + + // fail flag for the reader thread + final AtomicBoolean readerFailed = new AtomicBoolean(false); + final AtomicLong failedAt = new AtomicLong(); + Runnable reader = new Runnable() { + public void run() { + long prev = mvcc.memstoreReadPoint(); + while (!finished.get()) { + long newPrev = mvcc.memstoreReadPoint(); + if (newPrev < prev) { + // serious problem. 
+ System.out.println("Reader got out of order, prev: " + + prev + " next was: " + newPrev); + readerFailed.set(true); + // might as well give up + failedAt.set(newPrev); + return; + } + } + } + }; + + // writer thread parallelism. + int n = 20; + Thread [] writers = new Thread[n]; + AtomicBoolean [] statuses = new AtomicBoolean[n]; + Thread readThread = new Thread(reader); + + for (int i = 0 ; i < n ; ++i ) { + statuses[i] = new AtomicBoolean(true); + writers[i] = new Thread(new Writer(finished, mvcc, statuses[i])); + writers[i].start(); + } + readThread.start(); + + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException ex) { + } + + finished.set(true); + + readThread.join(); + for (int i = 0; i < n; ++i) { + writers[i].join(); + } + + // check failure. + assertFalse(readerFailed.get()); + for (int i = 0; i < n; ++i) { + assertTrue(statuses[i].get()); + } + + + } +} Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (revision 1179409) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (working copy) @@ -57,12 +57,12 @@ private static final byte [] CONTENTS = Bytes.toBytes("contents"); private static final byte [] BASIC = Bytes.toBytes("basic"); private static final String CONTENTSTR = "contentstr"; - private ReadWriteConsistencyControl rwcc; + private MultiVersionConsistencyControl mvcc; @Override public void setUp() throws Exception { super.setUp(); - this.rwcc = new ReadWriteConsistencyControl(); + this.mvcc = new MultiVersionConsistencyControl(); this.memstore = new MemStore(); } @@ -88,7 +88,7 @@ List memstorescanners = this.memstore.getScanners(); Scan scan = new Scan(); List result = new ArrayList(); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); StoreScanner s = new StoreScanner(scan, null, 
HConstants.LATEST_TIMESTAMP, this.memstore.comparator, null, memstorescanners); int count = 0; @@ -108,7 +108,7 @@ scanner.close(); } - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); memstorescanners = this.memstore.getScanners(); // Now assert can count same number even if a snapshot mid-scan. s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, @@ -200,7 +200,7 @@ private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException { - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); List memstorescanners = this.memstore.getScanners(); assertEquals(1, memstorescanners.size()); final KeyValueScanner scanner = memstorescanners.get(0); @@ -235,35 +235,35 @@ final byte[] q2 = Bytes.toBytes("q2"); final byte[] v = Bytes.toBytes("value"); - ReadWriteConsistencyControl.WriteEntry w = - rwcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry w = + mvcc.beginMemstoreInsert(); KeyValue kv1 = new KeyValue(row, f, q1, v); kv1.setMemstoreTS(w.getWriteNumber()); memstore.add(kv1); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{}); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv1}); - w = rwcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(); KeyValue kv2 = new KeyValue(row, f, q2, v); kv2.setMemstoreTS(w.getWriteNumber()); memstore.add(kv2); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); 
assertScannerResults(s, new KeyValue[]{kv1}); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv1, kv2}); } @@ -283,8 +283,8 @@ final byte[] v2 = Bytes.toBytes("value2"); // INSERT 1: Write both columns val1 - ReadWriteConsistencyControl.WriteEntry w = - rwcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry w = + mvcc.beginMemstoreInsert(); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setMemstoreTS(w.getWriteNumber()); @@ -293,15 +293,15 @@ KeyValue kv12 = new KeyValue(row, f, q2, v1); kv12.setMemstoreTS(w.getWriteNumber()); memstore.add(kv12); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // BEFORE STARTING INSERT 2, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START INSERT 2: Write both columns val2 - w = rwcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(); KeyValue kv21 = new KeyValue(row, f, q1, v2); kv21.setMemstoreTS(w.getWriteNumber()); memstore.add(kv21); @@ -311,17 +311,17 @@ memstore.add(kv22); // BEFORE COMPLETING INSERT 2, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // COMPLETE INSERT 2 - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS. 
// See HBASE-1485 for discussion about what we should do with // the duplicate-TS inserts - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12}); } @@ -338,8 +338,8 @@ final byte[] q2 = Bytes.toBytes("q2"); final byte[] v1 = Bytes.toBytes("value1"); // INSERT 1: Write both columns val1 - ReadWriteConsistencyControl.WriteEntry w = - rwcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry w = + mvcc.beginMemstoreInsert(); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setMemstoreTS(w.getWriteNumber()); @@ -348,30 +348,30 @@ KeyValue kv12 = new KeyValue(row, f, q2, v1); kv12.setMemstoreTS(w.getWriteNumber()); memstore.add(kv12); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // BEFORE STARTING INSERT 2, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START DELETE: Insert delete for one of the columns - w = rwcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsert(); KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), KeyValue.Type.DeleteColumn); kvDel.setMemstoreTS(w.getWriteNumber()); memstore.add(kvDel); // BEFORE COMPLETING DELETE, SEE FIRST KVS - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new KeyValue[]{kv11, kv12}); // COMPLETE DELETE - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // NOW WE SHOULD SEE DELETE - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); s = this.memstore.getScanners().get(0); assertScannerResults(s, new 
KeyValue[]{kv11, kvDel, kv12}); } @@ -385,7 +385,7 @@ final byte[] f = Bytes.toBytes("family"); final byte[] q1 = Bytes.toBytes("q1"); - final ReadWriteConsistencyControl rwcc; + final MultiVersionConsistencyControl mvcc; final MemStore memstore; AtomicReference caughtException; @@ -393,10 +393,10 @@ public ReadOwnWritesTester(int id, MemStore memstore, - ReadWriteConsistencyControl rwcc, + MultiVersionConsistencyControl mvcc, AtomicReference caughtException) { - this.rwcc = rwcc; + this.mvcc = mvcc; this.memstore = memstore; this.caughtException = caughtException; row = Bytes.toBytes(id); @@ -412,8 +412,8 @@ private void internalRun() throws IOException { for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { - ReadWriteConsistencyControl.WriteEntry w = - rwcc.beginMemstoreInsert(); + MultiVersionConsistencyControl.WriteEntry w = + mvcc.beginMemstoreInsert(); // Insert the sequence value (i) byte[] v = Bytes.toBytes(i); @@ -421,10 +421,10 @@ KeyValue kv = new KeyValue(row, f, q1, i, v); kv.setMemstoreTS(w.getWriteNumber()); memstore.add(kv); - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); // Assert that we can read back - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); KeyValueScanner s = this.memstore.getScanners().get(0); s.seek(kv); @@ -445,7 +445,7 @@ AtomicReference caught = new AtomicReference(); for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new ReadOwnWritesTester(i, memstore, rwcc, caught); + threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught); threads[i].start(); } @@ -946,7 +946,7 @@ } public static void main(String [] args) throws IOException { - ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl(); + MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl(); MemStore ms = new MemStore(); long n1 = System.nanoTime(); @@ -956,7 +956,7 @@ System.out.println("foo"); - 
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); for (int i = 0 ; i < 50 ; i++) doScan(ms, i); Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1179409) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -1448,12 +1448,12 @@ scan.addFamily(fam2); scan.addFamily(fam4); is = (RegionScannerImpl) region.getScanner(scan); - ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC()); + MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC()); assertEquals(1, ((RegionScannerImpl)is).storeHeap.getHeap().size()); scan = new Scan(); is = (RegionScannerImpl) region.getScanner(scan); - ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC()); + MultiVersionConsistencyControl.resetThreadReadPoint(region.getMVCC()); assertEquals(families.length -1, ((RegionScannerImpl)is).storeHeap.getHeap().size()); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (revision 1179409) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (working copy) @@ -1,169 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.util.LinkedList; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; - -/** - * Manages the read/write consistency within memstore. This provides - * an interface for readers to determine what entries to ignore, and - * a mechanism for writers to obtain new write numbers, then "commit" - * the new writes for readers to read (thus forming atomic transactions). - */ -public class ReadWriteConsistencyControl { - private volatile long memstoreRead = 0; - private volatile long memstoreWrite = 0; - - private final Object readWaiters = new Object(); - - // This is the pending queue of writes. - private final LinkedList writeQueue = - new LinkedList(); - - private static final ThreadLocal perThreadReadPoint = - new ThreadLocal(); - - /** - * Get this thread's read point. Used primarily by the memstore scanner to - * know which values to skip (ie: have not been completed/committed to - * memstore). - */ - public static long getThreadReadPoint() { - return perThreadReadPoint.get(); - } - - /** - * Set the thread read point to the given value. The thread RWCC - * is used by the Memstore scanner so it knows which values to skip. - * Give it a value of 0 if you want everything. - */ - public static void setThreadReadPoint(long readPoint) { - perThreadReadPoint.set(readPoint); - } - - /** - * Set the thread RWCC read point to whatever the current read point is in - * this particular instance of RWCC. Returns the new thread read point value. 
- */ - public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) { - perThreadReadPoint.set(rwcc.memstoreReadPoint()); - return getThreadReadPoint(); - } - - /** - * Set the thread RWCC read point to 0 (include everything). - */ - public static void resetThreadReadPoint() { - perThreadReadPoint.set(0L); - } - - public WriteEntry beginMemstoreInsert() { - synchronized (writeQueue) { - long nextWriteNumber = ++memstoreWrite; - WriteEntry e = new WriteEntry(nextWriteNumber); - writeQueue.add(e); - return e; - } - } - - public void completeMemstoreInsert(WriteEntry e) { - synchronized (writeQueue) { - e.markCompleted(); - - long nextReadValue = -1; - boolean ranOnce=false; - while (!writeQueue.isEmpty()) { - ranOnce=true; - WriteEntry queueFirst = writeQueue.getFirst(); - - if (nextReadValue > 0) { - if (nextReadValue+1 != queueFirst.getWriteNumber()) { - throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " - + nextReadValue + " next: " + queueFirst.getWriteNumber()); - } - } - - if (queueFirst.isCompleted()) { - nextReadValue = queueFirst.getWriteNumber(); - writeQueue.removeFirst(); - } else { - break; - } - } - - if (!ranOnce) { - throw new RuntimeException("never was a first"); - } - - if (nextReadValue > 0) { - synchronized (readWaiters) { - memstoreRead = nextReadValue; - readWaiters.notifyAll(); - } - - } - } - - boolean interrupted = false; - synchronized (readWaiters) { - while (memstoreRead < e.getWriteNumber()) { - try { - readWaiters.wait(0); - } catch (InterruptedException ie) { - // We were interrupted... finish the loop -- i.e. cleanup --and then - // on our way out, reset the interrupt flag. 
- interrupted = true; - } - } - } - if (interrupted) Thread.currentThread().interrupt(); - } - - public long memstoreReadPoint() { - return memstoreRead; - } - - - public static class WriteEntry { - private long writeNumber; - private boolean completed = false; - WriteEntry(long writeNumber) { - this.writeNumber = writeNumber; - } - void markCompleted() { - this.completed = true; - } - boolean isCompleted() { - return this.completed; - } - long getWriteNumber() { - return this.writeNumber; - } - } - - public static final long FIXED_SIZE = ClassSize.align( - ClassSize.OBJECT + - 2 * Bytes.SIZEOF_LONG + - 2 * ClassSize.REFERENCE); - -} Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1179409) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -259,8 +259,8 @@ private boolean splitRequest; private byte[] explicitSplitPoint = null; - private final ReadWriteConsistencyControl rwcc = - new ReadWriteConsistencyControl(); + private final MultiVersionConsistencyControl mvcc = + new MultiVersionConsistencyControl(); // Coprocessor host private RegionCoprocessorHost coprocessorHost; @@ -614,8 +614,8 @@ } } - public ReadWriteConsistencyControl getRWCC() { - return rwcc; + public MultiVersionConsistencyControl getMVCC() { + return mvcc; } /** @@ -2076,11 +2076,11 @@ * new entries. 
*/ private long applyFamilyMapToMemstore(Map> familyMap) { - ReadWriteConsistencyControl.WriteEntry w = null; + MultiVersionConsistencyControl.WriteEntry w = null; long size = 0; try { - w = rwcc.beginMemstoreInsert(); - + w = mvcc.beginMemstoreInsert(); + for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); List edits = e.getValue(); @@ -2092,7 +2092,7 @@ } } } finally { - rwcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsert(w); } return size; } @@ -2653,7 +2653,7 @@ // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; - this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc); List scanners = new ArrayList(); if (additionalScanners != null) { @@ -2694,7 +2694,7 @@ try { // This could be a new thread from the last time we called next(). - ReadWriteConsistencyControl.setThreadReadPoint(this.readPt); + MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); results.clear(); @@ -3736,7 +3736,7 @@ ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock ClassSize.ARRAYLIST + // recentFlushes - ReadWriteConsistencyControl.FIXED_SIZE // rwcc + MultiVersionConsistencyControl.FIXED_SIZE // mvcc ; @Override @@ -3745,7 +3745,7 @@ for(Store store : this.stores.values()) { heapSize += store.heapSize(); } - // this does not take into account row locks, recent flushes, rwcc entries + // this does not take into account row locks, recent flushes, mvcc entries return heapSize; } Index: src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java (revision 0) +++ 
src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java (revision 0) @@ -0,0 +1,192 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.LinkedList; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + +/** + * Manages the read/write consistency within memstore. This provides + * an interface for readers to determine what entries to ignore, and + * a mechanism for writers to obtain new write numbers, then "commit" + * the new writes for readers to read (thus forming atomic transactions). + */ +public class MultiVersionConsistencyControl { + private volatile long memstoreRead = 0; + private volatile long memstoreWrite = 0; + + private final Object readWaiters = new Object(); + + // This is the pending queue of writes. 
+ private final LinkedList writeQueue = + new LinkedList(); + + private static final ThreadLocal perThreadReadPoint = + new ThreadLocal(); + + public MultiVersionConsistencyControl() { + this(0); + } + + public MultiVersionConsistencyControl(long start) { + this.memstoreRead = this.memstoreWrite = start; + } + /** + * Get this thread's read point. Used primarily by the memstore scanner to + * know which values to skip (ie: have not been completed/committed to + * memstore). + */ + public static long getThreadReadPoint() { + return perThreadReadPoint.get(); + } + + public void initialize(long startPoint) { + synchronized (writeQueue) { + if (this.memstoreWrite != this.memstoreRead) + throw new RuntimeException("Already used this mvcc. Too late to initialize"); + + if (this.memstoreWrite > startPoint) + throw new RuntimeException("Cannot decrease MVCC timestamp"); + + this.memstoreRead = this.memstoreWrite = startPoint; + } + } + + /** + * Set the thread read point to the given value. The thread MVCC + * is used by the Memstore scanner so it knows which values to skip. + * Give it a value of 0 if you want everything. + */ + public static void setThreadReadPoint(long readPoint) { + perThreadReadPoint.set(readPoint); + } + + /** + * Set the thread MVCC read point to whatever the current read point is in + * this particular instance of MVCC. + */ + public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc) { + perThreadReadPoint.set(mvcc.memstoreReadPoint()); + return getThreadReadPoint(); + } + + /** + * Set the thread MVCC read point to 0 (include everything). 
+ */ + public static void resetThreadReadPoint() { + perThreadReadPoint.set(0L); + } + + public WriteEntry beginMemstoreInsert() { + synchronized (writeQueue) { + long nextWriteNumber = ++memstoreWrite; + WriteEntry e = new WriteEntry(nextWriteNumber); + writeQueue.add(e); + return e; + } + } + + public void completeMemstoreInsert(WriteEntry e) { + synchronized (writeQueue) { + e.markCompleted(); + + long nextReadValue = -1; + boolean ranOnce=false; + while (!writeQueue.isEmpty()) { + ranOnce=true; + WriteEntry queueFirst = writeQueue.getFirst(); + + if (nextReadValue > 0) { + if (nextReadValue+1 != queueFirst.getWriteNumber()) { + throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " + + nextReadValue + " next: " + queueFirst.getWriteNumber()); + } + } + + if (queueFirst.isCompleted()) { + nextReadValue = queueFirst.getWriteNumber(); + writeQueue.removeFirst(); + } else { + break; + } + } + + if (!ranOnce) { + throw new RuntimeException("never was a first"); + } + + if (nextReadValue > 0) { + synchronized (readWaiters) { + memstoreRead = nextReadValue; + readWaiters.notifyAll(); + } + + } + } + + boolean interrupted = false; + synchronized (readWaiters) { + while (memstoreRead < e.getWriteNumber()) { + try { + readWaiters.wait(0); + } catch (InterruptedException ie) { + // We were interrupted... finish the loop -- i.e. cleanup --and then + // on our way out, reset the interrupt flag. 
+ interrupted = true; + } + } + } + if (interrupted) Thread.currentThread().interrupt(); + + } + + public long memstoreReadPoint() { + return memstoreRead; + } + + + public static class WriteEntry { + private long writeNumber; + private boolean completed = false; + WriteEntry(long writeNumber) { + this.writeNumber = writeNumber; + } + void markCompleted() { + this.completed = true; + } + boolean isCompleted() { + return this.completed; + } + long getWriteNumber() { + return this.writeNumber; + } + } + + public static final long FIXED_SIZE = ClassSize.align( + ClassSize.OBJECT + + 2 * Bytes.SIZEOF_LONG + + 2 * ClassSize.REFERENCE); + +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1179409) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -674,7 +674,7 @@ } protected KeyValue getNext(Iterator it) { - long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); + long readPoint = MultiVersionConsistencyControl.getThreadReadPoint(); while (it.hasNext()) { KeyValue v = it.next();