Index: conf/hbase-default.xml
===================================================================
--- conf/hbase-default.xml (revision 652071)
+++ conf/hbase-default.xml (working copy)
@@ -78,6 +78,12 @@
+ <property>
+ <name>hbase.regionserver.impl</name>
+ <value>org.apache.hadoop.hbase.regionserver.HRegionServer</value>
+ <description>The region server implementation to use; must be a subclass of
+ HRegionServer. Used when starting a region server.</description>
+ </property>
 <name>hbase.client.pause</name>
 <value>10000</value>
 <description>General client pause value. Used mostly as value to wait
Index: src/test/org/apache/hadoop/hbase/TestOrderedScan.java
===================================================================
--- src/test/org/apache/hadoop/hbase/TestOrderedScan.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/TestOrderedScan.java (revision 0)
@@ -0,0 +1,200 @@
+/*
+ * Created on Apr 24, 2008
+ *
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.HOrderedRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.OrderedRegionServer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+public class TestOrderedScan extends HBaseClusterTestCase {
+ private static final Log LOG = LogFactory.getLog(TestOrderedScan.class);
+
+
+ private static final int NUM_VALS = 100;
+ private static final Text VALUE = new Text("value:");
+ private static final Text VALUE_LONG = new Text("value:long");
+ private static final Text OTHER = new Text("other:");
+ private static final Text OTHER_DATA = new Text("other:data");
+ private static final byte [] OTHER_DATA_CONTENTS = "SOME DATA IN ANOTHER COLUMN".getBytes();
+
+
+ private HTableDescriptor desc;
+ private HBaseAdmin admin;
+ private HTable table;
+
+ private static final class LongComparator implements ColumnValueComparator<Long> {
+
+ private static final int BYTE_BITS = 8;
+ private static final int BYTE_MASK = 0xFF;
+ private static final int LONG_BYTES = Long.SIZE / BYTE_BITS;
+
+ public Comparator<Long> getComparator() {
+ return new Comparator<Long>() {
+
+ public int compare(Long arg0, Long arg1) {
+ return arg0.compareTo(arg1);
+ }
+ };
+ }
+
+ public Long makeValue(byte[] bytes) {
+ long result = 0;
+ int shift = BYTE_BITS * (LONG_BYTES - 1);
+ for (int i = 0; i < LONG_BYTES; i++) {
+ result += (long) (bytes[i] & BYTE_MASK) << shift;
+ shift -= BYTE_BITS;
+ }
+ return result;
+
+ }
+
+ public void readFields(DataInput arg0) throws IOException {
+ // No state.
+ }
+
+ public void write(DataOutput arg0) throws IOException {
+ // No state.
+ }
+
+ public static byte[] serialize(final long in) {
+ final byte[] bytes = new byte[LONG_BYTES];
+ int shift = BYTE_BITS * (LONG_BYTES - 1);
+ for (int i = 0; i < bytes.length; i++) {
+ bytes[i] = (byte) (in >>> shift & BYTE_MASK);
+ shift -= BYTE_BITS;
+ }
+ return bytes;
+ }
+
+ }
+
+
+
+ /** constructor */
+ public TestOrderedScan() {
+ // Make the thread wake frequency a little slower so other threads
+ // can run
+ //conf.setInt("hbase.server.thread.wakefrequency", 2000);
+
+ // Make lease timeout longer, lease checks less frequent
+ //conf.setInt("hbase.master.lease.period", 1000 * 1000);
+ //conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
+
+ // Increase the amount of time between client retries
+ //conf.setLong("hbase.client.pause", 15 * 1000);
+
+ conf.set(HConstants.REGION_SERVER_CLASS, HOrderedRegionInterface.class.getName());
+ conf.set(HConstants.REGION_SERVER_IMPL, OrderedRegionServer.class.getName());
+ //conf.setInt("hbase.regionserver.lease.period", 1000 * 1000);
+ //conf.setInt("ipc.client.timeout", 1000 * 1000);
+ }
+
+ /**
+ * Since all the "tests" depend on the results of the previous test, they are
+ * not Junit tests that can stand alone. Consequently we have a single Junit
+ * test that runs the "sub-tests" as private methods.
+ * @throws IOException
+ */
+ public void test() throws IOException {
+ setupTable();
+ writeRows();
+ //scanRowsUnsorted();
+ scanRowsSorted();
+ }
+
+ private void setupTable() throws IOException {
+ desc = new HTableDescriptor("test");
+ HColumnDescriptor colDesc = new HColumnDescriptor(VALUE.toString());
+ colDesc.addSortedColumn(VALUE_LONG, new LongComparator());
+ desc.addFamily(colDesc);
+ desc.addFamily(new HColumnDescriptor(OTHER.toString()));
+ admin = new HBaseAdmin(conf);
+ admin.createTable(desc);
+ table = new HTable(conf, desc.getName());
+ }
+
+ private void writeRows() throws IOException {
+ long startTime = System.currentTimeMillis();
+
+ // Write out a bunch of values
+ Random r = new Random();
+ for (int k = 1; k <= NUM_VALS; k++) {
+ BatchUpdate b = new BatchUpdate(new Text("row_" + k));
+
+ b.put(VALUE_LONG, LongComparator.serialize(r.nextLong()));
+ b.put(OTHER_DATA, OTHER_DATA_CONTENTS);
+
+ table.commit(b);
+ }
+ LOG.info("Wrote " + NUM_VALS + " rows. Elapsed time: "
+ + ((System.currentTimeMillis() - startTime) / 1000.0));
+ }
+
+
+
+ private long getLong(byte [] bytes) throws IOException {
+ LongWritable longWritable = new LongWritable();
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ DataInputStream in = new DataInputStream(bais);
+ longWritable.readFields(in);
+ in.close();
+ return longWritable.get();
+ }
+
+ private void scanRowsSorted() throws IOException {
+ long startTime = System.currentTimeMillis();
+
+ Scanner scanner = table.getOrderedScanner(VALUE_LONG, true, new Text[] {VALUE_LONG, OTHER_DATA}, HConstants.LATEST_TIMESTAMP, null);
+
+ int numFound = 0;
+ Long lastValue = null;
+ while(true) {
+ RowResult result = scanner.next();
+ if (result == null) {
+ break;
+ }
+ numFound++;
+ LOG.debug("row: " + result.getRow());
+ byte [] value = result.get(VALUE_LONG).getValue();
+ long thisValue = getLong(value);
+ LOG.debug("value: " + thisValue);
+ byte [] otherValue = result.get(OTHER_DATA).getValue();
+ Assert.assertTrue(Arrays.equals(OTHER_DATA_CONTENTS, otherValue));
+
+ if (lastValue != null) {
+ Assert.assertTrue(lastValue <= thisValue);
+ }
+ lastValue = thisValue;
+ }
+
+ Assert.assertEquals(NUM_VALS, numFound);
+
+ LOG.info("Read sorted " + numFound + " rows. Elapsed time: "
+ + ((System.currentTimeMillis() - startTime) / 1000.0));
+ }
+}
Index: src/java/org/apache/hadoop/hbase/HColumnDescriptor.java
===================================================================
--- src/java/org/apache/hadoop/hbase/HColumnDescriptor.java (revision 652071)
+++ src/java/org/apache/hadoop/hbase/HColumnDescriptor.java (working copy)
@@ -22,12 +22,17 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.TextSequence;
/**
@@ -110,6 +115,8 @@
private BloomFilterDescriptor bloomFilter;
// Version number of this class
private byte versionNumber;
+ // Columns for which we maintain sorting. Entries are ColumnName -> comparator used to order the column values
+ private Map<Text, ColumnValueComparator<?>> sortedColumns = new HashMap<Text, ColumnValueComparator<?>>();
// Family name without the ':'
private transient Text familyName = null;
@@ -298,6 +305,16 @@
bloomFilter.readFields(in);
}
+ Configuration conf = new HBaseConfiguration();
+
+ int numEntries = in.readInt();
+ for (int i=0; i< numEntries; i++) {
+ Text key = new Text();
+ key.readFields(in);
+ ColumnValueComparator<?> comparator = (ColumnValueComparator<?>) ObjectWritable.readObject(in, conf);
+ sortedColumns.put(key, comparator);
+ }
+
if (this.versionNumber > 1) {
this.blockCacheEnabled = in.readBoolean();
}
@@ -317,6 +334,14 @@
bloomFilter.write(out);
}
+ Configuration conf = new HBaseConfiguration();
+
+ out.writeInt(sortedColumns.size());
+ for (Entry<Text, ColumnValueComparator<?>> entry : sortedColumns.entrySet()) {
+ entry.getKey().write(out);
+ ObjectWritable.writeObject(out, entry.getValue(), ColumnValueComparator.class, conf);
+ }
+
out.writeBoolean(this.blockCacheEnabled);
}
@@ -387,4 +412,21 @@
return result;
}
+
+ /**
+ * Get the sortedColumns.
+ * @return Return a map of the sortedColumns, to the comparator instance.
+ */
+ public Map<Text, ColumnValueComparator<?>> getSortedColumns() {
+ return sortedColumns;
+ }
+
+ /** Add a sorted column.
+ *
+ * @param columnName Qualified with the family prefix; the family must match this descriptor's family.
+ * @param comparator used to instantiate column values and compare them
+ */
+ public void addSortedColumn(Text columnName, ColumnValueComparator<?> comparator) {
+ sortedColumns.put(columnName, comparator);
+ }
}
Index: src/java/org/apache/hadoop/hbase/ColumnValueComparator.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ColumnValueComparator.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/ColumnValueComparator.java (revision 0)
@@ -0,0 +1,33 @@
+/*
+ * $Id$
+ * Created on Apr 20, 2008
+ *
+ */
+package org.apache.hadoop.hbase;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Allows one to deserialize a byte array to a value of type T, as well as get
+ * a comparator for that type.
+ *
+ * @param <T> value type.
+ */
+public interface ColumnValueComparator<T> extends Writable {
+
+ /** Make a value.
+ *
+ * @param bytes
+ * @return value
+ */
+ T makeValue(byte[] bytes);
+
+ /** Get the comparator.
+ *
+ * @return comparator
+ */
+ Comparator<T> getComparator();
+
+}
Index: src/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- src/java/org/apache/hadoop/hbase/HConstants.java (revision 652071)
+++ src/java/org/apache/hadoop/hbase/HConstants.java (working copy)
@@ -78,6 +78,9 @@
/** Parameter name for what region server interface to use. */
static final String REGION_SERVER_CLASS = "hbase.regionserver.class";
+ /** Parameter name for what region server implementation to use. */
+ static final String REGION_SERVER_IMPL= "hbase.regionserver.impl";
+
/** Default region server interface class name. */
static final String DEFAULT_REGION_SERVER_CLASS = HRegionInterface.class.getName();
Index: src/java/org/apache/hadoop/hbase/regionserver/ColumnSortedHStore.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/ColumnSortedHStore.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/regionserver/ColumnSortedHStore.java (revision 0)
@@ -0,0 +1,110 @@
+/*
+ * $Id$
+ * Created on Apr 23, 2008
+ *
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ColumnValueComparator;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * HStore which also maintains a sort on column values.
+ *
+ */
+class ColumnSortedHStore extends HStore {
+
+ Map<Text, SortedColumn<?>> columnNameToSortedStore = Collections
+ .synchronizedMap(new HashMap<Text, SortedColumn<?>>());
+
+ @SuppressWarnings("unchecked")
+ ColumnSortedHStore(final Path basedir, final HRegionInfo info,
+ final HColumnDescriptor family, final FileSystem fs, final Path reconstructionLog,
+ final HBaseConfiguration conf, final Progressable reporter) throws IOException {
+ super(basedir, info, family, fs, reconstructionLog, conf, reporter);
+
+ for (Entry<Text, ColumnValueComparator<?>> entry : family.getSortedColumns()
+ .entrySet()) {
+
+ columnNameToSortedStore.put(entry.getKey(), new SortedColumn(entry.getValue()));
+ }
+
+ buildSortedStores();
+ }
+
+ private void buildSortedStores() throws IOException {
+ LOG.debug("Starting sorted set construction");
+ int numKeys = 0;
+
+ long startTime = System.currentTimeMillis();
+ SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+
+ for (Entry<Text, SortedColumn<?>> entry : columnNameToSortedStore.entrySet()) {
+ Text colName = entry.getKey();
+ SortedColumn<?> sortedColumnStore = entry.getValue();
+
+ InternalScanner scanner = super.getScanner(HConstants.LATEST_TIMESTAMP,
+ new Text[] { colName }, HConstants.EMPTY_START_ROW, null);
+ HStoreKey key = new HStoreKey();
+ while (scanner.next(key, results)) {
+ byte [] value = results.get(colName);
+ sortedColumnStore.add(new HStoreKey(key), value);
+ results.clear(); // Not done by scanner
+ numKeys++;
+ }
+ }
+ LOG.debug("Sorted set constructed. Took: " + (System.currentTimeMillis() - startTime)
+ + " ms for " + numKeys + " keys");
+ }
+
+ /**
+ * Get an iterator ordered by the given columnName.
+ *
+ * @param colName
+ * @param ascending
+ * if true then low to high
+ * @return iterator
+ */
+ public Iterator<HStoreKey> getOrderedIterator(final Text colName,
+ final boolean ascending) {
+ SortedColumn<?> scs = columnNameToSortedStore.get(colName);
+ if (scs == null) {
+ return null;
+ }
+ return scs.getKeyIterator(ascending);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ void add(final HStoreKey key, final byte[] value) {
+ super.add(key, value);
+ SortedColumn<?> scs = columnNameToSortedStore.get(key.getColumn());
+ if (scs != null) {
+ if (HLogEdit.isDeleted(value)) {
+ scs.remove(key);
+ } else {
+ scs.add(key, value);
+ }
+ }
+ }
+
+}
Index: src/java/org/apache/hadoop/hbase/regionserver/SortedColumn.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/SortedColumn.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/regionserver/SortedColumn.java (revision 0)
@@ -0,0 +1,115 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ColumnValueComparator;
+import org.apache.hadoop.hbase.HStoreKey;
+
+/** Maintains a sorted tree for a single column.
+ *
+* @param <T> value type of column
+ */
+class SortedColumn<T> {
+
+ private class SortedColumnValue implements Comparable<SortedColumnValue> {
+
+ private HStoreKey key;
+ private T value;
+
+ public SortedColumnValue(HStoreKey key, T value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public int compareTo(SortedColumnValue o) {
+ int result = columnValueComparator.getComparator().compare(this.value, o.value);
+ if (result != 0) {
+ return result;
+ }
+ return key.compareTo(o.key);
+ }
+
+ /**
+ * Get the key.
+ *
+ * @return Return the key.
+ */
+ public HStoreKey getKey() {
+ return key;
+ }
+
+ /**
+ * Get the value.
+ *
+ * @return Return the value.
+ */
+ public T getValue() {
+ return value;
+ }
+ }
+
+ private ColumnValueComparator<T> columnValueComparator;
+ private NavigableSet<SortedColumnValue> sortedSet = new TreeSet<SortedColumnValue>();
+ private Map<HStoreKey, SortedColumnValue> keyToValueMap = Collections
+ .synchronizedMap(new HashMap<HStoreKey, SortedColumnValue>());
+
+ SortedColumn(ColumnValueComparator<T> comparator) {
+ this.columnValueComparator = comparator;
+ }
+
+ public void add(HStoreKey key, byte[] bytes) {
+ T value = columnValueComparator.makeValue(bytes);
+ SortedColumnValue colValue = new SortedColumnValue(key, value);
+
+ SortedColumnValue oldColValue = keyToValueMap.get(key);
+
+ if (oldColValue != null) {
+ keyToValueMap.remove(key);
+ sortedSet.remove(oldColValue);
+ }
+
+ keyToValueMap.put(key, colValue);
+ sortedSet.add(colValue);
+ }
+
+ public void remove(HStoreKey key) {
+ SortedColumnValue value = keyToValueMap.get(key);
+ if (value == null) {
+ throw new RuntimeException("No value for key " + key.toString());
+ }
+ sortedSet.remove(value);
+ keyToValueMap.remove(key);
+ }
+
+ public Iterator<HStoreKey> getKeyIterator(boolean ascending) {
+ final Iterator<SortedColumnValue> iterator;
+ if (ascending) {
+ iterator = sortedSet.iterator();
+ } else {
+ iterator = sortedSet.descendingIterator();
+ }
+
+ return new Iterator<HStoreKey>() {
+
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ public HStoreKey next() {
+ return iterator.next().getKey();
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException("#remove not allowed!");
+ }
+
+ };
+ }
+}
Index: src/java/org/apache/hadoop/hbase/regionserver/OrderedRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/OrderedRegionServer.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/regionserver/OrderedRegionServer.java (revision 0)
@@ -0,0 +1,231 @@
+/*
+ * $Id$
+ * Created on Apr 24, 2008
+ *
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.LeaseListener;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.HOrderedRegionInterface;
+import org.apache.hadoop.io.Text;
+
+/** RegionServer which provides ordered scanners.
+ *
+ */
+public class OrderedRegionServer extends HRegionServer implements HOrderedRegionInterface {
+
+
+ private Map<String, OrderedScanner> orderedScanners =
+ Collections.synchronizedMap(new HashMap<String, OrderedScanner>());
+
+ private final Random rand = new Random();
+
+ /**
+ * Instantiated as a scanner lease.
+ * If the lease times out, the scanner is closed
+ */
+ private class ScannerListener implements LeaseListener {
+ private final String scannerName;
+
+ ScannerListener(final String n) {
+ this.scannerName = n;
+ }
+
+ /** {@inheritDoc} */
+ public void leaseExpired() {
+ LOG.info("Ordered Scanner " + this.scannerName + " lease expired");
+ synchronized(orderedScanners) {
+ orderedScanners.remove(this.scannerName);
+ }
+ }
+ }
+
+ public OrderedRegionServer(HBaseConfiguration conf) throws IOException {
+ this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
+ DEFAULT_REGIONSERVER_ADDRESS)), conf);
+ }
+
+
+
+ public OrderedRegionServer(HServerAddress serverAddress,
+ HBaseConfiguration conf) throws IOException {
+ super(serverAddress, conf);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getProtocolVersion(final String protocol,
+ @SuppressWarnings("unused") final long clientVersion)
+ throws IOException {
+ if (protocol.equals(HOrderedRegionInterface.class.getName())) {
+ return HOrderedRegionInterface.versionID;
+ }
+ throw new IOException("Unknown protocol: " + protocol);
+ }
+
+ /** {@inheritDoc} */
+ public long openOrderedScanner(Text regionName, Text orderByColumn, boolean ascending,
+ Text[] columns, long timestamp, RowFilterInterface filter)
+ throws IOException {
+
+ super.checkOpen();
+ NullPointerException npe = null;
+ if (regionName == null) {
+ npe = new NullPointerException("regionName is null");
+ } else if (columns == null) {
+ npe = new NullPointerException("columns to scan is null");
+ }
+ if (npe != null) {
+ IOException io = new IOException("Invalid arguments to openScanner");
+ io.initCause(npe);
+ throw io;
+ }
+ requestCount.incrementAndGet();
+ try {
+ HRegion r = getRegion(regionName);
+ long scannerId = -1L;
+ Iterator<HStoreKey> it = r.getSortedIterator(orderByColumn, ascending);
+ OrderedScanner scanner = new OrderedScanner(regionName, it, columns, filter);
+ scannerId = rand.nextLong();
+ String scannerName = String.valueOf(scannerId);
+ synchronized(orderedScanners) {
+ orderedScanners.put(scannerName, scanner);
+ }
+ this.leases.
+ createLease(scannerName, new ScannerListener(scannerName));
+ return scannerId;
+ } catch (IOException e) {
+ LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
+ RemoteExceptionHandler.checkIOException(e));
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ public RowResult nextOrdered(final long scannerId) throws IOException {
+ checkOpen();
+ requestCount.incrementAndGet();
+ try {
+ String scannerName = String.valueOf(scannerId);
+ OrderedScanner scanner = orderedScanners.get(scannerName);
+ if (scanner == null) {
+ throw new UnknownScannerException("Name: " + scannerName);
+ }
+ this.leases.renewLease(scannerName);
+ return scanner.next();
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void closeOrdered(long scannerId) throws IOException {
+ checkOpen();
+ requestCount.incrementAndGet();
+ try {
+ String scannerName = String.valueOf(scannerId);
+ OrderedScanner s = null;
+ synchronized(orderedScanners) {
+ s = orderedScanners.remove(scannerName);
+ }
+ if(s == null) {
+ throw new UnknownScannerException(scannerName);
+ }
+ this.leases.cancelLease(scannerName);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ private class OrderedScanner {
+ private Text regionName;
+ private Iterator<HStoreKey> iterator;
+ private Text [] columns;
+ private RowFilterInterface filter;
+ private SortedMap<Text, byte[]> sortedMap;
+ private List<Text> columnsToDrop = null;
+
+
+ public OrderedScanner(Text regionName, Iterator<HStoreKey> iterator, Text [] columns, RowFilterInterface filter) {
+ this.regionName = regionName;
+ this.iterator = iterator;
+ this.columns = columns;
+ this.filter = filter;
+ if (filter != null) {
+ sortedMap = new TreeMap<Text, byte[]>();
+ columnsToDrop = new LinkedList<Text>();
+ }
+ }
+
+ public RowResult next() throws IOException {
+
+ RowResult result = null;
+
+ boolean filtered;
+ do {
+ if (! iterator.hasNext()) {
+ LOG.trace("end of iterator");
+ result = null;
+ break;
+ }
+ HStoreKey nextKey = iterator.next();
+ filtered = false;
+ Text rowKey = nextKey.getRow();
+ filtered = filter == null ? false : filter.filterRowKey(rowKey);
+
+ if (filtered) {
+ LOG.trace("filtered on row key");
+ }
+
+ if (!filtered) {
+ result = OrderedRegionServer.super.getRow(regionName, rowKey, columns, nextKey.getTimestamp());
+ if (filter != null) {
+ sortedMap.clear();
+ columnsToDrop.clear();
+ for (Entry<Text, Cell> entry : result.entrySet()) {
+ if (!filter.filterColumn(rowKey, entry.getKey(), entry.getValue().getValue())) {
+ sortedMap.put(entry.getKey(), entry.getValue().getValue());
+ } else {
+ columnsToDrop.add(entry.getKey());
+ LOG.trace("filtered column "+ entry.getKey().toString());
+ }
+ }
+ if (!filtered) {
+ for (Text colName : columnsToDrop) {
+ result.remove(colName);
+ }
+ }
+ }
+
+ }
+ }while (filtered);
+
+ return result;
+ }
+
+ }
+
+}
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 652071)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -137,10 +137,10 @@
final Server server;
// Leases
- private final Leases leases;
+ protected final Leases leases;
// Request counter
- private volatile AtomicInteger requestCount = new AtomicInteger();
+ protected volatile AtomicInteger requestCount = new AtomicInteger();
// Info server. Default access so can be used by unit tests. REGIONSERVER
// is name of the webapp and the attribute name used stuffing this instance
@@ -1349,7 +1349,7 @@
*
* @throws IOException
*/
- private void checkOpen() throws IOException {
+ protected void checkOpen() throws IOException {
if (this.stopRequested.get() || this.abortRequested) {
throw new IOException("Server not running");
}
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 652071)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -446,8 +446,14 @@
long maxSeqId = -1;
for(HColumnDescriptor c :
this.regionInfo.getTableDesc().families().values()) {
- HStore store = new HStore(this.basedir, this.regionInfo, c, this.fs,
- oldLogFile, this.conf, reporter);
+ HStore store;
+ if (c.getSortedColumns() != null && c.getSortedColumns().size() > 0) {
+ store = new ColumnSortedHStore(this.basedir, this.regionInfo, c, this.fs,
+ oldLogFile, this.conf, reporter);
+ } else {
+ store = new HStore(this.basedir, this.regionInfo, c, this.fs, oldLogFile,
+ this.conf, reporter);
+ }
stores.put(c.getFamilyName(), store);
long storeSeqId = store.getMaxSequenceId();
if (storeSeqId > maxSeqId) {
@@ -2091,4 +2097,15 @@
fs.mkdirs(HStoreFile.getFilterDir(basedir, encodedRegionName, colFamily));
}
}
+
+ public Iterator<HStoreKey> getSortedIterator(Text orderByColumn, boolean ascending)
+ throws IOException {
+ Text family = new Text(orderByColumn.toString().split(":")[0]);
+ HStore store = stores.get(family);
+ if (store == null || !(store instanceof ColumnSortedHStore)) {
+ throw new IOException("No sorted store for family: " + family);
+ }
+ ColumnSortedHStore sortedStore = (ColumnSortedHStore) store;
+ return sortedStore.getOrderedIterator(orderByColumn, ascending);
+ }
}
Index: src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
===================================================================
--- src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (revision 652071)
+++ src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (working copy)
@@ -64,6 +66,7 @@
/** 'local:' */
public static final String LOCAL_COLON = LOCAL + ":";
private final HBaseConfiguration conf;
+ private final Class<? extends HRegionServer> regionServerClass;
/**
* Constructor.
@@ -97,6 +100,7 @@
// start/stop ports at different times during the life of the test.
conf.set(REGIONSERVER_ADDRESS, DEFAULT_HOST + ":0");
this.regionThreads = new ArrayList();
+ regionServerClass = (Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
for (int i = 0; i < noRegionServers; i++) {
addRegionServer();
}
@@ -111,7 +115,13 @@
*/
public RegionServerThread addRegionServer() throws IOException {
synchronized (regionThreads) {
- RegionServerThread t = new RegionServerThread(new HRegionServer(conf),
+ HRegionServer server;
+ try {
+ server = regionServerClass.getConstructor(HBaseConfiguration.class).newInstance(conf);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ RegionServerThread t = new RegionServerThread(server,
this.regionThreads.size());
this.regionThreads.add(t);
return t;
Index: src/java/org/apache/hadoop/hbase/ipc/HOrderedRegionInterface.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/HOrderedRegionInterface.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/ipc/HOrderedRegionInterface.java (revision 0)
@@ -0,0 +1,52 @@
+/*
+ * $Id$
+ * Created on Apr 24, 2008
+ *
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.io.Text;
+
+/** Version of HRegionInterface supplemented with ordered scanners.
+ *
+ */
+public interface HOrderedRegionInterface extends HRegionInterface {
+ /** Interface version number */
+ public static final long versionID = 1L;
+
+
+ /** Open an ordered-by scanner.
+ *
+ * @param regionName name of region to scan
+ * @param orderByColumn column to orderBy (must be pre-declared in HColumnDescriptor).
+ * @param ascending if true, then low to high
+ * @param columns columns to scan.
+ * @param timestamp only return values whose timestamp is <= this value
+ * @param filter RowFilter for filtering results at the row-level.
+ * @return id of the scanner, to pass to nextOrdered/closeOrdered
+ * @throws IOException
+ */
+ public long openOrderedScanner(Text regionName, Text orderByColumn, boolean ascending, Text[] columns,
+ long timestamp, RowFilterInterface filter) throws IOException;
+
+ /**
+ * Get the next row of an ordered-by scan.
+ *
+ * @param scannerId returned from openScanner
+ * @return rowResult
+ * @throws IOException
+ */
+ public RowResult nextOrdered(long scannerId) throws IOException;
+
+ /**
+ * Close an ordered scanner
+ *
+ * @param scannerId returned from openScanner
+ * @throws IOException
+ */
+ public void closeOrdered(long scannerId) throws IOException;
+}
Index: src/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/HTable.java (revision 652071)
+++ src/java/org/apache/hadoop/hbase/client/HTable.java (working copy)
@@ -499,6 +499,28 @@
throws IOException {
return new ClientScanner(columns, startRow, timestamp, filter);
}
+
+ /**
+ * Get a scanner on the current table. The rows are to be iterated ordered-by the
+ * given column's value. The orderBy column must have been declared when the table
+ * was created.
+ *
+ *
+ * @param orderByColumn to column to orderBy. (Must be declared in table metadata)
+ * @param ascending if true then order low to high.
+ * @param columns columns to scan.
+ * @param timestamp only return results whose timestamp <= this value
+ * @param filter a row filter using row-key regexp and/or column data filter.
+ * @return scanner
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public Scanner getOrderedScanner(Text orderByColumn, boolean ascending, Text[] columns,
+ long timestamp, RowFilterInterface filter)
+ throws IOException {
+ // TODO, verify orderBy column
+ return new OrderedScanner(this,orderByColumn, ascending, columns, timestamp, filter);
+ }
/**
* Delete all cells that match the passed row and column.
Index: src/java/org/apache/hadoop/hbase/client/OrderedScanner.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/OrderedScanner.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/client/OrderedScanner.java (revision 0)
@@ -0,0 +1,259 @@
+/*
+ * $Id$
+ * Created on Apr 24, 2008
+ *
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ColumnValueComparator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.HOrderedRegionInterface;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Scanner where results are returned ordered-by a given column's value rather
+ * than by row key.  One server-side ordered scanner is opened per region; the
+ * per-region streams are merged client-side via a sorted set of region cursors
+ * whose first element always holds the globally next row.
+ *
+ * NOTE(review): generic type parameters appear to have been stripped from this
+ * patch text (a type variable T is used below but none is declared on the
+ * class) -- confirm against the original source.
+ *
+ * @param <T> column value type
+ *
+ */
+class OrderedScanner implements Scanner {
+
+ private static final Log LOG = LogFactory.getLog(OrderedScanner.class);
+
+
+ // Cursor over a single region's ordered scanner.  Holds the most recently
+ // fetched row and its decoded sort-column value so cursors can live in the
+ // TreeSet 'topRows', ordered by that value.
+ private class RegionScanner implements
+ Comparable {
+ private long scannerId;
+ private HOrderedRegionInterface server;
+ private RowResult rowResult; // last row fetched from this region
+ private T columnValue; // decoded sort-column value of rowResult
+
+ public RegionScanner(long scannerId, HOrderedRegionInterface server) {
+ this.scannerId = scannerId;
+ this.server = server;
+ }
+
+ // Order by the sort column's value (direction per 'ascending'), breaking
+ // ties on row key in the same direction.  The tie-break keeps distinct
+ // cursors from comparing equal, which TreeSet would treat as duplicates.
+ public int compareTo(RegionScanner other) {
+ int colResult;
+ if (ascending) {
+ colResult = comparator.getComparator().compare(this.columnValue, other.columnValue);
+ } else {
+ colResult = comparator.getComparator().compare(other.columnValue, this.columnValue);
+ }
+ if (colResult != 0) {
+ return colResult;
+ }
+ if (ascending) {
+ return this.rowResult.getRow().compareTo(other.rowResult.getRow());
+ }
+ return other.rowResult.getRow().compareTo(this.rowResult.getRow());
+ }
+
+ // Fetch the next row from this region and decode its sort-column value.
+ // Returns false when the region is exhausted -- or, per the FIXMEs below,
+ // when a row lacks the sort column, which silently ends this region's scan.
+ public boolean next() throws IOException {
+ rowResult = server.nextOrdered(scannerId);
+ if (rowResult == null) {
+ return false;
+ }
+ Cell colCell = rowResult.get(sortColumn);
+ if (colCell == null) {
+ return false; // FIXME: a row missing the sort column ends the whole region scan
+ }
+ byte[] colBytes = colCell.getValue();
+ if (colBytes == null) {
+ return false; // FIXME: likewise for a null column value
+ }
+ columnValue = comparator.makeValue(colBytes);
+ return true;
+ }
+
+ // Release the server-side scanner for this region.
+ public void close() throws IOException {
+ server.closeOrdered(scannerId);
+ }
+
+ /**
+ * Get the scannerId.
+ *
+ * @return Return the scannerId.
+ */
+ public long getScannerId() {
+ return scannerId;
+ }
+
+ /**
+ * Get the rowResult (the cursor's current, not-yet-consumed row).
+ *
+ * @return Return the rowResult.
+ */
+ public RowResult getRowResult() {
+ return rowResult;
+ }
+ }
+
+ private HTable table;
+ private Text sortColumn;
+ private boolean ascending;
+ private ColumnValueComparator comparator;
+ private RowFilterInterface filter;
+ // Region cursors ordered by their current sort-column value; first() is
+ // always the cursor holding the globally next row.  Exhausted cursors are
+ // removed, so the scan is done when this set is empty.
+ private SortedSet topRows = new TreeSet();
+ private SortedMap columnMap; // To send to filters
+
+ @SuppressWarnings("unchecked")
+ public OrderedScanner(HTable table, Text sortColumn, boolean ascending, Text[] columns,
+ long timeStamp, RowFilterInterface filter) throws IOException {
+ this.table = table;
+ this.sortColumn = sortColumn;
+ this.ascending = ascending;
+ this.filter = filter;
+ // Look up the comparator declared for the sort column in the table's
+ // family metadata.  NOTE(review): presumably NPEs if the column was never
+ // declared sorted -- confirm and consider a clearer error.
+ comparator = (ColumnValueComparator) table.getMetadata().getFamilies().get(
+ HStoreKey.extractFamily(sortColumn, true)).getSortedColumns().get(sortColumn);
+ openScanners(columns, timeStamp);
+ if (filter != null) {
+ this.columnMap = new TreeMap();
+ }
+ }
+
+ // Enumerate all of the table's regions by starting at the empty start row
+ // and following each region's end key until the last region is reached.
+ // TODO, don't open connections to all regions if we have a row prefix.
+ private List getRegions() throws IOException {
+ List regionLocations = new LinkedList();
+
+ Text nextRegionStartKey = HConstants.EMPTY_START_ROW;
+ HRegionLocation regionLocation;
+ do {
+ regionLocation = table.getRegionLocation(nextRegionStartKey);
+ regionLocations.add(regionLocation);
+ nextRegionStartKey = regionLocation.getRegionInfo().getEndKey();
+ } while (!regionLocation.getRegionInfo().getEndKey().equals(HConstants.LAST_ROW));
+ return regionLocations;
+ }
+
+ // Open one server-side ordered scanner per region and prime each cursor
+ // with its first row; cursors with no rows are closed immediately and never
+ // enter 'topRows'.
+ private void openScanners(Text[] columns, long timeStamp) throws IOException {
+ for (HRegionLocation regionLoc : getRegions()) {
+ HOrderedRegionInterface server = (HOrderedRegionInterface) table.connection
+ .getHRegionConnection(regionLoc.getServerAddress());
+
+ long scannerId = server.openOrderedScanner(
+ regionLoc.getRegionInfo().getRegionName(), sortColumn, ascending, columns,
+ timeStamp, filter);
+ LOG.trace("Making scanner for region: "+regionLoc.getRegionInfo().toString());
+ RegionScanner s = new RegionScanner(scannerId, server);
+ if (s.next()) {
+ LOG.trace("added to sorted set.");
+ topRows.add(s);
+ } else {
+ LOG.trace("No rows, closing");
+ s.close();
+ }
+ }
+ }
+
+ // Close every still-open region cursor.  Best-effort: an IOException from
+ // one close is ignored so the remaining cursors are still closed.
+ public void close() {
+ for (RegionScanner regionScanner : topRows) {
+ try {
+ regionScanner.close();
+ } catch (IOException e) {
+ // Best-effort close; ignore errors on shutdown.
+ }
+ }
+ }
+
+ // Return the globally next row: pop the minimal cursor from 'topRows',
+ // take its current row, advance it, and re-insert it (or close it when its
+ // region is exhausted).  Rows rejected by the filter are skipped.
+ public RowResult next() throws IOException {
+
+ boolean filtered;
+ RowResult result;
+ do {
+ filtered = false;
+
+ if (topRows.size() == 0) {
+ return null;
+ }
+
+ RegionScanner top = topRows.first();
+ LOG.trace("Got top row ["+top.rowResult.getRow().toString() + "] with value ["+top.columnValue.toString()+"]");
+ topRows.remove(top);
+ result = top.getRowResult();
+ if (top.next()) {
+ LOG.trace("puting back into sorted set");
+ topRows.add(top);
+ } else {
+ LOG.trace("nothing left, closing");
+ top.close();
+ }
+
+ if (filter != null) {
+ // Re-shape the row into the column-to-bytes map the filter expects.
+ columnMap.clear();
+ for (Entry entry : result.entrySet()) {
+ columnMap.put(entry.getKey(), entry.getValue().getValue());
+ }
+ filtered = filter.filterRow(columnMap);
+ if (filtered) {
+ LOG.trace("Filtered on assembled row");
+ }
+ }
+
+ } while(filtered);
+ return result;
+ }
+
+ // This implementation copy-pasted from HTable#ClientScanner
+ public Iterator iterator() {
+ return new Iterator() {
+ // The next RowResult, possibly pre-read
+ RowResult next = null;
+
+ // return true if there is another item pending, false if there isn't.
+ // this method is where the actual advancing takes place, but you need
+ // to call next() to consume it. hasNext() will only advance if there
+ // isn't a pending next().
+ public boolean hasNext() {
+ if (next == null) {
+ try {
+ next = OrderedScanner.this.next();
+ return next != null;
+ } catch (IOException e) {
+ // Iterator cannot throw checked exceptions; wrap and rethrow.
+ throw new RuntimeException(e);
+ }
+ }
+ return true;
+ }
+
+ // get the pending next item and advance the iterator. returns null if
+ // there is no next item.
+ public RowResult next() {
+ // since hasNext() does the real advancing, we call this to determine
+ // if there is a next before proceeding.
+ if (!hasNext()) {
+ return null;
+ }
+
+ // if we get to here, then hasNext() has given us an item to return.
+ // we want to return the item and then null out the next pointer, so
+ // we use a temporary variable.
+ RowResult temp = next;
+ next = null;
+ return temp;
+ }
+
+ // Read-only iterator: removal is not supported.
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+}