Index: src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (working copy) @@ -42,18 +42,18 @@ */ public class ClientScanner extends AbstractClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); - private Scan scan; + protected Scan scan; private boolean closed = false; // Current region scanner is against. Gets cleared if current region goes // wonky: e.g. if it splits on us. - private HRegionInfo currentRegion = null; - private ScannerCallable callable = null; + protected HRegionInfo currentRegion = null; + protected ScannerCallable callable = null; private final LinkedList cache = new LinkedList(); private final int caching; private long lastNext; // Keep lastResult returned successfully in case we have to reset scanner. private Result lastResult = null; - private ScanMetrics scanMetrics = null; + protected ScanMetrics scanMetrics = null; private final long maxScannerResultSize; private final HConnection connection; private final byte[] tableName; @@ -137,7 +137,7 @@ } // returns true if the passed region endKey - private boolean checkScanStopRow(final byte [] endKey) { + protected boolean checkScanStopRow(final byte [] endKey) { if (this.scan.getStopRow().length > 0) { // there is a stop row, check to see if we are past it. byte [] stopRow = scan.getStopRow(); @@ -161,7 +161,7 @@ * @param nbRows * @param done Server-side says we're done scanning. 
*/ - private boolean nextScanner(int nbRows, final boolean done) + protected boolean nextScanner(int nbRows, final boolean done) throws IOException { // Close the previous scanner if it's open if (this.callable != null) { Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -695,6 +695,10 @@ if (scan.getCaching() <= 0) { scan.setCaching(getScannerCaching()); } + if (scan.isReversed()) { + return new ReversedClientScanner(getConfiguration(), scan, + getTableName(), this.connection); + } return new ClientScanner(getConfiguration(), scan, getTableName(), this.connection); } Index: src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java (working copy) @@ -0,0 +1,168 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A reversed client scanner which supports backward scanning + */ +public class ReversedClientScanner extends ClientScanner { + private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class); + // A byte array in which all elements are the max byte, and it is used to + // construct closest front row + static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); + + /** + * Create a new ReversedClientScanner for the specified table. Note that the + * passed {@link Scan}'s start row may be changed. + * @param conf The {@link Configuration} to use.
+ * @param scan {@link Scan} to use in this scanner + * @param tableName The table that we wish to scan + * @param connection Connection identifying the cluster + * @throws IOException + */ + public ReversedClientScanner(Configuration conf, Scan scan, + byte[] tableName, HConnection connection) throws IOException { + super(conf, scan, tableName, connection); + } + + @Override + protected boolean nextScanner(int nbRows, final boolean done) + throws IOException { + // Close the previous scanner if it's open + if (this.callable != null) { + this.callable.setClose(); + this.callable.withRetries(); + this.callable = null; + } + + // Where to start the next scanner + byte[] localStartKey; + boolean locateTheClosestFrontRow = true; + // if we're at start of table, close and return false to stop iterating + if (this.currentRegion != null) { + byte[] startKey = this.currentRegion.getStartKey(); + if (startKey == null + || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) + || checkScanStopRow(startKey) || done) { + close(); + if (LOG.isDebugEnabled()) { + LOG.debug("Finished " + this.currentRegion); + } + return false; + } + localStartKey = startKey; + if (LOG.isDebugEnabled()) { + LOG.debug("Finished " + this.currentRegion); + } + } else { + localStartKey = this.scan.getStartRow(); + if (!Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY)) { + locateTheClosestFrontRow = false; + } + } + + if (LOG.isDebugEnabled() && this.currentRegion != null) { + // Only worth logging if NOT first region in scan. + LOG.debug("Advancing internal scanner to startKey at '" + + Bytes.toStringBinary(localStartKey) + "'"); + } + try { + // In reversed scan, we want to locate the previous region through current + // region's start key. 
In order to get that previous region, first we + // create a closest row before the start key of current region, then + // locate all the regions from the created closest row to start key of + // current region, thus the last one of located regions should be the + // previous region of current region. The related logic of locating + // regions is implemented in ReversedScannerCallable + byte[] locateStartRow = locateTheClosestFrontRow ? createClosestRowBefore(localStartKey) + : null; + callable = getScannerCallable(localStartKey, nbRows, locateStartRow); + // Open a scanner on the region server starting at the + // beginning of the region + callable.withRetries(); + this.currentRegion = callable.getHRegionInfo(); + if (this.scanMetrics != null) { + this.scanMetrics.countOfRegions.inc(); + } + } catch (IOException e) { + close(); + throw e; + } + return true; + } + + protected ScannerCallable getScannerCallable(byte[] localStartKey, + int nbRows, byte[] locateStartRow) { + scan.setStartRow(localStartKey); + ScannerCallable s = new ReversedScannerCallable(getConnection(), + getTableName(), scan, this.scanMetrics, locateStartRow); + s.setCaching(nbRows); + return s; + } + + @Override + // returns true if stopRow >= passed region startKey + protected boolean checkScanStopRow(final byte[] startKey) { + if (this.scan.getStopRow().length > 0) { + // there is a stop row, check to see if we are past it. + byte[] stopRow = scan.getStopRow(); + int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, startKey, 0, + startKey.length); + if (cmp >= 0) { + // stopRow >= startKey (stopRow is equal to or larger than startKey) + // This is a stop. + return true; + } + } + return false; // unlikely.
+ } + + /** + * Create the closest row before the specified row + * @param row + * @return a new byte array which is the closest front row of the specified + * one + */ + private byte[] createClosestRowBefore(byte[] row) { + if (row == null) { + throw new IllegalArgumentException("The passed row is empty"); + } + if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) { + return MAX_BYTE_ARRAY; + } + if (row[row.length - 1] == 0) { + return Arrays.copyOf(row, row.length - 1); + } else { + byte[] closestFrontRow = Arrays.copyOf(row, row.length); + closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1); + closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY); + return closestFrontRow; + } + } + +} Index: src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java (working copy) @@ -0,0 +1,138 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A reversed ScannerCallable which supports backward scanning. + */ +public class ReversedScannerCallable extends ScannerCallable { + /** + * The start row for locating regions. In reversed scanner, may locate the + * regions for a range of keys when doing + * {@link ReversedClientScanner#nextScanner(int, boolean)} + */ + protected final byte[] locateStartRow; + + /** + * + * @param connection + * @param tableName + * @param scan + * @param scanMetrics + * @param locateStartRow The start row for locating regions + */ + public ReversedScannerCallable(HConnection connection, byte[] tableName, + Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) { + super(connection, tableName, scan, scanMetrics); + this.locateStartRow = locateStartRow; + } + + /** + * @param reload force reload of server location + * @throws IOException + */ + @Override + public void connect(boolean reload) throws IOException { + if (!instantiated || reload) { + if (locateStartRow == null) { + // Just locate the region with the row + this.location = connection.getRegionLocation(tableName, row, reload); + if (this.location == null) { + throw new IOException("Failed to find location, tableName=" + + tableName + ", row=" + Bytes.toString(row) + ", reload=" + + reload); + } + } else { + // Need to locate the regions with the range, and the target location is + // the last one which is the previous region of last region scanner + List locatedRegions = locateRegionsInRange( + locateStartRow, row, reload); + if (locatedRegions.isEmpty()) { + throw new DoNotRetryIOException( + "Does .META. exist hole? 
Couldn't get regions for the range from " + + Bytes.toStringBinary(locateStartRow) + " to " + + Bytes.toStringBinary(row)); + } + this.location = locatedRegions.get(locatedRegions.size() - 1); + } + this.server = connection.getHRegionConnection(location.getHostname(), + location.getPort()); + checkIfRegionServerIsRemote(); + instantiated = true; + } + + // check how often we retry. + // HConnectionManager will call instantiateServer with reload==true + // if and only if for retries. + if (reload && this.scanMetrics != null) { + this.scanMetrics.countOfRPCRetries.inc(); + if (isRegionServerRemote) { + this.scanMetrics.countOfRemoteRPCRetries.inc(); + } + } + } + + /** + * Get the corresponding regions for an arbitrary range of keys. + * @param tableName + * @param startKey Starting row in range, inclusive + * @param endKey Ending row in range, exclusive + * @param reload force reload of server location + * @return A list of HRegionLocation corresponding to the regions that contain + * the specified range + * @throws IOException + */ + private List locateRegionsInRange(byte[] startKey, + byte[] endKey, boolean reload) throws IOException { + final boolean endKeyIsEndOfTable = Bytes.equals(endKey, + HConstants.EMPTY_END_ROW); + if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { + throw new IllegalArgumentException("Invalid range: " + + Bytes.toStringBinary(startKey) + " > " + + Bytes.toStringBinary(endKey)); + } + List regionList = new ArrayList(); + byte[] currentKey = startKey; + do { + HRegionLocation regionLocation = connection.getRegionLocation(tableName, + currentKey, reload); + if (regionLocation.getRegionInfo().containsRow(currentKey)) { + regionList.add(regionLocation); + } else { + throw new DoNotRetryIOException("Does .META. exist hole? 
Locating row " + + Bytes.toStringBinary(currentKey) + " returns incorrect region " + + regionLocation.getRegionInfo()); + } + currentKey = regionLocation.getRegionInfo().getEndKey(); + } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) + && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0)); + return regionList; + } + +} Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy) @@ -83,6 +83,7 @@ private static final String RAW_ATTR = "_raw_"; private static final String ONDEMAND_ATTR = "_ondemand_"; private static final String ISOLATION_LEVEL = "_isolationlevel_"; + private static final String REVERSED_ATTR = "_reversed_"; private static final byte SCAN_VERSION = (byte)2; private byte [] startRow = HConstants.EMPTY_START_ROW; @@ -99,6 +100,7 @@ // scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName)) static public final String SCAN_ATTRIBUTES_TABLE_NAME = "scan.attributes.table.name"; + private transient Boolean reversed; /* * -1 means no caching */ @@ -692,4 +694,28 @@ return attr == null ? IsolationLevel.READ_COMMITTED : IsolationLevel.fromBytes(attr); } + + /** + * Set whether this scan is a reversed one + *

+ * This is false by default which means forward(normal) scan. + * + * @param reversed if true, scan will be backward order + */ + public void setReversed(boolean reversed) { + setAttribute(REVERSED_ATTR, Bytes.toBytes(reversed)); + this.reversed = reversed; + } + + /** + * Get whether this scan is a reversed one. + * @return true if backward scan, false if forward(default) scan + */ + public boolean isReversed() { + if (this.reversed == null) { + byte[] attr = getAttribute(REVERSED_ATTR); + this.reversed = attr == null ? false : Bytes.toBoolean(attr); + } + return this.reversed; + } } Index: src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (working copy) @@ -45,11 +45,11 @@ public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity"; private static final Log LOG = LogFactory.getLog(ScannerCallable.class); private long scannerId = -1L; - private boolean instantiated = false; + protected boolean instantiated = false; private boolean closed = false; private Scan scan; private int caching = 1; - private ScanMetrics scanMetrics; + protected ScanMetrics scanMetrics; private boolean logScannerActivity = false; private int logCutOffLatency = 1000; private static String myAddress; @@ -62,7 +62,7 @@ } // indicate if it is a remote server call - private boolean isRegionServerRemote = true; + protected boolean isRegionServerRemote = true; /** * @param connection which connection @@ -108,7 +108,7 @@ * compare the local machine hostname with region server's hostname * to decide if hbase client connects to a remote region server */ - private void checkIfRegionServerIsRemote() { + protected void checkIfRegionServerIsRemote() { if (this.location.getHostname().equalsIgnoreCase(myAddress)) { isRegionServerRemote = 
false; } else { Index: src/main/java/org/apache/hadoop/hbase/filter/Filter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/filter/Filter.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/filter/Filter.java (working copy) @@ -169,4 +169,15 @@ * not sure which key to seek to next. */ public KeyValue getNextKeyHint(final KeyValue currentKV); + + /** + * alter the reversed scan flag + * @param reversed flag + */ + public void setReversed(boolean reversed); + + /** + * @return true if it is applied in reversed scan + */ + public boolean isReversed(); } Index: src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java (working copy) @@ -33,6 +33,7 @@ * that is one that never filters anything. */ public abstract class FilterBase implements Filter { + protected boolean reversed; /** * Filters that are purely stateless and do nothing in their reset() methods can inherit @@ -144,6 +145,18 @@ } /** + * alter the reversed scan flag + * @param reversed flag + */ + public void setReversed(boolean reversed) { + this.reversed = reversed; + } + + public boolean isReversed() { + return this.reversed; + } + + /** * Check that given column family is essential for filter to check row. * This accommodates Filter implementation which didn't have this capability * Index: src/main/java/org/apache/hadoop/hbase/filter/FilterList.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (working copy) @@ -51,6 +51,7 @@ *

TODO: Fix creation of Configuration on serialization and deserialization. */ public class FilterList implements Filter { + protected boolean reversed; /** set operator */ public static enum Operator { /** !AND */ @@ -159,6 +160,11 @@ * @param filter another filter */ public void addFilter(Filter filter) { + if (this.isReversed() != filter.isReversed()) { + throw new IllegalArgumentException( + "Filters in the list must have the same reversed flag, this.reversed=" + + this.isReversed()); + } this.filters.add(filter); } @@ -374,6 +380,18 @@ } @Override + public void setReversed(boolean reversed) { + for (Filter filter : filters) { + filter.setReversed(reversed); + } + this.reversed = reversed; + } + + public boolean isReversed() { + return this.reversed; + } + + @Override public String toString() { return toString(MAX_LOG_FILTERS); } Index: src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/filter/PrefixFilter.java (working copy) @@ -59,7 +59,7 @@ // if we are passed the prefix, set flag int cmp = Bytes.compareTo(buffer, offset, this.prefix.length, this.prefix, 0, this.prefix.length); - if(cmp > 0) { + if ((!isReversed() && cmp > 0) || (isReversed() && cmp < 0)) { passedPrefix = true; } filterRow = (cmp != 0); Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (working copy) @@ -551,7 +551,7 @@ ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock); if (reader.getComparator().compare(firstKey.array(), - firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0) + firstKey.arrayOffset(), firstKey.limit(), key, 
offset, length) >= 0) { long previousBlockOffset = seekToBlock.getPrevBlockOffset(); // The key we are interested in Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1251,6 +1251,13 @@ return size; } + /** + * @return KeyValue Comparator + */ + public KeyValue.KVComparator getComparator() { + return this.comparator; + } + /* * Do preparation for pending compaction. * @throws IOException @@ -1824,6 +1831,12 @@ protected RegionScanner instantiateRegionScanner(Scan scan, List additionalScanners) throws IOException { + if (scan.isReversed()) { + if (scan.getFilter() != null) { + scan.getFilter().setReversed(true); + } + return new ReversedRegionScannerImpl(scan, additionalScanners, this); + } return new RegionScannerImpl(scan, additionalScanners, this); } @@ -3789,16 +3802,16 @@ /** * If the joined heap data gathering is interrupted due to scan limits, this will * contain the row for which we are populating the values.*/ - private KeyValue joinedContinuationRow = null; + protected KeyValue joinedContinuationRow = null; // KeyValue indicating that limit is reached when scanning private final KeyValue KV_LIMIT = new KeyValue(); - private final byte [] stopRow; + protected final byte[] stopRow; private Filter filter; private int batch; - private int isScan; + protected int isScan; private boolean filterClosed = false; private long readPt; - private HRegion region; + protected HRegion region; public HRegionInfo getRegionInfo() { return regionInfo; @@ -3852,16 +3865,22 @@ joinedScanners.add(scanner); } } - this.storeHeap = new KeyValueHeap(scanners, comparator); - if (!joinedScanners.isEmpty()) { - this.joinedHeap = new KeyValueHeap(joinedScanners, comparator); - } + initializeKVHeap(scanners, joinedScanners, 
region); } RegionScannerImpl(Scan scan, HRegion region) throws IOException { this(scan, null, region); } + protected void initializeKVHeap(List scanners, + List joinedScanners, HRegion region) + throws IOException { + this.storeHeap = new KeyValueHeap(scanners, region.comparator); + if (!joinedScanners.isEmpty()) { + this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator); + } + } + @Override public long getMvccReadPoint() { return this.readPt; @@ -4141,7 +4160,7 @@ return true; } - private boolean isStopRow(byte [] currentRow, int offset, short length) { + protected boolean isStopRow(byte[] currentRow, int offset, short length) { return currentRow == null || (stopRow != null && comparator.compareRows(stopRow, 0, stopRow.length, Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -2536,7 +2536,15 @@ requestCount.incrementAndGet(); try { HRegion r = getRegion(regionName); - r.checkRow(scan.getStartRow(), "Scan"); + boolean checkRow = true; + if (scan.isReversed() && Bytes.equals(r.getEndKey(), scan.getStartRow())) { + // In reversed scan, start row of scan is allowed to be equal with + // region's end key + checkRow = false; + } + if (checkRow) { + r.checkRow(scan.getStartRow(), "Scan"); + } scan.setLoadColumnFamiliesOnDemand(r.isLoadingCfsOnDemandDefault() || scan.doLoadColumnFamiliesOnDemand()); r.prepareScanner(scan); Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy) @@ -40,9 +40,9 @@ * also implements 
InternalScanner. WARNING: As is, if you try to use this * as an InternalScanner at the Store level, you will get runtime exceptions. */ -public class KeyValueHeap extends NonLazyKeyValueScanner +public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner implements KeyValueScanner, InternalScanner { - private PriorityQueue heap = null; + protected PriorityQueue heap = null; /** * The current sub-scanner, i.e. the one that contains the next key/value @@ -54,9 +54,9 @@ * Bloom filter optimization, which is OK to propagate to StoreScanner. In * order to ensure that, always use {@link #pollRealKV()} to update current. */ - private KeyValueScanner current = null; + protected KeyValueScanner current = null; - private KVScannerComparator comparator; + protected KVScannerComparator comparator; /** * Constructor. This KeyValueHeap will handle closing of passed in @@ -66,7 +66,18 @@ */ public KeyValueHeap(List scanners, KVComparator comparator) throws IOException { - this.comparator = new KVScannerComparator(comparator); + this(scanners, new KVScannerComparator(comparator)); + } + + /** + * Constructor. + * @param scanners + * @param comparator + * @throws IOException + */ + KeyValueHeap(List scanners, + KVScannerComparator comparator) throws IOException { + this.comparator = comparator; if (!scanners.isEmpty()) { this.heap = new PriorityQueue(scanners.size(), this.comparator); @@ -177,8 +188,8 @@ return next(result, -1, metric); } - private static class KVScannerComparator implements Comparator { - private KVComparator kvComparator; + protected static class KVScannerComparator implements Comparator { + protected KVComparator kvComparator; /** * Constructor * @param kvComparator @@ -344,7 +355,7 @@ * this scanner heap if (1) it has done a real seek and (2) its KV is the top * among all top KVs (some of which are fake) in the scanner heap. 
*/ - private KeyValueScanner pollRealKV() throws IOException { + protected KeyValueScanner pollRealKV() throws IOException { KeyValueScanner kvScanner = heap.poll(); if (kvScanner == null) { return null; Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (working copy) @@ -121,4 +121,37 @@ * assumed. */ public boolean isFileScanner(); + + // Support for "Reversed Scanner" + /** + * Seek the scanner at or before the row of specified KeyValue, it firstly + * tries to seek the scanner at or after the specified KeyValue, return if + * peek KeyValue of scanner has the same row with specified KeyValue, + * otherwise seek the scanner at the first KeyValue of the row which is the + * previous row of specified KeyValue + * + * @param key seek KeyValue + * @return true if the scanner is at the valid KeyValue, false if such + * KeyValue does not exist + * + */ + public boolean backwardSeek(KeyValue key) throws IOException; + + /** + * Seek the scanner at the first KeyValue of the row which is the previous row + * of specified key + * @param key seek value + * @return true if the scanner at the first valid KeyValue of previous row, + * false if not existing such KeyValue + */ + public boolean seekToPreviousRow(KeyValue key) throws IOException; + + /** + * Seek the scanner at the first KeyValue of last row + * + * @return true if scanner has values left, false if the underlying data is + * empty + * @throws IOException + */ + public boolean seekToLastRow() throws IOException; } Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1533970) +++ 
src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -718,6 +718,10 @@ private KeyValue theNext; private long readPoint; + // A flag represents whether could stop skipping KeyValues for MVCC + // if have encountered the next row. Only used for reversed scan + private boolean stopSkippingKVsIfNextRow = false; + /* Some notes... @@ -748,6 +752,7 @@ } private KeyValue getNext(Iterator it) { + KeyValue startKV = theNext; KeyValue v = null; try { while (it.hasNext()) { @@ -755,6 +760,10 @@ if (v.getMemstoreTS() <= readPoint) { return v; } + if (stopSkippingKVsIfNextRow && startKV != null + && comparator.compareRows(v, startKV) > 0) { + return null; + } } return null; @@ -924,6 +933,70 @@ long oldestUnexpiredTS) { return shouldSeek(scan, oldestUnexpiredTS); } + + /** + * Seek scanner to the given key first. If it returns false(means + * peek()==null) or scanner's peek row is bigger than row of given key, seek + * the scanner to the previous row of given key + */ + @Override + public synchronized boolean backwardSeek(KeyValue key) { + seek(key); + if (peek() == null || comparator.compareRows(peek(), key) > 0) { + return seekToPreviousRow(key); + } + return true; + } + + /** + * Separately get the KeyValue before the specified key from kvset and + * snapshotset, and use the row of higher one as the previous row of + * specified key, then seek to the first KeyValue of previous row + */ + @Override + public synchronized boolean seekToPreviousRow(KeyValue key) { + KeyValue firstKeyOnRow = KeyValue.createFirstOnRow(key.getRow()); + SortedSet kvHead = kvsetAtCreation.headSet(firstKeyOnRow); + KeyValue kvsetBeforeRow = kvHead.isEmpty() ? null : kvHead.last(); + SortedSet snapshotHead = snapshotAtCreation + .headSet(firstKeyOnRow); + KeyValue snapshotBeforeRow = snapshotHead.isEmpty() ? 
null : snapshotHead + .last(); + KeyValue lastKVBeforeRow = getHighest(kvsetBeforeRow, snapshotBeforeRow); + if (lastKVBeforeRow == null) { + theNext = null; + return false; + } + KeyValue firstKeyOnPreviousRow = KeyValue + .createFirstOnRow(lastKVBeforeRow.getRow()); + this.stopSkippingKVsIfNextRow = true; + seek(firstKeyOnPreviousRow); + this.stopSkippingKVsIfNextRow = false; + if (peek() == null + || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) { + return seekToPreviousRow(lastKVBeforeRow); + } + return true; + } + + @Override + public synchronized boolean seekToLastRow() { + KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation + .last(); + KeyValue second = snapshotAtCreation.isEmpty() ? null + : snapshotAtCreation.last(); + KeyValue higherKv = getHighest(first, second); + if (higherKv == null) { + return false; + } + KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow()); + if (seek(firstKvOnLastRow)) { + return true; + } else { + return seekToPreviousRow(higherKv); + } + + } } public final static long FIXED_OVERHEAD = ClassSize.align( Index: src/main/java/org/apache/hadoop/hbase/regionserver/NonReversedNonLazyKeyValueScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/NonReversedNonLazyKeyValueScanner.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/NonReversedNonLazyKeyValueScanner.java (working copy) @@ -0,0 +1,54 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.KeyValue; + +/** + * A "non-reversed & non-lazy" scanner which does not support backward scanning + * and always does a real seek operation. Most scanners are inherited from this + * class. + */ +@InterfaceAudience.Private +public abstract class NonReversedNonLazyKeyValueScanner extends + NonLazyKeyValueScanner { + + @Override + public boolean backwardSeek(KeyValue key) throws IOException { + throw new NotImplementedException("backwardSeek must not be called on a " + + "non-reversed scanner"); + } + + @Override + public boolean seekToPreviousRow(KeyValue key) throws IOException { + throw new NotImplementedException( + "seekToPreviousRow must not be called on a " + "non-reversed scanner"); + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new NotImplementedException("seekToLastRow must not be called on a " + + "non-reversed scanner"); + } + +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java (working copy) @@ -0,0 +1,190 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; + +/** + * ReversedKeyValueHeap is used for supporting reversed scanning. Compared with + * KeyValueHeap, its scanner comparator is a little different (see + * ReversedKVScannerComparator), all seek is backward seek(see + * {@link KeyValueScanner#backwardSeek}), and it will jump to the previous row + * if it is already at the end of one row when calling next(). 
+ */ +public class ReversedKeyValueHeap extends KeyValueHeap { + + /** + * @param scanners + * @param comparator + * @throws IOException + */ + public ReversedKeyValueHeap(List scanners, + KVComparator comparator) throws IOException { + super(scanners, new ReversedKVScannerComparator(comparator)); + } + + @Override + public boolean seek(KeyValue seekKey) throws IOException { + throw new IllegalStateException( + "seek cannot be called on ReversedKeyValueHeap"); + } + + @Override + public boolean reseek(KeyValue seekKey) throws IOException { + throw new IllegalStateException( + "reseek cannot be called on ReversedKeyValueHeap"); + } + + @Override + public boolean requestSeek(KeyValue key, boolean forward, boolean useBloom) + throws IOException { + throw new IllegalStateException( + "requestSeek cannot be called on ReversedKeyValueHeap"); + } + + @Override + public boolean seekToPreviousRow(KeyValue seekKey) throws IOException { + if (current == null) { + return false; + } + heap.add(current); + current = null; + + KeyValueScanner scanner; + while ((scanner = heap.poll()) != null) { + KeyValue topKey = scanner.peek(); + if (comparator.getComparator().compareRows(topKey.getBuffer(), + topKey.getRowOffset(), topKey.getRowLength(), seekKey.getBuffer(), + seekKey.getRowOffset(), seekKey.getRowLength()) < 0) { + // Row of Top KeyValue is before Seek row. 
+ heap.add(scanner); + current = pollRealKV(); + return current != null; + } + + if (!scanner.seekToPreviousRow(seekKey)) { + scanner.close(); + } else { + heap.add(scanner); + } + } + + // Heap is returning empty, scanner is done + return false; + } + + @Override + public boolean backwardSeek(KeyValue seekKey) throws IOException { + if (current == null) { + return false; + } + heap.add(current); + current = null; + + KeyValueScanner scanner; + while ((scanner = heap.poll()) != null) { + KeyValue topKey = scanner.peek(); + if ((comparator.getComparator().matchingRows(seekKey, topKey) && comparator + .getComparator().compare(seekKey, topKey) <= 0) + || comparator.getComparator().compareRows(seekKey, topKey) > 0) { + heap.add(scanner); + current = pollRealKV(); + return current != null; + } + if (!scanner.backwardSeek(seekKey)) { + scanner.close(); + } else { + heap.add(scanner); + } + } + return false; + } + + @Override + public KeyValue next() throws IOException { + if (this.current == null) { + return null; + } + KeyValue kvReturn = this.current.next(); + KeyValue kvNext = this.current.peek(); + if (kvNext == null + || this.comparator.kvComparator.compareRows(kvNext, kvReturn) > 0) { + if (this.current.seekToPreviousRow(kvReturn)) { + this.heap.add(this.current); + } else { + this.current.close(); + } + this.current = pollRealKV(); + } else { + KeyValueScanner topScanner = this.heap.peek(); + if (topScanner != null + && this.comparator.compare(this.current, topScanner) > 0) { + this.heap.add(this.current); + this.current = pollRealKV(); + } + } + return kvReturn; + } + + /** + * In ReversedKVScannerComparator, we compare the row of scanners' peek values + * first, sort bigger one before the smaller one. 
Then compare the KeyValue if + * they have the equal row, sort smaller one before the bigger one + */ + private static class ReversedKVScannerComparator extends KVScannerComparator { + + /** + * Constructor + * @param kvComparator + */ + public ReversedKVScannerComparator(KVComparator kvComparator) { + super(kvComparator); + } + + @Override + public int compare(KeyValueScanner left, KeyValueScanner right) { + int rowComparison = compareRows(left.peek(), right.peek()); + if (rowComparison != 0) { + return -rowComparison; + } + return super.compare(left, right); + } + + /** + * Compares rows of two KeyValue + * @param left + * @param right + * @return less than 0 if left is smaller, 0 if equal etc.. + */ + public int compareRows(KeyValue left, KeyValue right) { + return super.kvComparator.compareRows(left, right); + } + } + + @Override + public boolean seekToLastRow() throws IOException { + throw new NotImplementedException("Not implemented"); + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java (working copy) @@ -0,0 +1,79 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; + +/** + * ReversedRegionScannerImpl extends RegionScannerImpl, and is used to + * support reversed scanning. + */ +class ReversedRegionScannerImpl extends RegionScannerImpl { + + /** + * @param scan + * @param additionalScanners + * @param region + * @throws IOException + */ + ReversedRegionScannerImpl(Scan scan, + List additionalScanners, HRegion region) + throws IOException { + region.super(scan, additionalScanners, region); + } + + @Override + protected void initializeKVHeap(List scanners, + List joinedScanners, HRegion region) throws IOException { + this.storeHeap = new ReversedKeyValueHeap(scanners, region.getComparator()); + if (!joinedScanners.isEmpty()) { + this.joinedHeap = new ReversedKeyValueHeap(joinedScanners, + region.getComparator()); + } + } + + @Override + protected boolean isStopRow(byte[] currentRow, int offset, short length) { + return currentRow == null + || (super.stopRow != null && region.getComparator().compareRows( + stopRow, 0, stopRow.length, currentRow, offset, length) >= super.isScan); + } + + @Override + protected boolean nextRow(byte[] currentRow, int offset, short length) + throws IOException { + assert super.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read."; + byte row[] = new byte[length]; + 
System.arraycopy(currentRow, offset, row, 0, length); + this.storeHeap.seekToPreviousRow(KeyValue.createFirstOnRow(row)); + resetFilters(); + // Calling the hook in CP which allows it to do a fast forward + if (this.region.getCoprocessorHost() != null) { + return this.region.getCoprocessorHost().postScannerFilterRow(this, + currentRow); + } + return true; + } + +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java (working copy) @@ -0,0 +1,133 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.Store.ScanInfo; + +/** + * ReversedStoreScanner extends from StoreScanner, and is used to support + * reversed scanning. + */ +class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { + + /** + * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we + * are not in a compaction. + * + * @param store who we scan + * @param scanInfo + * @param scan the spec + * @param columns which columns we are scanning + * @throws IOException + */ + ReversedStoreScanner(Store store, ScanInfo scanInfo, Scan scan, + NavigableSet columns) throws IOException { + super(store, scanInfo, scan, columns); + } + + /** Constructor for testing. */ + ReversedStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, + final NavigableSet columns, final List scanners) + throws IOException { + super(scan, scanInfo, scanType, columns, scanners, + HConstants.LATEST_TIMESTAMP); + } + + @Override + protected void resetKVHeap(List scanners, + KVComparator comparator) throws IOException { + // Combine all seeked scanners with a heap + heap = new ReversedKeyValueHeap(scanners, comparator); + } + + @Override + protected void seekScanners(List scanners, + KeyValue seekKey, boolean isLazy) throws IOException { + // Seek all scanners to the start of the Row (or if the exact matching row + // key does not exist, then to the start of the previous matching Row). 
+ if (seekKey.matchingRow(HConstants.EMPTY_START_ROW)) { + for (KeyValueScanner scanner : scanners) { + scanner.seekToLastRow(); + } + } else { + for (KeyValueScanner scanner : scanners) { + scanner.backwardSeek(seekKey); + } + } + } + + @Override + protected synchronized boolean seekToNextRow(KeyValue kv) throws IOException { + return seekToPreviousRow(kv); + } + + /** + * Do a backwardSeek in a reversed StoreScanner(scan backward) + */ + @Override + protected synchronized boolean seekAsDirection(KeyValue kv) + throws IOException { + return backwardSeek(kv); + } + + @Override + protected void checkScanOrder(KeyValue prevKV, KeyValue kv, + KeyValue.KVComparator comparator) throws IOException { + // Check that the heap gives us KVs in an increasing order for same row and + // decreasing order for different rows. + assert prevKV == null + || comparator == null + || comparator.compareRows(kv, prevKV) < 0 + || (comparator.matchingRows(kv, prevKV) && comparator.compare(kv, + prevKV) > 0) : "Key " + prevKV + " followed by a " + + "error order key " + kv + " in cf " + store + " in reversed scan"; + } + + @Override + public synchronized boolean reseek(KeyValue kv) throws IOException { + throw new IllegalStateException( + "reseek cannot be called on ReversedStoreScanner"); + } + + @Override + public synchronized boolean seek(KeyValue key) throws IOException { + throw new IllegalStateException( + "seek cannot be called on ReversedStoreScanner"); + } + + @Override + public boolean seekToPreviousRow(KeyValue key) throws IOException { + checkReseek(); + return this.heap.seekToPreviousRow(key); + } + + @Override + public boolean backwardSeek(KeyValue key) throws IOException { + checkReseek(); + return this.heap.backwardSeek(key); + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 1533970) 
+++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy) @@ -125,6 +125,8 @@ private final boolean isUserScan; + private final boolean isReversed; + /** * Construct a QueryMatcher for a scan * @param scan @@ -179,6 +181,7 @@ this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS); } + this.isReversed = scan.isReversed(); } /* @@ -229,15 +232,24 @@ int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength, bytes, offset, rowLength); - if (ret <= -1) { - return MatchCode.DONE; - } else if (ret >= 1) { - // could optimize this, if necessary? - // Could also be called SEEK_TO_CURRENT_ROW, but this - // should be rare/never happens. - return MatchCode.SEEK_NEXT_ROW; + if (!this.isReversed) { + if (ret <= -1) { + return MatchCode.DONE; + } else if (ret >= 1) { + // could optimize this, if necessary? + // Could also be called SEEK_TO_CURRENT_ROW, but this + // should be rare/never happens. + return MatchCode.SEEK_NEXT_ROW; + } + } else { + if (ret <= -1) { + return MatchCode.SEEK_NEXT_ROW; + } else if (ret >= 1) { + return MatchCode.DONE; + } } + // optimize case. 
if (this.stickyNextRow) return MatchCode.SEEK_NEXT_ROW; @@ -397,6 +409,15 @@ } public boolean moreRowsMayExistAfter(KeyValue kv) { + if (this.isReversed) { + if (rowComparator.compareRows(kv.getBuffer(), kv.getRowOffset(), + kv.getRowLength(), stopRow, 0, stopRow.length) <= 0) { + return false; + } else { + return true; + } + } + if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) && rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(), kv.getRowLength(), stopRow, 0, stopRow.length) >= 0) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -2208,7 +2208,9 @@ scanner = getHRegion().getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols); } if (scanner == null) { - scanner = new StoreScanner(this, getScanInfo(), scan, targetCols); + scanner = scan.isReversed() ? 
new ReversedStoreScanner(this, + getScanInfo(), scan, targetCols) : new StoreScanner(this, + getScanInfo(), scan, targetCols); } return scanner; } finally { Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy) @@ -1680,11 +1680,18 @@ && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { return true; } - KeyValue startKeyValue = KeyValue.createFirstOnRow(scan.getStartRow()); - KeyValue stopKeyValue = KeyValue.createLastOnRow(scan.getStopRow()); - boolean nonOverLapping = (getComparator().compare(this.getFirstKey(), - stopKeyValue.getKey()) > 0 && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) - || getComparator().compare(this.getLastKey(), startKeyValue.getKey()) < 0; + KeyValue smallestScanKeyValue = scan.isReversed() ? KeyValue + .createFirstOnRow(scan.getStopRow()) : KeyValue.createFirstOnRow(scan + .getStartRow()); + KeyValue largestScanKeyValue = scan.isReversed() ? KeyValue + .createLastOnRow(scan.getStartRow()) : KeyValue.createLastOnRow(scan + .getStopRow()); + boolean nonOverLapping = (getComparator().compare( + this.getFirstKey(), largestScanKeyValue.getKey()) > 0 && !Bytes + .equals(scan.isReversed() ? 
scan.getStartRow() : scan.getStopRow(), + HConstants.EMPTY_END_ROW)) + || getComparator().compare(this.getLastKey(), + smallestScanKeyValue.getKey()) < 0; return !nonOverLapping; } @@ -1789,6 +1796,10 @@ return reader.getLastKey(); } + public byte[] getLastRowKey() { + return reader.getLastRowKey(); + } + public byte[] midkey() throws IOException { return reader.midkey(); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; +import org.apache.hadoop.hbase.util.Bytes; /** * KeyValueScanner adaptor over the Reader. It also provides hooks into @@ -52,6 +53,9 @@ private boolean enforceMVCC = false; private boolean hasMVCCInfo = false; + // A flag represents whether could stop skipping KeyValues for MVCC + // if have encountered the next row. 
Only used for reversed scan + private boolean stopSkippingKVsIfNextRow = false; private static AtomicLong seekCount; @@ -179,6 +183,7 @@ protected boolean skipKVsNewerThanReadpoint() throws IOException { long readPoint = MultiVersionConsistencyControl.getThreadReadPoint(); + KeyValue startKV = cur; // We want to ignore all key-values that are newer than our current // readPoint @@ -187,6 +192,12 @@ && (cur.getMemstoreTS() > readPoint)) { hfs.next(); cur = hfs.getKeyValue(); + if (this.stopSkippingKVsIfNextRow + && Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(), + cur.getRowLength(), startKV.getBuffer(), startKV.getRowOffset(), + startKV.getRowLength()) > 0) { + return false; + } } if (cur == null) { @@ -378,4 +389,76 @@ return reader.passesTimerangeFilter(scan, oldestUnexpiredTS) && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns); } + + @Override + public boolean seekToPreviousRow(KeyValue key) throws IOException { + try { + try { + KeyValue seekKey = KeyValue.createFirstOnRow(key.getRow()); + seekCount.incrementAndGet(); + if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(), + seekKey.getKeyLength())) { + close(); + return false; + } + KeyValue firstKeyOfPreviousRow = KeyValue.createFirstOnRow(hfs + .getKeyValue().getRow()); + + seekCount.incrementAndGet(); + if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) { + close(); + return false; + } + + cur = hfs.getKeyValue(); + this.stopSkippingKVsIfNextRow = true; + boolean resultOfSkipKVs; + try { + resultOfSkipKVs = skipKVsNewerThanReadpoint(); + } finally { + this.stopSkippingKVsIfNextRow = false; + } + if (!resultOfSkipKVs + || Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(), + cur.getRowLength(), firstKeyOfPreviousRow.getBuffer(), + firstKeyOfPreviousRow.getRowOffset(), + firstKeyOfPreviousRow.getRowLength()) > 0) { + return seekToPreviousRow(firstKeyOfPreviousRow); + } + + return true; + } finally { + realSeekDone = true; + } + } catch (IOException ioe) { + throw new 
IOException("Could not seekToPreviousRow " + this + " to key " + + key, ioe); + } + } + + @Override + public boolean seekToLastRow() throws IOException { + byte[] lastRow = reader.getLastRowKey(); + if (lastRow == null) { + return false; + } + KeyValue seekKey = KeyValue.createFirstOnRow(lastRow); + if (seek(seekKey)) { + return true; + } else { + return seekToPreviousRow(seekKey); + } + } + + @Override + public boolean backwardSeek(KeyValue key) throws IOException { + seek(key); + if (cur == null + || Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(), + cur.getRowLength(), key.getBuffer(), key.getRowOffset(), + key.getRowLength()) > 0) { + return seekToPreviousRow(key); + } + return true; + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.regionserver.Store.ScanInfo; @@ -42,12 +43,12 @@ * Scanner scans both the memstore and the HStore. Coalesce KeyValue stream * into List for a single row. 
*/ -public class StoreScanner extends NonLazyKeyValueScanner +public class StoreScanner extends NonReversedNonLazyKeyValueScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver { static final Log LOG = LogFactory.getLog(StoreScanner.class); - private Store store; + protected Store store; private ScanQueryMatcher matcher; - private KeyValueHeap heap; + protected KeyValueHeap heap; private boolean cacheBlocks; @@ -118,22 +119,11 @@ // Pass columns to try to filter out unnecessary StoreFiles. List scanners = getScannersNoCompaction(); - // Seek all scanners to the start of the Row (or if the exact matching row - // key does not exist, then to the start of the next matching Row). - // Always check bloom filter to optimize the top row seek for delete - // family marker. - if (explicitColumnQuery && lazySeekEnabledGlobally) { - for (KeyValueScanner scanner : scanners) { - scanner.requestSeek(matcher.getStartKey(), false, true); - } - } else { - for (KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); - } - } + seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery + && lazySeekEnabledGlobally); // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, store.comparator); + resetKVHeap(scanners, store.getComparator()); this.store.addChangedReaderObserver(this); } @@ -161,12 +151,10 @@ scanners = selectScannersFrom(scanners); // Seek all scanners to the initial key - for(KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); - } + seekScanners(scanners, matcher.getStartKey(), false); // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, store.comparator); + resetKVHeap(scanners, store.getComparator()); } /** Constructor for testing. 
*/ @@ -189,10 +177,8 @@ Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS); // Seek all scanners to the initial key - for (KeyValueScanner scanner : scanners) { - scanner.seek(matcher.getStartKey()); - } - heap = new KeyValueHeap(scanners, scanInfo.getComparator()); + seekScanners(scanners, matcher.getStartKey(), false); + resetKVHeap(scanners, scanInfo.getComparator()); } /** @@ -213,6 +199,37 @@ } /** + * Seek the specified scanners with the given key + * @param scanners + * @param seekKey + * @param isLazy true if using lazy seek + * @throws IOException + */ + protected void seekScanners(List scanners, + KeyValue seekKey, boolean isLazy) + throws IOException { + // Seek all scanners to the start of the Row (or if the exact matching row + // key does not exist, then to the start of the next matching Row). + // Always check bloom filter to optimize the top row seek for delete + // family marker. + if (isLazy) { + for (KeyValueScanner scanner : scanners) { + scanner.requestSeek(seekKey, false, true); + } + } else { + for (KeyValueScanner scanner : scanners) { + scanner.seek(seekKey); + } + } + } + + protected void resetKVHeap(List scanners, + KVComparator comparator) throws IOException { + // Combine all seeked scanners with a heap + heap = new KeyValueHeap(scanners, comparator); + } + + /** * Get a filtered list of scanners. Assumes we are not in a compaction. * @return list of scanners to seek */ @@ -358,9 +375,7 @@ int count = 0; try { LOOP: while((kv = this.heap.peek()) != null) { - // Check that the heap gives us KVs in an increasing order. 
- assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : - "Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store; + checkScanOrder(prevKV, kv, comparator); prevKV = kv; ScanQueryMatcher.MatchCode qcode = matcher.match(kv); switch(qcode) { @@ -376,9 +391,9 @@ if (!matcher.moreRowsMayExistAfter(kv)) { return false; } - reseek(matcher.getKeyForNextRow(kv)); + seekToNextRow(kv); } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { - reseek(matcher.getKeyForNextColumn(kv)); + seekAsDirection(matcher.getKeyForNextColumn(kv)); } else { this.heap.next(); } @@ -404,11 +419,11 @@ return false; } - reseek(matcher.getKeyForNextRow(kv)); + seekToNextRow(kv); break; case SEEK_NEXT_COL: - reseek(matcher.getKeyForNextColumn(kv)); + seekAsDirection(matcher.getKeyForNextColumn(kv)); break; case SKIP: @@ -418,7 +433,7 @@ case SEEK_NEXT_USING_HINT: KeyValue nextKV = matcher.getNextKeyHint(kv); if (nextKV != null) { - reseek(nextKV); + seekAsDirection(nextKV); } else { heap.next(); } @@ -484,7 +499,7 @@ * next KV) * @throws IOException */ - private boolean checkReseek() throws IOException { + protected boolean checkReseek() throws IOException { if (this.heap == null && this.lastTop != null) { resetScannerStack(this.lastTop); if (this.heap.peek() == null @@ -510,12 +525,10 @@ * could have done it now by storing the scan object from the constructor */ List scanners = getScannersNoCompaction(); - for(KeyValueScanner scanner : scanners) { - scanner.seek(lastTopKey); - } + seekScanners(scanners, lastTopKey, false); // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, store.comparator); + resetKVHeap(scanners, store.getComparator()); // Reset the state of the Query Matcher and set to top row. 
// Only reset and call setRow if the row changes; avoids confusing the @@ -533,6 +546,36 @@ } } + /** + * Check whether scan as expected order + * @param prevKV + * @param kv + * @param comparator + * @throws IOException + */ + protected void checkScanOrder(KeyValue prevKV, KeyValue kv, + KeyValue.KVComparator comparator) throws IOException { + // Check that the heap gives us KVs in an increasing order. + assert prevKV == null || comparator == null + || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV + + " followed by a " + "smaller key " + kv + " in cf " + store; + } + + protected synchronized boolean seekToNextRow(KeyValue kv) throws IOException { + return reseek(matcher.getKeyForNextRow(kv)); + } + + /** + * Do a reseek in a normal StoreScanner(scan forward) + * @param kv + * @return true if scanner has values left, false if end of scanner + * @throws IOException + */ + protected synchronized boolean seekAsDirection(KeyValue kv) + throws IOException { + return reseek(kv); + } + @Override public synchronized boolean reseek(KeyValue kv) throws IOException { //Heap will not be null, if this is called from next() which. 
Index: src/main/java/org/apache/hadoop/hbase/util/Bytes.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/Bytes.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/util/Bytes.java (working copy) @@ -1667,4 +1667,36 @@ return toString(b, 0, n); } + /** + * Create a max byte array with the specified max byte count + * @param maxByteCount the length of returned byte array + * @return the created max byte array + */ + public static byte[] createMaxByteArray(int maxByteCount) { + byte[] maxByteArray = new byte[maxByteCount]; + for (int i = 0; i < maxByteArray.length; i++) { + maxByteArray[i] = (byte) 0xff; + } + return maxByteArray; + } + + /** + * Create a byte array which is multiple given bytes + * @param srcBytes + * @param multiNum + * @return byte array + */ + public static byte[] multiple(byte[] srcBytes, int multiNum) { + if (multiNum <= 0) { + return new byte[0]; + } + byte[] result = new byte[srcBytes.length * multiNum]; + for (int i = 0; i < multiNum; i++) { + System.arraycopy(srcBytes, 0, result, i * srcBytes.length, + srcBytes.length); + } + return result; + + } + } Index: src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (revision 1533970) +++ src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (working copy) @@ -26,13 +26,13 @@ import java.util.SortedSet; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.regionserver.NonLazyKeyValueScanner; +import org.apache.hadoop.hbase.regionserver.NonReversedNonLazyKeyValueScanner; /** * Utility scanner that wraps a sortable collection and serves * as a KeyValueScanner. 
*/ -public class CollectionBackedScanner extends NonLazyKeyValueScanner { +public class CollectionBackedScanner extends NonReversedNonLazyKeyValueScanner { final private Iterable data; final KeyValue.KVComparator comparator; private Iterator iter; Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1533970) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -978,6 +978,26 @@ } /** + * Create a table. + * @param tableName + * @param family + * @param splitRows + * @return An HTable instance for the created table. + * @throws IOException + */ + public HTable createTable(byte[] tableName, byte[] family, byte[][] splitRows) + throws IOException { + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor hcd = new HColumnDescriptor(family); + desc.addFamily(hcd); + getHBaseAdmin().createTable(desc, splitRows); + // HBaseAdmin only waits for regions to appear in hbase:meta we should wait + // until they are assigned + waitUntilAllRegionsAssigned(tableName); + return new HTable(getConfiguration(), tableName); + } + + /** * Drop an existing table * @param tableName existing table */ Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 1533970) +++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -2505,26 +2505,34 @@ } - private void scanTestNull(HTable ht, byte [] row, byte [] family, - byte [] value) - throws Exception { + private void scanTestNull(HTable ht, byte[] row, byte[] family, byte[] value) + throws Exception { + scanTestNull(ht, row, family, value, false); + } + private void scanTestNull(HTable ht, byte[] row, byte[] family, byte[] value, + 
boolean isReversedScan) throws Exception { + Scan scan = new Scan(); + scan.setReversed(isReversedScan); scan.addColumn(family, null); Result result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); scan = new Scan(); + scan.setReversed(isReversedScan); scan.addColumn(family, HConstants.EMPTY_BYTE_ARRAY); result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); scan = new Scan(); + scan.setReversed(isReversedScan); scan.addFamily(family); result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); scan = new Scan(); + scan.setReversed(isReversedScan); result = getSingleScanResult(ht, scan); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); @@ -4940,6 +4948,475 @@ TEST_UTIL.deleteTable(TABLE); } + @Test + public void testSuperSimpleWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testSuperSimpleWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + Put put = new Put(Bytes.toBytes("0-b11111-0000000000000000000")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000002")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000004")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000006")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b11111-0000000000000000008")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000001")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000003")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000005")); + put.add(FAMILY, 
QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000007")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + put = new Put(Bytes.toBytes("0-b22222-0000000000000000009")); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + ht.flushCommits(); + Scan scan = new Scan(Bytes.toBytes("0-b11111-9223372036854775807"), + Bytes.toBytes("0-b11111-0000000000000000000")); + scan.setReversed(true); + ResultScanner scanner = ht.getScanner(scan); + Result result = scanner.next(); + assertTrue(Bytes.equals(result.getRow(), + Bytes.toBytes("0-b11111-0000000000000000008"))); + scanner.close(); + ht.close(); + } + + @Test + public void testFiltersWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testFiltersWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + byte[][] ROWS = makeN(ROW, 10); + byte[][] QUALIFIERS = { Bytes.toBytes("col0--"), + Bytes.toBytes("col1--"), + Bytes.toBytes("col2--"), + Bytes.toBytes("col3--"), + Bytes.toBytes("col4--"), + Bytes.toBytes("col5--"), + Bytes.toBytes("col6--"), + Bytes.toBytes("col7--"), + Bytes.toBytes("col8--"), + Bytes.toBytes("col9--") }; + for (int i = 0; i < 10; i++) { + Put put = new Put(ROWS[i]); + put.add(FAMILY, QUALIFIERS[i], VALUE); + ht.put(put); + } + Scan scan = new Scan(); + scan.setReversed(true); + scan.addFamily(FAMILY); + Filter filter = new QualifierFilter(CompareOp.EQUAL, + new RegexStringComparator("col[1-5]")); + scan.setFilter(filter); + ResultScanner scanner = ht.getScanner(scan); + int expectedIndex = 5; + for (Result result : scanner) { + assertEquals(result.size(), 1); + assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[expectedIndex])); + assertTrue(Bytes.equals(result.raw()[0].getQualifier(), + QUALIFIERS[expectedIndex])); + expectedIndex--; + } + assertEquals(expectedIndex, 0); + scanner.close(); + ht.close(); + } + + @Test + public void testKeyOnlyFilterWithReverseScan() throws Exception { + byte[] TABLE = 
Bytes.toBytes("testKeyOnlyFilterWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + byte[][] ROWS = makeN(ROW, 10); + byte[][] QUALIFIERS = { Bytes.toBytes("col0--"), + Bytes.toBytes("col1--"), + Bytes.toBytes("col2--"), + Bytes.toBytes("col3--"), + Bytes.toBytes("col4--"), + Bytes.toBytes("col5--"), + Bytes.toBytes("col6--"), + Bytes.toBytes("col7--"), + Bytes.toBytes("col8--"), + Bytes.toBytes("col9--") }; + for (int i = 0; i < 10; i++) { + Put put = new Put(ROWS[i]); + put.add(FAMILY, QUALIFIERS[i], VALUE); + ht.put(put); + } + Scan scan = new Scan(); + scan.setReversed(true); + scan.addFamily(FAMILY); + Filter filter = new KeyOnlyFilter(true); + scan.setFilter(filter); + ResultScanner scanner = ht.getScanner(scan); + int count = 0; + for (Result result : ht.getScanner(scan)) { + assertEquals(result.size(), 1); + assertEquals(result.raw()[0].getValueLength(), Bytes.SIZEOF_INT); + assertEquals(Bytes.toInt(result.raw()[0].getValue()), VALUE.length); + count++; + } + assertEquals(count, 10); + scanner.close(); + ht.close(); + } + + /** + * Test simple table and non-existent row cases. 
+ */ + @Test + public void testSimpleMissingWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testSimpleMissingWithReverseScan"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + byte[][] ROWS = makeN(ROW, 4); + + // Try to get a row on an empty table + Scan scan = new Scan(); + scan.setReversed(true); + Result result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(ROWS[0]); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(ROWS[0], ROWS[1]); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(); + scan.setReversed(true); + scan.addFamily(FAMILY); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + scan = new Scan(); + scan.setReversed(true); + scan.addColumn(FAMILY, QUALIFIER); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + + // Insert a row + + Put put = new Put(ROWS[2]); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + + // Make sure we can scan the row + scan = new Scan(); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + scan = new Scan(ROWS[3], ROWS[0]); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + scan = new Scan(ROWS[2], ROWS[1]); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE); + + // Try to scan empty rows around it + // Introduced MemStore#shouldSeekForReverseScan to fix the following + scan = new Scan(ROWS[1]); + scan.setReversed(true); + result = getSingleScanResult(ht, scan); + assertNullResult(result); + ht.close(); + } + + @Test + public void testNullWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testNullWithReverseScan"); + HTable ht 
= TEST_UTIL.createTable(TABLE, FAMILY); + // Null qualifier (should work) + Put put = new Put(ROW); + put.add(FAMILY, null, VALUE); + ht.put(put); + scanTestNull(ht, ROW, FAMILY, VALUE, true); + Delete delete = new Delete(ROW); + delete.deleteColumns(FAMILY, null); + ht.delete(delete); + // Use a new table + byte[] TABLE2 = Bytes.toBytes("testNull2WithReverseScan"); + ht = TEST_UTIL.createTable(TABLE2, FAMILY); + // Empty qualifier, byte[0] instead of null (should work) + put = new Put(ROW); + put.add(FAMILY, HConstants.EMPTY_BYTE_ARRAY, VALUE); + ht.put(put); + scanTestNull(ht, ROW, FAMILY, VALUE, true); + TEST_UTIL.flush(); + scanTestNull(ht, ROW, FAMILY, VALUE, true); + delete = new Delete(ROW); + delete.deleteColumns(FAMILY, HConstants.EMPTY_BYTE_ARRAY); + ht.delete(delete); + // Null value + put = new Put(ROW); + put.add(FAMILY, QUALIFIER, null); + ht.put(put); + Scan scan = new Scan(); + scan.setReversed(true); + scan.addColumn(FAMILY, QUALIFIER); + Result result = getSingleScanResult(ht, scan); + assertSingleResult(result, ROW, FAMILY, QUALIFIER, null); + ht.close(); + } + + @Test + public void testDeletesWithReverseScan() throws Exception { + byte[] TABLE = Bytes.toBytes("testDeletesWithReverseScan"); + byte[][] ROWS = makeNAscii(ROW, 6); + byte[][] FAMILIES = makeNAscii(FAMILY, 3); + byte[][] VALUES = makeN(VALUE, 5); + long[] ts = { 1000, 2000, 3000, 4000, 5000 }; + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, + TEST_UTIL.getConfiguration(), 3); + + Put put = new Put(ROW); + put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[0], QUALIFIER, ts[1], VALUES[1]); + ht.put(put); + + Delete delete = new Delete(ROW); + delete.deleteFamily(FAMILIES[0], ts[0]); + ht.delete(delete); + + Scan scan = new Scan(ROW); + scan.setReversed(true); + scan.addFamily(FAMILIES[0]); + scan.setMaxVersions(Integer.MAX_VALUE); + Result result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1] }, + new 
byte[][] { VALUES[1] }, 0, 0); + + // Test delete latest version + put = new Put(ROW); + put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); + put.add(FAMILIES[0], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[0], QUALIFIER, ts[3], VALUES[3]); + put.add(FAMILIES[0], null, ts[4], VALUES[4]); + put.add(FAMILIES[0], null, ts[2], VALUES[2]); + put.add(FAMILIES[0], null, ts[3], VALUES[3]); + ht.put(put); + + delete = new Delete(ROW); + delete.deleteColumn(FAMILIES[0], QUALIFIER); // ts[4] + ht.delete(delete); + + scan = new Scan(ROW); + scan.setReversed(true); + scan.addColumn(FAMILIES[0], QUALIFIER); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1], + ts[2], ts[3] }, new byte[][] { VALUES[1], VALUES[2], VALUES[3] }, 0, 2); + + // Test for HBASE-1847 + delete = new Delete(ROW); + delete.deleteColumn(FAMILIES[0], null); + ht.delete(delete); + + // Cleanup null qualifier + delete = new Delete(ROW); + delete.deleteColumns(FAMILIES[0], null); + ht.delete(delete); + + // Expected client behavior might be that you can re-put deleted values + // But alas, this is not to be. We can't put them back in either case. 
+ + put = new Put(ROW); + put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); // 1000 + put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); // 5000 + ht.put(put); + + // The Scanner returns the previous values, the expected-naive-unexpected + // behavior + + scan = new Scan(ROW); + scan.setReversed(true); + scan.addFamily(FAMILIES[0]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1], + ts[2], ts[3] }, new byte[][] { VALUES[1], VALUES[2], VALUES[3] }, 0, 2); + + // Test deleting an entire family from one row but not the other various + // ways + + put = new Put(ROWS[0]); + put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]); + ht.put(put); + + put = new Put(ROWS[1]); + put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]); + ht.put(put); + + put = new Put(ROWS[2]); + put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]); + put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]); + put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]); + ht.put(put); + + delete = new Delete(ROWS[0]); + delete.deleteFamily(FAMILIES[2]); + ht.delete(delete); + + delete = new Delete(ROWS[1]); + delete.deleteColumns(FAMILIES[1], QUALIFIER); + ht.delete(delete); + + delete = new Delete(ROWS[2]); + delete.deleteColumn(FAMILIES[1], QUALIFIER); + delete.deleteColumn(FAMILIES[1], QUALIFIER); + delete.deleteColumn(FAMILIES[2], QUALIFIER); + ht.delete(delete); + + scan = new Scan(ROWS[0]); + scan.setReversed(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = 
getSingleScanResult(ht, scan); + assertTrue("Expected 2 keys but received " + result.size(), + result.size() == 2); + assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER, new long[] { ts[0], + ts[1] }, new byte[][] { VALUES[0], VALUES[1] }, 0, 1); + + scan = new Scan(ROWS[1]); + scan.setReversed(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertTrue("Expected 2 keys but received " + result.size(), + result.size() == 2); + + scan = new Scan(ROWS[2]); + scan.setReversed(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + result = getSingleScanResult(ht, scan); + assertEquals(1, result.size()); + assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, + new long[] { ts[2] }, new byte[][] { VALUES[2] }, 0, 0); + + // Test if we delete the family first in one row (HBASE-1541) + + delete = new Delete(ROWS[3]); + delete.deleteFamily(FAMILIES[1]); + ht.delete(delete); + + put = new Put(ROWS[3]); + put.add(FAMILIES[2], QUALIFIER, VALUES[0]); + ht.put(put); + + put = new Put(ROWS[4]); + put.add(FAMILIES[1], QUALIFIER, VALUES[1]); + put.add(FAMILIES[2], QUALIFIER, VALUES[2]); + ht.put(put); + + scan = new Scan(ROWS[4]); + scan.setReversed(true); + scan.addFamily(FAMILIES[1]); + scan.addFamily(FAMILIES[2]); + scan.setMaxVersions(Integer.MAX_VALUE); + ResultScanner scanner = ht.getScanner(scan); + result = scanner.next(); + assertTrue("Expected 2 keys but received " + result.size(), + result.size() == 2); + assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[4])); + assertTrue(Bytes.equals(result.raw()[1].getRow(), ROWS[4])); + assertTrue(Bytes.equals(result.raw()[0].getValue(), VALUES[1])); + assertTrue(Bytes.equals(result.raw()[1].getValue(), VALUES[2])); + result = scanner.next(); + assertTrue("Expected 1 key but received " + result.size(), + result.size() == 1); + 
assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[3])); + assertTrue(Bytes.equals(result.raw()[0].getValue(), VALUES[0])); + scanner.close(); + ht.close(); + } + + /** + * Tests reversed scan under multi regions + */ + @Test + public void testReversedScanUnderMultiRegions() throws Exception { + // Test Initialization. + byte[] TABLE = Bytes.toBytes("testReversedScanUnderMultiRegions"); + byte[] maxByteArray = ReversedClientScanner.MAX_BYTE_ARRAY; + byte[][] splitRows = new byte[][] { Bytes.toBytes("005"), + Bytes.add(Bytes.toBytes("005"), Bytes.multiple(maxByteArray, 16)), + Bytes.toBytes("006"), + Bytes.add(Bytes.toBytes("006"), Bytes.multiple(maxByteArray, 8)), + Bytes.toBytes("007"), + Bytes.add(Bytes.toBytes("007"), Bytes.multiple(maxByteArray, 4)), + Bytes.toBytes("008"), Bytes.multiple(maxByteArray, 2) }; + HTable table = TEST_UTIL.createTable(TABLE, FAMILY, splitRows); + TEST_UTIL.waitUntilAllRegionsAssigned(table.getTableName()); + + assertEquals(splitRows.length + 1, table.getRegionLocations().size()); + // Insert one row each region + int insertNum = splitRows.length; + for (int i = 0; i < insertNum; i++) { + Put put = new Put(splitRows[i]); + put.add(FAMILY, QUALIFIER, VALUE); + table.put(put); + } + + // scan forward + ResultScanner scanner = table.getScanner(new Scan()); + int count = 0; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + } + assertEquals(insertNum, count); + + // scan backward + Scan scan = new Scan(); + scan.setReversed(true); + scanner = table.getScanner(scan); + count = 0; + byte[] lastRow = null; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + byte[] thisRow = r.getRow(); + if (lastRow != null) { + assertTrue("Error scan order, last row= " + Bytes.toString(lastRow) + + ",this row=" + Bytes.toString(thisRow), + Bytes.compareTo(thisRow, lastRow) < 0); + } + lastRow = thisRow; + } + assertEquals(insertNum, count); + table.close(); + } + @org.junit.Rule public 
org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); Index: src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java (revision 1533970) +++ src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java (working copy) @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; import org.junit.experimental.categories.Category; import com.google.common.base.Throwables; @@ -1769,6 +1770,151 @@ } } + @Test + public void testPrefixFilterWithReverseScan() throws Exception { + // Grab rows from group one (half of total) + long expectedRows = this.numRows / 2; + long expectedKeys = this.colsPerRow; + Scan s = new Scan(); + s.setReversed(true); + s.setFilter(new PrefixFilter(Bytes.toBytes("testRowOne"))); + verifyScan(s, expectedRows, expectedKeys); + } + + public void testPageFilterWithReverseScan() throws Exception { + // KVs in first 6 rows + KeyValue[] expectedKVs = { + // testRowOne-0 + new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]), + // testRowOne-2 + new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[2], 
FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]), + // testRowOne-3 + new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]), + new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]), + // testRowTwo-0 + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]), + // testRowTwo-2 + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]), + // testRowTwo-3 + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]), + new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]) }; + + // Grab all 6 rows + long expectedRows = 6; + long expectedKeys = 
this.colsPerRow; + Scan s = new Scan(); + s.setReversed(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + + // Grab first 4 rows (6 cols per row) + expectedRows = 4; + expectedKeys = this.colsPerRow; + s = new Scan(); + s.setReversed(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + + // Grab first 2 rows + expectedRows = 2; + expectedKeys = this.colsPerRow; + s = new Scan(); + s.setReversed(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + + // Grab first row + expectedRows = 1; + expectedKeys = this.colsPerRow; + s = new Scan(); + s.setReversed(true); + s.setFilter(new PageFilter(expectedRows)); + verifyScan(s, expectedRows, expectedKeys); + } + + public void testWhileMatchFilterWithFilterRowWithReverseScan() + throws Exception { + final int pageSize = 4; + + Scan s = new Scan(); + s.setReversed(true); + WhileMatchFilter filter = new WhileMatchFilter(new PageFilter(pageSize)); + s.setFilter(filter); + + InternalScanner scanner = this.region.getScanner(s); + int scannerCounter = 0; + while (true) { + boolean isMoreResults = scanner.next(new ArrayList()); + scannerCounter++; + + if (scannerCounter >= pageSize) { + Assert.assertTrue( + "The WhileMatchFilter should now filter all remaining", + filter.filterAllRemaining()); + } + if (!isMoreResults) { + break; + } + } + scanner.close(); + Assert.assertEquals("The page filter returned more rows than expected", + pageSize, scannerCounter); + } + + public void testWhileMatchFilterWithFilterRowKeyWithReverseScan() + throws Exception { + Scan s = new Scan(); + String prefix = "testRowOne"; + WhileMatchFilter filter = new WhileMatchFilter(new PrefixFilter( + Bytes.toBytes(prefix))); + s.setFilter(filter); + s.setReversed(true); + + InternalScanner scanner = this.region.getScanner(s); + while (true) { + ArrayList values = new ArrayList(); + boolean isMoreResults = 
scanner.next(values); + if (!isMoreResults + || !Bytes.toString(values.get(0).getRow()).startsWith(prefix)) { + Assert.assertTrue( + "The WhileMatchFilter should now filter all remaining", + filter.filterAllRemaining()); + } + if (!isMoreResults) { + break; + } + } + scanner.close(); + } + /** * Filter which makes sleeps for a second between each row of a scan. * This can be useful for manual testing of bugs like HBASE-5973. For example: Index: src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java (revision 1533970) +++ src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java (working copy) @@ -74,6 +74,8 @@ public KeyValueScanner preStoreScannerOpen(final ObserverContext c, Store store, final Scan scan, final NavigableSet targetCols, KeyValueScanner s) throws IOException { - return new StoreScanner(store, store.getScanInfo(), scan, targetCols); + return scan.isReversed() ? 
new ReversedStoreScanner(store, + store.getScanInfo(), scan, targetCols) : new StoreScanner(store, + store.getScanInfo(), scan, targetCols); } } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (revision 1533970) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (working copy) @@ -49,8 +49,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; -import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -725,6 +725,49 @@ } /** + * Test that on a major compaction, if all cells are expired or deleted, then + * we'll end up with no product. Make sure scanner over region returns right + * answer in this case - and that it just basically works. + * @throws IOException + */ + public void testMajorCompactingToNoOutputWithReverseScan() throws IOException { + createStoreFile(r); + for (int i = 0; i < compactionThreshold; i++) { + createStoreFile(r); + } + // Now delete everything. + Scan scan = new Scan(); + scan.setReversed(true); + InternalScanner s = r.getScanner(scan); + do { + List results = new ArrayList(); + boolean result = s.next(results); + assertTrue(!results.isEmpty()); + r.delete(new Delete(results.get(0).getRow()), true); + if (!result) + break; + } while (true); + s.close(); + // Flush + r.flushcache(); + // Major compact. 
+ r.compactStores(true); + scan = new Scan(); + scan.setReversed(true); + s = r.getScanner(scan); + int counter = 0; + do { + List results = new ArrayList(); + boolean result = s.next(results); + if (!result) + break; + counter++; + } while (true); + s.close(); + assertEquals(0, counter); + } + + /** * Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes. */ public static class TrackableCompactionRequest extends CompactionRequest { Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1533970) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -4325,7 +4325,566 @@ "value-version-" + ts, Bytes.toString(kv.getValue())); } + public void testReverseScanner_FromMemStore_SingleCF_Normal() + throws IOException { + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, + null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put = new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + Scan scan = new Scan(rowC); + scan.setMaxVersions(5); + scan.setReversed(true); + InternalScanner scanner = region.getScanner(scan); + List currRow = new ArrayList(); + boolean 
hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + assertFalse(hasNext); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStore_SingleCF_LargerKey() + throws IOException { + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, + null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put = new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + + Scan scan = new Scan(rowD); + List currRow = new ArrayList(); + scan.setReversed(true); + scan.setMaxVersions(5); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + 
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + assertFalse(hasNext); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStore_SingleCF_FullScan() + throws IOException { + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put, + null); + KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowC); + put.add(kv1); + put.add(kv11); + region.put(put); + put = new Put(rowA); + put.add(kv2); + region.put(put); + put = new Put(rowB); + put.add(kv3); + region.put(put); + Scan scan = new Scan(); + List currRow = new ArrayList(); + scan.setReversed(true); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA)); + assertFalse(hasNext); + scanner.close(); + } finally { + 
HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_moreRowsMayExistAfter() throws IOException { + // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] rowE = Bytes.toBytes("rowE"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col1 = Bytes.toBytes("col1"); + byte[] col2 = Bytes.toBytes("col2"); + long ts = 1; + String method = this.getName(); + this.region = initHRegion(tableName, method, families); + try { + KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); + KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowA); + put.add(kv1); + region.put(put); + put = new Put(rowB); + put.add(kv2); + region.put(put); + put = new Put(rowC); + put.add(kv3); + region.put(put); + put = new Put(rowD); + put.add(kv4_1); + region.put(put); + put = new Put(rowD); + put.add(kv4_2); + region.put(put); + put = new Put(rowE); + put.add(kv5); + region.put(put); + region.flushcache(); + Scan scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col1); + scan.setReversed(true); + List currRow = new ArrayList(); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + 
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertFalse(hasNext); + scanner.close(); + + scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col2); + scan.setReversed(true); + currRow.clear(); + scanner = region.getScanner(scan); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_smaller_blocksize() throws IOException { + // case to ensure no conflict with HFile index optimization + byte[] rowA = Bytes.toBytes("rowA"); + byte[] rowB = Bytes.toBytes("rowB"); + byte[] rowC = Bytes.toBytes("rowC"); + byte[] rowD = Bytes.toBytes("rowD"); + byte[] rowE = Bytes.toBytes("rowE"); + byte[] cf = Bytes.toBytes("CF"); + byte[][] families = { cf }; + byte[] col1 = Bytes.toBytes("col1"); + byte[] col2 = Bytes.toBytes("col2"); + long ts = 1; + String method = this.getName(); + HBaseConfiguration config = new HBaseConfiguration(); + config.setInt("test.block.size", 1); + this.region = initHRegion(tableName, method, config, families); + try { + KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null); + KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null); + KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null); + Put put = null; + put = new Put(rowA); + put.add(kv1); + region.put(put); + put = new Put(rowB); + put.add(kv2); + region.put(put); + put = new Put(rowC); + put.add(kv3); + region.put(put); + 
put = new Put(rowD); + put.add(kv4_1); + region.put(put); + put = new Put(rowD); + put.add(kv4_2); + region.put(put); + put = new Put(rowE); + put.add(kv5); + region.put(put); + region.flushcache(); + Scan scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col1); + scan.setReversed(true); + List currRow = new ArrayList(); + InternalScanner scanner = region.getScanner(scan); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB)); + assertFalse(hasNext); + scanner.close(); + + scan = new Scan(rowD, rowA); + scan.addColumn(families[0], col2); + scan.setReversed(true); + currRow.clear(); + scanner = region.getScanner(scan); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD)); + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1() + throws IOException { + byte[] row0 = Bytes.toBytes("row0"); // 1 kv + byte[] row1 = Bytes.toBytes("row1"); // 2 kv + byte[] row2 = Bytes.toBytes("row2"); // 4 kv + byte[] row3 = Bytes.toBytes("row3"); // 2 kv + byte[] row4 = Bytes.toBytes("row4"); // 5 kv + byte[] row5 = Bytes.toBytes("row5"); // 2 kv + byte[] cf1 = Bytes.toBytes("CF1"); + byte[] cf2 = Bytes.toBytes("CF2"); + byte[] cf3 = Bytes.toBytes("CF3"); + byte[][] families = { cf1, cf2, cf3 }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + HBaseConfiguration conf = new HBaseConfiguration(); + // disable compactions in this test. 
+ conf.setInt("hbase.hstore.compactionThreshold", 10000); + this.region = initHRegion(tableName, method, conf, families); + try { + // kv naming style: kv(row number) + totalKvCountInThisRow + seq no + KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put, + null); + KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1, + KeyValue.Type.Put, null); + KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put, + null); + KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put, + null); + KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4, + KeyValue.Type.Put, null); + KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4, + KeyValue.Type.Put, null); + KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put, + null); + KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put, + null); + KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5, + KeyValue.Type.Put, null); + KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3, + KeyValue.Type.Put, null); + KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put, + null); + KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put, + null); + // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv) + Put put = null; + put = new Put(row1); + put.add(kv1_2_1); + region.put(put); + put = new Put(row2); + put.add(kv2_4_1); + region.put(put); + put = new Put(row4); + put.add(kv4_5_4); + put.add(kv4_5_5); + region.put(put); + region.flushcache(); + // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv) + put = new Put(row4); + put.add(kv4_5_1); + put.add(kv4_5_3); + region.put(put); + 
put = new Put(row1); + put.add(kv1_2_2); + region.put(put); + put = new Put(row2); + put.add(kv2_4_4); + region.put(put); + region.flushcache(); + // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv) + put = new Put(row4); + put.add(kv4_5_2); + region.put(put); + put = new Put(row2); + put.add(kv2_4_2); + put.add(kv2_4_3); + region.put(put); + put = new Put(row3); + put.add(kv3_2_2); + region.put(put); + region.flushcache(); + // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max) + // ( 2 kv) + put = new Put(row0); + put.add(kv0_1_1); + region.put(put); + put = new Put(row3); + put.add(kv3_2_1); + region.put(put); + put = new Put(row5); + put.add(kv5_2_1); + put.add(kv5_2_2); + region.put(put); + // scan range = ["row4", min), skip the max "row5" + Scan scan = new Scan(row4); + scan.setMaxVersions(5); + scan.setBatch(3); + scan.setReversed(true); + InternalScanner scanner = region.getScanner(scan); + List currRow = new ArrayList(); + boolean hasNext = false; + // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not + // included in scan range + // "row4" takes 2 next() calls since batch=3 + hasNext = scanner.next(currRow); + assertEquals(3, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row4)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row4)); + assertTrue(hasNext); + // 2. scan out "row3" (2 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row3)); + assertTrue(hasNext); + // 3. 
scan out "row2" (4 kvs) + // "row2" takes 2 next() calls since batch=3 + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(3, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row2)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row2)); + assertTrue(hasNext); + // 4. scan out "row1" (2 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(2, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row1)); + assertTrue(hasNext); + // 5. scan out "row0" (1 kv) + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row0)); + assertFalse(hasNext); + + scanner.close(); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2() + throws IOException { + byte[] row1 = Bytes.toBytes("row1"); + byte[] row2 = Bytes.toBytes("row2"); + byte[] row3 = Bytes.toBytes("row3"); + byte[] row4 = Bytes.toBytes("row4"); + byte[] cf1 = Bytes.toBytes("CF1"); + byte[] cf2 = Bytes.toBytes("CF2"); + byte[] cf3 = Bytes.toBytes("CF3"); + byte[] cf4 = Bytes.toBytes("CF4"); + byte[][] families = { cf1, cf2, cf3, cf4 }; + byte[] col = Bytes.toBytes("C"); + long ts = 1; + String method = this.getName(); + HBaseConfiguration conf = new HBaseConfiguration(); + // disable compactions in this test. 
+ conf.setInt("hbase.hstore.compactionThreshold", 10000); + this.region = initHRegion(tableName, method, conf, families); + try { + KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null); + KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null); + KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null); + KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null); + // storefile1 + Put put = new Put(row1); + put.add(kv1); + region.put(put); + region.flushcache(); + // storefile2 + put = new Put(row2); + put.add(kv2); + region.put(put); + region.flushcache(); + // storefile3 + put = new Put(row3); + put.add(kv3); + region.put(put); + region.flushcache(); + // memstore + put = new Put(row4); + put.add(kv4); + region.put(put); + // scan range = ["row4", min) + Scan scan = new Scan(row4); + scan.setReversed(true); + scan.setBatch(10); + InternalScanner scanner = region.getScanner(scan); + List currRow = new ArrayList(); + boolean hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row4)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row3)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row2)); + assertTrue(hasNext); + currRow.clear(); + hasNext = scanner.next(currRow); + assertEquals(1, currRow.size()); + assertTrue(Bytes.equals(currRow.get(0).getRow(), row1)); + assertFalse(hasNext); + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + private static HRegion initHRegion(byte[] tableName, String callingMethod, + byte[]... 
families) throws IOException { + return initHRegion(tableName, callingMethod, HBaseConfiguration.create(), + families); + } + + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java (working copy) @@ -0,0 +1,698 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; +import org.apache.hadoop.hbase.regionserver.Store.ScanInfo; +import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.After; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +/** + * Test cases against ReversibleKeyValueScanner + */ +@Category(MediumTests.class) +public class TestReversibleScanners { + private static final Log LOG = LogFactory + .getLog(TestReversibleScanners.class); + 
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static byte[] FAMILYNAME = Bytes.toBytes("testCf"); + private static long TS = System.currentTimeMillis(); + private static int MAXMVCC = 7; + private static byte[] ROW = Bytes.toBytes("testRow"); + private static final int ROWSIZE = 200; + private static byte[][] ROWS = makeN(ROW, ROWSIZE); + private static byte[] QUAL = Bytes.toBytes("testQual"); + private static final int QUALSIZE = 5; + private static byte[][] QUALS = makeN(QUAL, QUALSIZE); + private static byte[] VALUE = Bytes.toBytes("testValue"); + private static final int VALUESIZE = 3; + private static byte[][] VALUES = makeN(VALUE, VALUESIZE); + + @After + public void tearDown() { + MultiVersionConsistencyControl.setThreadReadPoint(Long.MAX_VALUE); + } + + @Test + public void testReversibleStoreFileScanner() throws IOException { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path hfilePath = new Path(new Path( + TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"), + "regionname"), "familyname"); + CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); + StoreFile.Writer writer = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs, 2 * 1024).withOutputDir( + hfilePath).build(); + writeStoreFile(writer); + + StoreFile sf = new StoreFile(fs, writer.getPath(), + TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, + NoOpDataBlockEncoder.INSTANCE); + + List scanners = StoreFileScanner + .getScannersForStoreFiles(Collections.singletonList(sf), false, true, + false); + StoreFileScanner scanner = scanners.get(0); + seekTestOfReversibleKeyValueScanner(scanner); + } + + @Test + public void testReversibleMemstoreScanner() throws IOException { + MemStore memstore = new MemStore(); + writeMemstore(memstore); + List scanners = memstore.getScanners(); + seekTestOfReversibleKeyValueScanner(scanners.get(0)); + } + + @Test + public void testReversibleKeyValueHeap() throws IOException { + // write 
data to one memstore and two store files + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path hfilePath = new Path(new Path( + TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"), + "familyname"); + CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); + StoreFile.Writer writer1 = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs, 2 * 1024).withOutputDir( + hfilePath).build(); + StoreFile.Writer writer2 = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs, 2 * 1024).withOutputDir( + hfilePath).build(); + + MemStore memstore = new MemStore(); + writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1, + writer2 }); + + StoreFile sf1 = new StoreFile(fs, writer1.getPath(), + TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, + NoOpDataBlockEncoder.INSTANCE); + + StoreFile sf2 = new StoreFile(fs, writer2.getPath(), + TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, + NoOpDataBlockEncoder.INSTANCE); + /** + * Test without MVCC + */ + int startRowNum = ROWSIZE / 2; + ReversedKeyValueHeap kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, + ROWS[startRowNum]); + internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum); + + startRowNum = ROWSIZE - 1; + kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, + HConstants.EMPTY_START_ROW); + internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum); + + /** + * Test with MVCC + */ + for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { + LOG.info("Setting read point to " + readPoint); + MultiVersionConsistencyControl.setThreadReadPoint(readPoint); + startRowNum = ROWSIZE - 1; + kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2, + HConstants.EMPTY_START_ROW); + for (int i = startRowNum; i >= 0; i--) { + if (i - 2 < 0) + break; + i = i - 2; + kvHeap.seekToPreviousRow(KeyValue.createFirstOnRow(ROWS[i + 1])); + Pair nextReadableNum = getNextReadableNumWithBackwardScan( + i, 0, 
readPoint); + if (nextReadableNum == null) + break; + KeyValue expecedKey = makeKV(nextReadableNum.getFirst(), + nextReadableNum.getSecond()); + assertEquals(expecedKey, kvHeap.peek()); + i = nextReadableNum.getFirst(); + int qualNum = nextReadableNum.getSecond(); + if (qualNum + 1 < QUALSIZE) { + kvHeap.backwardSeek(makeKV(i, qualNum + 1)); + nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, + readPoint); + if (nextReadableNum == null) + break; + expecedKey = makeKV(nextReadableNum.getFirst(), + nextReadableNum.getSecond()); + assertEquals(expecedKey, kvHeap.peek()); + i = nextReadableNum.getFirst(); + qualNum = nextReadableNum.getSecond(); + } + + kvHeap.next(); + + if (qualNum + 1 >= QUALSIZE) { + nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0, + readPoint); + } else { + nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1, + readPoint); + } + if (nextReadableNum == null) + break; + expecedKey = makeKV(nextReadableNum.getFirst(), + nextReadableNum.getSecond()); + assertEquals(expecedKey, kvHeap.peek()); + i = nextReadableNum.getFirst(); + } + } + } + + @Test + public void testReversibleStoreScanner() throws IOException { + // write data to one memstore and two store files + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path hfilePath = new Path(new Path( + TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"), + "familyname"); + CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); + StoreFile.Writer writer1 = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs, 2 * 1024).withOutputDir( + hfilePath).build(); + StoreFile.Writer writer2 = new StoreFile.WriterBuilder( + TEST_UTIL.getConfiguration(), cacheConf, fs, 2 * 1024).withOutputDir( + hfilePath).build(); + + MemStore memstore = new MemStore(); + writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1, + writer2 }); + + StoreFile sf1 = new StoreFile(fs, writer1.getPath(), + 
TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, + NoOpDataBlockEncoder.INSTANCE); + + StoreFile sf2 = new StoreFile(fs, writer2.getPath(), + TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, + NoOpDataBlockEncoder.INSTANCE); + + ScanType scanType = ScanType.USER_SCAN; + ScanInfo scanInfo = new ScanInfo(FAMILYNAME, 0, Integer.MAX_VALUE, + Long.MAX_VALUE, false, 0, KeyValue.COMPARATOR); + + // Case 1.Test a full reversed scan + Scan scan = new Scan(); + scan.setReversed(true); + StoreScanner storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, + scan, scanType, scanInfo); + verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false); + + // Case 2.Test reversed scan with a specified start row + int startRowNum = ROWSIZE / 2; + byte[] startRow = ROWS[startRowNum]; + scan.setStartRow(startRow); + storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, + scanType, scanInfo); + verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1), + startRowNum + 1, false); + + // Case 3.Test reversed scan with a specified start row and specified + // qualifiers + assertTrue(QUALSIZE > 2); + scan.addColumn(FAMILYNAME, QUALS[0]); + scan.addColumn(FAMILYNAME, QUALS[2]); + storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, + scanType, scanInfo); + verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1, + false); + + // Case 4.Test reversed scan with mvcc based on case 3 + for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { + LOG.info("Setting read point to " + readPoint); + MultiVersionConsistencyControl.setThreadReadPoint(readPoint); + storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, + scanType, scanInfo); + int expectedRowCount = 0; + int expectedKVCount = 0; + for (int i = startRowNum; i >= 0; i--) { + int kvCount = 0; + if (makeMVCC(i, 0) <= readPoint) { + kvCount++; + } + if (makeMVCC(i, 2) <= readPoint) { + kvCount++; + } + if (kvCount > 0) { + expectedRowCount++; + 
expectedKVCount += kvCount; + } + } + verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount, + false); + } + } + + @Test + public void testReversibleRegionScanner() throws IOException { + byte[] tableName = Bytes.toBytes("testtable"); + byte[] FAMILYNAME2 = Bytes.toBytes("testCf2"); + Configuration conf = HBaseConfiguration.create(); + HRegion region = TestHRegion.initHRegion(tableName, + "testReversibleRegionScanner", conf, FAMILYNAME, FAMILYNAME2); + loadDataToRegion(region, FAMILYNAME2); + + // verify row count with forward scan + Scan scan = new Scan(); + InternalScanner scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true); + + // Case1:Full reversed scan + scan.setReversed(true); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false); + + // Case2:Full reversed scan with one family + scan = new Scan(); + scan.setReversed(true); + scan.addFamily(FAMILYNAME); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false); + + // Case3:Specify qualifiers + One family + byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] }; + for (byte[] specifiedQualifier : specifiedQualifiers) + scan.addColumn(FAMILYNAME, specifiedQualifier); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false); + + // Case4:Specify qualifiers + Two families + for (byte[] specifiedQualifier : specifiedQualifiers) + scan.addColumn(FAMILYNAME2, specifiedQualifier); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false); + + // Case5: Case4 + specify start row + int startRowNum = ROWSIZE * 3 / 4; + scan.setStartRow(ROWS[startRowNum]); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1), + false); + + // Case6: Case4 + specify stop row + int stopRowNum = ROWSIZE / 4; + 
scan.setStartRow(HConstants.EMPTY_BYTE_ARRAY); + scan.setStopRow(ROWS[stopRowNum]); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE + - stopRowNum - 1), false); + + // Case7: Case4 + specify start row + specify stop row + scan.setStartRow(ROWS[startRowNum]); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2, + (startRowNum - stopRowNum), false); + + // Case8: Case7 + SingleColumnValueFilter + int valueNum = startRowNum % VALUESIZE; + Filter filter = new SingleColumnValueFilter(FAMILYNAME, + specifiedQualifiers[0], CompareOp.EQUAL, VALUES[valueNum]); + scan.setFilter(filter); + scanner = region.getScanner(scan); + int unfilteredRowNum = (startRowNum - stopRowNum) / VALUESIZE + + (stopRowNum / VALUESIZE == valueNum ? 0 : 1); + verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum, + false); + + // Case9: Case7 + PageFilter + int pageSize = 10; + filter = new PageFilter(pageSize); + scan.setFilter(filter); + scanner = region.getScanner(scan); + int expectedRowNum = pageSize; + verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); + + // Case10: Case7 + FilterList+MUST_PASS_ONE + SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter( + FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[0]); + SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter( + FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[1]); + expectedRowNum = 0; + for (int i = startRowNum; i > stopRowNum; i--) { + if (i % VALUESIZE == 0 || i % VALUESIZE == 1) { + expectedRowNum++; + } + } + filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2); + scan.setFilter(filter); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); + + // Case10: Case7 + FilterList+MUST_PASS_ALL + filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, 
scvFilter2); + expectedRowNum = 0; + scan.setFilter(filter); + scanner = region.getScanner(scan); + verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false); + } + + private StoreScanner getReversibleStoreScanner(MemStore memstore, + StoreFile sf1, StoreFile sf2, Scan scan, ScanType scanType, + ScanInfo scanInfo) throws IOException { + List scanners = getScanners(memstore, sf1, sf2, null, + false); + NavigableSet columns = null; + for (Map.Entry> entry : scan.getFamilyMap() + .entrySet()) { + // Should be only one family + columns = entry.getValue(); + } + StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, + scanType, columns, scanners); + return storeScanner; + } + + private void verifyCountAndOrder(InternalScanner scanner, + int expectedKVCount, int expectedRowCount, boolean forward) + throws IOException { + List kvList = new ArrayList(); + Result lastResult = null; + int rowCount = 0; + int kvCount = 0; + try { + while (scanner.next(kvList)) { + if (kvList.isEmpty()) + continue; + rowCount++; + kvCount += kvList.size(); + if (lastResult != null) { + Result curResult = new Result(kvList); + assertEquals("LastResult:" + lastResult + "CurResult:" + curResult, + forward, + Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0); + } + lastResult = new Result(kvList); + kvList.clear(); + } + } finally { + scanner.close(); + } + if (!kvList.isEmpty()) { + rowCount++; + kvCount += kvList.size(); + kvList.clear(); + } + assertEquals(expectedKVCount, kvCount); + assertEquals(expectedRowCount, rowCount); + } + + private void internalTestSeekAndNextForReversibleKeyValueHeap( + ReversedKeyValueHeap kvHeap, int startRowNum) throws IOException { + // Test next and seek + for (int i = startRowNum; i >= 0; i--) { + if (i % 2 == 1 && i - 2 >= 0) { + i = i - 2; + kvHeap.seekToPreviousRow(KeyValue.createFirstOnRow(ROWS[i + 1])); + } + for (int j = 0; j < QUALSIZE; j++) { + if (j % 2 == 1 && (j + 1) < QUALSIZE) { + j = j + 1; + 
kvHeap.backwardSeek(makeKV(i, j)); + } + assertEquals(makeKV(i, j), kvHeap.peek()); + kvHeap.next(); + } + } + assertEquals(null, kvHeap.peek()); + } + + private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore, + StoreFile sf1, StoreFile sf2, byte[] startRow) throws IOException { + List scanners = getScanners(memstore, sf1, sf2, startRow, + true); + ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners, + KeyValue.COMPARATOR); + return kvHeap; + } + + private List getScanners(MemStore memstore, StoreFile sf1, + StoreFile sf2, byte[] startRow, boolean doSeek) throws IOException { + List fileScanners = StoreFileScanner + .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true, + false); + List memScanners = memstore.getScanners(); + List scanners = new ArrayList( + fileScanners.size() + 1); + scanners.addAll(fileScanners); + scanners.addAll(memScanners); + + if (doSeek) { + if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) { + for (KeyValueScanner scanner : scanners) { + scanner.seekToLastRow(); + } + } else { + KeyValue startKey = KeyValue.createFirstOnRow(startRow); + for (KeyValueScanner scanner : scanners) { + scanner.backwardSeek(startKey); + } + } + } + return scanners; + } + + private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner) + throws IOException { + /** + * Test without MVCC + */ + // Test seek to last row + assertTrue(scanner.seekToLastRow()); + assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek()); + + // Test backward seek in three cases + // Case1: seek in the same row in backwardSeek + KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2); + assertTrue(scanner.backwardSeek(seekKey)); + assertEquals(seekKey, scanner.peek()); + + // Case2: seek to the previous row in backwardSeek + int seekRowNum = ROWSIZE - 2; + assertTrue(scanner.backwardSeek(KeyValue.createLastOnRow(ROWS[seekRowNum]))); + KeyValue expectedKey = makeKV(seekRowNum - 1, 0); + assertEquals(expectedKey, scanner.peek()); + + 
// Case3: unable to backward seek + assertFalse(scanner.backwardSeek(KeyValue.createLastOnRow(ROWS[0]))); + assertEquals(null, scanner.peek()); + + // Test seek to previous row + seekRowNum = ROWSIZE - 4; + assertTrue(scanner.seekToPreviousRow(KeyValue + .createFirstOnRow(ROWS[seekRowNum]))); + expectedKey = makeKV(seekRowNum - 1, 0); + assertEquals(expectedKey, scanner.peek()); + + // Test seek to previous row for the first row + assertFalse(scanner.seekToPreviousRow(makeKV(0, 0))); + assertEquals(null, scanner.peek()); + + /** + * Test with MVCC + */ + for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { + LOG.info("Setting read point to " + readPoint); + MultiVersionConsistencyControl.setThreadReadPoint(readPoint); + // Test seek to last row + expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 1, 0, + readPoint); + assertEquals(expectedKey != null, scanner.seekToLastRow()); + assertEquals(expectedKey, scanner.peek()); + + // Test backward seek in two cases + // Case1: seek in the same row in backwardSeek + expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2, + QUALSIZE - 2, readPoint); + assertEquals(expectedKey != null, scanner.backwardSeek(expectedKey)); + assertEquals(expectedKey, scanner.peek()); + + // Case2: seek to the previous row in backwardSeek + seekRowNum = ROWSIZE - 3; + seekKey = KeyValue.createLastOnRow(ROWS[seekRowNum]); + expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, + readPoint); + assertEquals(expectedKey != null, scanner.backwardSeek(seekKey)); + assertEquals(expectedKey, scanner.peek()); + + // Test seek to previous row + seekRowNum = ROWSIZE - 4; + expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, + readPoint); + assertEquals(expectedKey != null, scanner.seekToPreviousRow(KeyValue + .createFirstOnRow(ROWS[seekRowNum]))); + assertEquals(expectedKey, scanner.peek()); + } + } + + private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum, + int 
startQualNum, int readPoint) { + Pair nextReadableNum = getNextReadableNumWithBackwardScan( + startRowNum, startQualNum, readPoint); + if (nextReadableNum == null) + return null; + return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond()); + } + + private Pair getNextReadableNumWithBackwardScan( + int startRowNum, int startQualNum, int readPoint) { + Pair nextReadableNum = null; + boolean findExpected = false; + for (int i = startRowNum; i >= 0; i--) { + for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) { + if (makeMVCC(i, j) <= readPoint) { + nextReadableNum = new Pair(i, j); + findExpected = true; + break; + } + } + if (findExpected) + break; + } + return nextReadableNum; + } + + private static void loadDataToRegion(HRegion region, byte[] additionalFamily) + throws IOException { + for (int i = 0; i < ROWSIZE; i++) { + Put put = new Put(ROWS[i]); + for (int j = 0; j < QUALSIZE; j++) { + put.add(makeKV(i, j)); + // put additional family + put.add(makeKV(i, j, additionalFamily)); + } + region.put(put); + if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) { + region.flushcache(); + } + } + } + + private static void writeMemstoreAndStoreFiles(MemStore memstore, + final StoreFile.Writer[] writers) throws IOException { + Random rand = new Random(); + try { + for (int i = 0; i < ROWSIZE; i++) { + for (int j = 0; j < QUALSIZE; j++) { + if (i % 2 == 0) { + memstore.add(makeKV(i, j)); + } else { + writers[(i + j) % writers.length].append(makeKV(i, j)); + } + } + } + } finally { + for (int i = 0; i < writers.length; i++) { + writers[i].close(); + } + } + } + + private static void writeStoreFile(final StoreFile.Writer writer) + throws IOException { + try { + for (int i = 0; i < ROWSIZE; i++) { + for (int j = 0; j < QUALSIZE; j++) { + writer.append(makeKV(i, j)); + } + } + } finally { + writer.close(); + } + } + + private static void writeMemstore(MemStore memstore) throws IOException { + // Add half of the keyvalues to memstore + for (int i = 0; i 
< ROWSIZE; i++) { + for (int j = 0; j < QUALSIZE; j++) { + if ((i + j) % 2 == 0) { + memstore.add(makeKV(i, j)); + } + } + } + memstore.snapshot(); + // Add another half of the keyvalues to snapshot + for (int i = 0; i < ROWSIZE; i++) { + for (int j = 0; j < QUALSIZE; j++) { + if ((i + j) % 2 == 1) { + memstore.add(makeKV(i, j)); + } + } + } + } + + private static KeyValue makeKV(int rowNum, int cqNum) { + return makeKV(rowNum, cqNum, FAMILYNAME); + } + + private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) { + KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS, + VALUES[rowNum % VALUESIZE]); + kv.setMemstoreTS(makeMVCC(rowNum, cqNum)); + return kv; + } + + private static long makeMVCC(int rowNum, int cqNum) { + return (rowNum + cqNum) % (MAXMVCC + 1); + } + + private static byte[][] makeN(byte[] base, int n) { + byte[][] ret = new byte[n][]; + for (int i = 0; i < n; i++) { + ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i))); + } + return ret; + } +}