diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IsolationLevel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IsolationLevel.java index 01aba6f..c871d0a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IsolationLevel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IsolationLevel.java @@ -36,8 +36,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceStability.Stable public enum IsolationLevel { - READ_COMMITTED(1), - READ_UNCOMMITTED(2); + SNAPSHOT(1), + READ_UNCOMMITTED(2), + READ_COMMITTED(3); IsolationLevel(int value) {} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 4b4ba9b..63a093e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -19,19 +19,6 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Closeables; -import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; -import com.google.protobuf.TextFormat; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; @@ -195,6 +182,20 @@ import org.apache.hadoop.util.StringUtils; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Closeables; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import com.google.protobuf.TextFormat; + @SuppressWarnings("deprecation") @InterfaceAudience.Private @@ -5580,10 +5581,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected final HRegion region; protected final CellComparator comparator; - private final long readPt; + private long readPt; private final long maxResultSize; private final ScannerContext defaultScannerContext; private final FilterWrapper filter; + private IsolationLevel isolationLevel; + private boolean needUpdateReadPt; @Override public HRegionInfo getRegionInfo() { @@ -5619,7 +5622,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // synchronize on scannerReadPoints so that nobody calculates // getSmallestReadPoint, before scannerReadPoints is updated. - IsolationLevel isolationLevel = scan.getIsolationLevel(); + isolationLevel = scan.getIsolationLevel(); synchronized(scannerReadPoints) { this.readPt = getReadPoint(isolationLevel); scannerReadPoints.put(this, this.readPt); @@ -5740,8 +5743,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // partial Result means that we should not reset the filters; filters // should only be reset in // between rows - if (!scannerContext.midRowResultFormed()) + if (!scannerContext.midRowResultFormed()) { resetFilters(); + updateMvccReadPoint(); + } if (isFilterDoneInternal()) { moreValues = false; @@ -6101,6 +6106,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.storeHeap.next(MOCKED_LIST); } resetFilters(); + updateMvccReadPoint(); // Calling the hook in CP which allows it to do a fast forward return this.region.getCoprocessorHost() == null @@ -6193,6 +6199,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // callback this.close(); } + + @Override + public void setUpdateMvccReadPoint(boolean updateReadPoint) { + if (isolationLevel == IsolationLevel.READ_COMMITTED) { + this.needUpdateReadPt = updateReadPoint; + } + } + + public void updateMvccReadPoint() { + if (needUpdateReadPt) { + synchronized (scannerReadPoints) { + this.readPt = getReadpoint(isolationLevel); + scannerReadPoints.put(this, this.readPt); + } + storeHeap.updateMvccReadPoint(readPt); + if (joinedHeap != null) { + joinedHeap.updateMvccReadPoint(readPt); + } + needUpdateReadPt = false; + } + } } // Utility methods diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 9ece14b..c164495 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -66,6 +66,8 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner protected KVScannerComparator comparator; + private List scanners; + /** * Constructor. This KeyValueHeap will handle closing of passed in * KeyValueScanners. @@ -85,6 +87,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner */ KeyValueHeap(List scanners, KVScannerComparator comparator) throws IOException { + this.scanners = scanners; this.comparator = comparator; if (!scanners.isEmpty()) { this.heap = new PriorityQueue(scanners.size(), @@ -432,4 +435,11 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner } } } + + @Override + public void updateMvccReadPoint(long readPt) { + for (KeyValueScanner scanner : this.scanners) { + scanner.updateMvccReadPoint(readPt); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index 44b081b..f83033f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -21,9 +21,9 @@ package org.apache.hadoop.hbase.regionserver; import java.io.Closeable; import java.io.IOException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; /** @@ -171,4 +171,9 @@ public interface KeyValueScanner extends Shipper, Closeable { * see HFileWriterImpl#getMidpoint, or null if not known. */ public Cell getNextIndexedKey(); + + /** + * update the scanner's MVCC readPt + */ + public void updateMvccReadPoint(long readPt); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java index 01a7ff3..9b72f02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java @@ -346,4 +346,12 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { throw new IllegalStateException("Traversing forward with backward scan"); } } + + @Override + public void updateMvccReadPoint(long readPt) { + this.readPoint = readPt; + for (SegmentScanner scanner : scanners) { + scanner.updateMvccReadPoint(readPt); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index f45fc69..ad382b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2553,6 +2553,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, throw new NotServingRegionException("Region was re-opened after the scanner" + scannerName + " was created: " + hri.getRegionNameAsString()); } + scanner.setUpdateMvccReadPoint(true); } else { region = getRegion(request.getRegion()); ClientProtos.Scan protoScan = request.getScan(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java index 5b33db4..67974d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java @@ -68,6 +68,11 @@ public interface RegionScanner extends InternalScanner, Shipper { long getMvccReadPoint(); /** + * Whether need to update MVCC readPt with this scanner. + */ + void setUpdateMvccReadPoint(boolean updateReadPoint); + + /** * @return The limit on the number of cells to retrieve on each call to next(). See * {@link org.apache.hadoop.hbase.client.Scan#setBatch(int)} */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index a04c1da..7bf1007 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -373,4 +373,8 @@ public class SegmentScanner implements KeyValueScanner { return (first != null ? first : second); } + @Override + public void updateMvccReadPoint(long readPt) { + this.readPoint = readPt; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 4955ffe..27e8db2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -26,13 +26,13 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.HFileScanner; @@ -548,4 +548,9 @@ public class StoreFileScanner implements KeyValueScanner { public void shipped() throws IOException { this.hfs.shipped(); } + + @Override + public void updateMvccReadPoint(long readPt) { + this.readPt = readPt; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 7ebae94..36c4576 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -134,7 +134,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // flush update lock private ReentrantLock flushLock = new ReentrantLock(); - protected final long readPt; + protected long readPt; // used by the injection framework to test race between StoreScanner construction and compaction enum StoreScannerCompactionRace { @@ -973,5 +973,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this.heap.shipped(); } } -} + @Override + public void updateMvccReadPoint(long readPt) { + this.readPt = readPt; + if (this.heap != null) { + this.heap.updateMvccReadPoint(readPt); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java index 3f05969..4a740a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java @@ -24,9 +24,9 @@ import java.util.Iterator; import java.util.List; import java.util.SortedSet; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.NonReversedNonLazyKeyValueScanner; /** @@ -128,4 +128,9 @@ public class CollectionBackedScanner extends NonReversedNonLazyKeyValueScanner { public void close() { // do nothing } + + @Override + public void updateMvccReadPoint(long readPt) { + // do nothing + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java index ba75d6e..27e32a4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java @@ -1557,6 +1557,11 @@ public class TestBlockEvictionFromClient { public void shipped() throws IOException { this.delegate.shipped(); } + + @Override + public void setUpdateMvccReadPoint(boolean updateReadPoint) { + delegate.setUpdateMvccReadPoint(updateReadPoint); + } } public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index d4d319a..3c3cfdd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -721,5 +721,163 @@ public class TestScannersFromClientSide { assertEquals(expKvList.size(), result.size()); } + @Test + public void testScanSnapshotIsolationLevel() throws Exception { + TableName TABLE = TableName.valueOf("testScanSnapshotIsolationLevel"); + Table ht = TEST_UTIL.createTable(TABLE, FAMILY); + + Put put; + Scan scan; + Result result; + ResultScanner scanner; + + for (int i = 0; i < 4; i++) { + byte[] row = Bytes.toBytes(i); + put = new Put(row); + KeyValue kv = new KeyValue(Bytes.toBytes(i), FAMILY, QUALIFIER, i, + Bytes.toBytes("" + i)); + put.add(kv); + ht.put(put); + } + scan = new Scan(); + scan.setCaching(1); + scan.setIsolationLevel(IsolationLevel.SNAPSHOT); + scanner = ht.getScanner(scan); + + // r0:0 + result = scanner.next(); + assertEquals("0", Bytes.toString(result.getColumnLatestCell(FAMILY, + QUALIFIER).getValueArray())); + + byte[] row = Bytes.toBytes(3); + put = new Put(row); + KeyValue kv = new KeyValue(row, FAMILY, QUALIFIER, 3, Bytes.toBytes("31")); + put.add(kv); + ht.put(put); + + // r1:1 + result = scanner.next(); + assertEquals("1", Bytes.toString(result.getColumnLatestCell(FAMILY, + QUALIFIER).getValueArray())); + + // r2:2 + result = scanner.next(); + assertEquals("2", Bytes.toString(result.getColumnLatestCell(FAMILY, + QUALIFIER).getValueArray())); + + // r3:3 + result = scanner.next(); + assertEquals("3", Bytes.toString(result.getColumnLatestCell(FAMILY, + QUALIFIER).getValueArray())); + } + + @Test + public void testScanReadCommittedIsolationLevel() throws Exception { + TableName TABLE = TableName.valueOf("testScanReadCommittedIsolationLevel"); + Table ht = TEST_UTIL.createTable(TABLE, FAMILY); + + Put put; + Scan scan; + Result result; + ResultScanner scanner; + boolean toLog = true; + List kvListExp; + + for (int i = 0; i < 4; i++) { + byte[] row = Bytes.toBytes(i); + put = new Put(row); + KeyValue kv = new KeyValue(Bytes.toBytes(i), FAMILY, QUALIFIER, i, + Bytes.toBytes("" + i)); + put.add(kv); + ht.put(put); + } + + scan = new Scan(); + scan.setCaching(1); + scan.setIsolationLevel(IsolationLevel.READ_COMMITTED); + scanner = ht.getScanner(scan); + + // r0:0 + kvListExp = new ArrayList(); + kvListExp.add(new KeyValue(Bytes.toBytes(0), FAMILY, QUALIFIER, 0, Bytes + .toBytes("0"))); + result = scanner.next(); + verifyResult(result, kvListExp, toLog, "Testing first row of scan"); + assertEquals("0", Bytes.toString(result.getColumnLatestCell(FAMILY, + QUALIFIER).getValueArray())); + + byte[] row = Bytes.toBytes(3); + put = new Put(row); + KeyValue kv = new KeyValue(row, FAMILY, QUALIFIER, 3, Bytes.toBytes("32")); + put.add(kv); + ht.put(put); + + // r1:1 + result = scanner.next(); + assertEquals("1", Bytes.toString(result.getColumnLatestCell(FAMILY, + QUALIFIER).getValueArray())); + + // r2:2 + result = scanner.next(); + assertEquals("2", Bytes.toString(result.getColumnLatestCell(FAMILY, + QUALIFIER).getValueArray())); + + // r3:32 + result = scanner.next(); + assertEquals("32", Bytes.toString(result.getColumnLatestCell(FAMILY, + QUALIFIER).getValueArray())); + } + + @Test + public void testScanReadUncommittedIsolationLevel() throws Exception { + TableName TABLE = TableName + .valueOf("testScanReadUncommittedIsolationLevel"); + Table ht = TEST_UTIL.createTable(TABLE, FAMILY); + + Put put; + Scan scan; + Result result; + ResultScanner scanner; + + for (int i = 0; i < 4; i++) { + byte[] row = Bytes.toBytes(i); + put = new Put(row); + KeyValue kv = new KeyValue(Bytes.toBytes(i), FAMILY, QUALIFIER, i, + Bytes.toBytes("" + i)); + put.add(kv); + ht.put(put); + } + + scan = new Scan(); + scan.setCaching(1); + scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); + scanner = ht.getScanner(scan); + + // r0:0 + result = scanner.next(); + assertEquals("0", Bytes.toString(result.getColumnLatestCell(FAMILY, + QUALIFIER).getValueArray())); + + byte[] row = Bytes.toBytes(3); + put = new Put(row); + KeyValue kv = new KeyValue(row, FAMILY, QUALIFIER, 4, Bytes.toBytes("33")); + put.add(kv); + ht.put(put); + + // r1:1 + result = scanner.next(); + assertEquals("1", Bytes.toString(result.getColumnLatestCell(FAMILY, + QUALIFIER).getValueArray())); + + // r2:2 + result = scanner.next(); + assertEquals("2", Bytes.toString(result.getColumnLatestCell(FAMILY, + QUALIFIER).getValueArray())); + + // r3:33 + result = scanner.next(); + assertEquals("33", Bytes.toString(result.getColumnLatestCell(FAMILY, + QUALIFIER).getValueArray())); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index b2ef1bd..9e57ad6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -149,6 +149,11 @@ public class TestCoprocessorInterface { public void shipped() throws IOException { this.delegate.shipped(); } + + @Override + public void setUpdateMvccReadPoint(boolean updateReadPoint) { + delegate.setUpdateMvccReadPoint(updateReadPoint); + } } public static class CoprocessorImpl extends BaseRegionObserver {