Index: src/test/org/apache/hadoop/hbase/TestOrderedScan.java
===================================================================
--- src/test/org/apache/hadoop/hbase/TestOrderedScan.java	(revision 0)
+++ src/test/org/apache/hadoop/hbase/TestOrderedScan.java	(revision 0)
@@ -0,0 +1,196 @@
+/*
+ * Created on Apr 24, 2008
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.OrderedTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.HOrderedRegionInterface;
+import org.apache.hadoop.hbase.regionserver.OrderedRegionServer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+/** Tests scanning a table ordered by a column's value. */
+public class TestOrderedScan extends HBaseClusterTestCase {
+  private static final Log LOG = LogFactory.getLog(TestOrderedScan.class);
+
+  private static final int NUM_VALS = 100;
+  private static final Text VALUE = new Text("value:");
+  private static final Text VALUE_LONG = new Text("value:long");
+  private static final Text OTHER = new Text("other:");
+  private static final Text OTHER_DATA = new Text("other:data");
+  private static final byte[] OTHER_DATA_CONTENTS =
+    "SOME DATA IN ANOTHER COLUMN".getBytes();
+
+  private HTableDescriptor desc;
+  private HBaseAdmin admin;
+  private OrderedTable table;
+
+  private static final class LongComparator
+      implements ColumnValueComparator<Long> {
+
+    private static final int BYTE_BITS = 8;
+    private static final int BYTE_MASK = 0xFF;
+    private static final int LONG_BYTES = Long.SIZE / BYTE_BITS;
+
+    public Comparator<Long> getComparator() {
+      return new Comparator<Long>() {
+        public int compare(final Long arg0, final Long arg1) {
+          return arg0.compareTo(arg1);
+        }
+      };
+    }
+
+    public Long makeValue(final byte[] bytes) {
+      // Big-endian deserialization of a long.
+      long result = 0;
+      int shift = BYTE_BITS * (LONG_BYTES - 1);
+      for (int i = 0; i < LONG_BYTES; i++) {
+        result += (long) (bytes[i] & BYTE_MASK) << shift;
+        shift -= BYTE_BITS;
+      }
+      return result;
+    }
+
+    public void readFields(final DataInput arg0) throws IOException {
+      // No state.
+    }
+
+    public void write(final DataOutput arg0) throws IOException {
+      // No state.
+    }
+
+    public static byte[] serialize(final long in) {
+      final byte[] bytes = new byte[LONG_BYTES];
+      int shift = BYTE_BITS * (LONG_BYTES - 1);
+      for (int i = 0; i < bytes.length; i++) {
+        bytes[i] = (byte) (in >>> shift & BYTE_MASK);
+        shift -= BYTE_BITS;
+      }
+      return bytes;
+    }
+  }
+
+  /** Constructor. */
+  public TestOrderedScan() {
+    // Make the thread wake frequency a little slower so other threads
+    // can run
+    // conf.setInt("hbase.server.thread.wakefrequency", 2000);
+
+    // Make lease timeout longer, lease checks less frequent
+    // conf.setInt("hbase.master.lease.period", 1000 * 1000);
+    // conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
+
+    // Increase the amount of time between client retries
+    // conf.setLong("hbase.client.pause", 15 * 1000);
+
+    conf.set(HConstants.REGION_SERVER_CLASS,
+      HOrderedRegionInterface.class.getName());
+    conf.set(HConstants.REGION_SERVER_IMPL,
+      OrderedRegionServer.class.getName());
+    // conf.setInt("hbase.regionserver.lease.period", 1000 * 1000);
+    // conf.setInt("ipc.client.timeout", 1000 * 1000);
+  }
+
+  /**
+   * Since all the "tests" depend on the results of the previous test, they
+   * are not JUnit tests that can stand alone. Consequently we have a single
+   * JUnit test that runs the "sub-tests" as private methods.
+   *
+   * @throws IOException
+   */
+  public void test() throws IOException {
+    setupTable();
+    writeRows();
+    // scanRowsUnsorted();
+    scanRowsSorted();
+  }
+
+  private void setupTable() throws IOException {
+    desc = new HTableDescriptor("test");
+    HOrderedColumnDescriptor colDesc =
+      new HOrderedColumnDescriptor(VALUE.toString());
+    colDesc.addSortedColumn(VALUE_LONG.getBytes(), new LongComparator());
+    desc.addFamily(colDesc);
+    desc.addFamily(new HColumnDescriptor(OTHER.toString()));
+    admin = new HBaseAdmin(conf);
+    admin.createTable(desc);
+    table = new OrderedTable(conf, desc.getName());
+  }
+
+  private void writeRows() throws IOException {
+    long startTime = System.currentTimeMillis();
+
+    // Write out a bunch of values
+    Random r = new Random();
+    for (int k = 1; k <= NUM_VALS; k++) {
+      BatchUpdate b = new BatchUpdate(new Text("row_" + k));
+      b.put(VALUE_LONG, LongComparator.serialize(r.nextLong()));
+      b.put(OTHER_DATA, OTHER_DATA_CONTENTS);
+      table.commit(b);
+    }
+    LOG.info("Wrote " + NUM_VALS + " rows. Elapsed time: "
+      + ((System.currentTimeMillis() - startTime) / 1000.0));
+  }
+
+  private long getLong(final byte[] bytes) throws IOException {
+    LongWritable longWritable = new LongWritable();
+    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+    DataInputStream in = new DataInputStream(bais);
+    longWritable.readFields(in);
+    in.close();
+    return longWritable.get();
+  }
+
+  private void scanRowsSorted() throws IOException {
+    long startTime = System.currentTimeMillis();
+
+    Scanner scanner = table.getOrderedScanner(VALUE_LONG.getBytes(), true,
+      new byte[][] { VALUE_LONG.getBytes(), OTHER_DATA.getBytes() },
+      HConstants.LATEST_TIMESTAMP, null);
+
+    int numFound = 0;
+    Long lastValue = null;
+    while (true) {
+      RowResult result = scanner.next();
+      if (result == null) {
+        break;
+      }
+      numFound++;
+      LOG.debug("row: " + result.getRow());
+      byte[] value = result.get(VALUE_LONG).getValue();
+      long thisValue = getLong(value);
+      LOG.debug("value: " + thisValue);
+      byte[] otherValue = result.get(OTHER_DATA).getValue();
+      Assert.assertTrue(Arrays.equals(OTHER_DATA_CONTENTS, otherValue));
+
+      // Values must arrive in non-decreasing order.
+      if (lastValue != null) {
+        Assert.assertTrue(lastValue <= thisValue);
+      }
+      lastValue = thisValue;
+    }
+
+    Assert.assertEquals(NUM_VALS, numFound);
+
+    LOG.info("Read sorted " + numFound + " rows. Elapsed time: "
+      + ((System.currentTimeMillis() - startTime) / 1000.0));
+  }
+}
Index: src/java/org/apache/hadoop/hbase/ColumnValueComparator.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ColumnValueComparator.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/ColumnValueComparator.java	(revision 0)
@@ -0,0 +1,33 @@
+/*
+ * $Id$
+ * Created on Apr 20, 2008
+ */
+package org.apache.hadoop.hbase;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Allows one to deserialize a byte array to a value of type T, as well as
+ * get a comparator for that type.
+ *
+ * @param <T> value type.
+ */
+public interface ColumnValueComparator<T> extends Writable {
+
+  /**
+   * Make a value from its serialized form.
+   *
+   * @param bytes serialized value
+   * @return value
+   */
+  T makeValue(byte[] bytes);
+
+  /**
+   * Get the comparator.
+   *
+   * @return comparator
+   */
+  Comparator<T> getComparator();
+}
Index: src/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- src/java/org/apache/hadoop/hbase/HConstants.java	(revision 658783)
+++ src/java/org/apache/hadoop/hbase/HConstants.java	(working copy)
@@ -80,6 +80,9 @@
   /** Parameter name for what region server interface to use. */
   static final String REGION_SERVER_CLASS = "hbase.regionserver.class";
 
+  /** Parameter name for what region server implementation to use. */
+  static final String REGION_SERVER_IMPL = "hbase.regionserver.impl";
+
   /** Default region server interface class name. */
   static final String DEFAULT_REGION_SERVER_CLASS =
     HRegionInterface.class.getName();
Index: src/java/org/apache/hadoop/hbase/regionserver/HOrderedRegion.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HOrderedRegion.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/regionserver/HOrderedRegion.java	(revision 0)
@@ -0,0 +1,51 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HOrderedColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.util.Progressable;
+
+/** Implementation of HRegion that can maintain a secondary index. */
+public class HOrderedRegion extends HRegion {
+
+  public HOrderedRegion(Path basedir, HLog log, FileSystem fs,
+      HBaseConfiguration conf, HRegionInfo regionInfo, Path initialFiles,
+      FlushRequester flushListener, Progressable reporter)
+      throws IOException {
+    super(basedir, log, fs, conf, regionInfo, initialFiles, flushListener,
+      reporter);
+  }
+
+  @Override
+  protected HStore instantiateHStore(Path baseDir, HColumnDescriptor c,
+      Path oldLogFile, Progressable reporter) throws IOException {
+    if (c instanceof HOrderedColumnDescriptor) {
+      HOrderedColumnDescriptor oc = (HOrderedColumnDescriptor) c;
+      if (oc.getSortedColumns() != null && oc.getSortedColumns().size() > 0) {
+        // Families with declared sort columns get the sorting store.
+        return new ColumnSortedHStore(baseDir, this.regionInfo, oc, this.fs,
+          oldLogFile, this.conf, reporter);
+      }
+    }
+    return super.instantiateHStore(baseDir, c, oldLogFile, reporter);
+  }
+
+  public Iterator<HStoreKey> getSortedIterator(byte[] orderByColumn,
+      boolean ascending) throws IOException {
+    HStore store = super.getStore(orderByColumn);
+    if (!(store instanceof ColumnSortedHStore)) {
+      throw new IOException("No sorted store for column: "
+        + new String(orderByColumn));
+    }
+    ColumnSortedHStore sortedStore = (ColumnSortedHStore) store;
+    return sortedStore.getOrderedIterator(orderByColumn, ascending);
+  }
+}
Index: src/java/org/apache/hadoop/hbase/regionserver/ColumnSortedHStore.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/ColumnSortedHStore.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/regionserver/ColumnSortedHStore.java	(revision 0)
@@ -0,0 +1,117 @@
+/*
+ * $Id$
+ * Created on Apr 23, 2008
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ColumnValueComparator;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HOrderedColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Progressable;
+
+/** HStore which also maintains a sort on column values. */
+class ColumnSortedHStore extends HStore {
+
+  // Maps column name -> in-memory sorted view of that column's values.
+  Map<byte[], SortedColumn<?>> columnNameToSortedStore = Collections
+    .synchronizedMap(new TreeMap<byte[], SortedColumn<?>>(
+      Bytes.BYTES_COMPARATOR));
+
+  @SuppressWarnings("unchecked")
+  ColumnSortedHStore(final Path basedir, final HRegionInfo info,
+      final HOrderedColumnDescriptor family, final FileSystem fs,
+      final Path reconstructionLog, final HBaseConfiguration conf,
+      final Progressable reporter) throws IOException {
+    super(basedir, info, family, fs, reconstructionLog, conf, reporter);
+
+    for (Entry<byte[], ColumnValueComparator<?>> entry : family
+        .getSortedColumns().entrySet()) {
+      columnNameToSortedStore.put(entry.getKey(),
+        new SortedColumn(entry.getValue()));
+    }
+
+    buildSortedStores();
+  }
+
+  /** Rebuild the in-memory sorted sets by scanning the store on open. */
+  private void buildSortedStores() throws IOException {
+    LOG.debug("Starting sorted set construction");
+    int numKeys = 0;
+
+    long startTime = System.currentTimeMillis();
+    SortedMap<byte[], byte[]> results = new TreeMap<byte[], byte[]>(
+      Bytes.BYTES_COMPARATOR);
+
+    for (Entry<byte[], SortedColumn<?>> entry : columnNameToSortedStore
+        .entrySet()) {
+      byte[] colName = entry.getKey();
+      SortedColumn<?> sortedColumnStore = entry.getValue();
+
+      InternalScanner scanner = super.getScanner(HConstants.LATEST_TIMESTAMP,
+        new byte[][] { colName }, HConstants.EMPTY_START_ROW, null);
+      HStoreKey key = new HStoreKey();
+      while (scanner.next(key, results)) {
+        byte[] value = results.get(colName);
+        sortedColumnStore.add(new HStoreKey(key), value);
+        results.clear(); // Not done by scanner
+        numKeys++;
+      }
+      scanner.close();
+    }
+    LOG.debug("Sorted set constructed. Took: "
+      + (System.currentTimeMillis() - startTime) + " ms for " + numKeys
+      + " keys");
+  }
+
+  /**
+   * Get an iterator ordered by the given column name.
+   *
+   * @param colName column to order by
+   * @param ascending if true then low to high
+   * @return iterator, or null if no sort is maintained for the column
+   */
+  public Iterator<HStoreKey> getOrderedIterator(final byte[] colName,
+      final boolean ascending) {
+    SortedColumn<?> scs = columnNameToSortedStore.get(colName);
+    if (scs == null) {
+      return null;
+    }
+    return scs.getKeyIterator(ascending);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  void add(final HStoreKey key, final byte[] value) {
+    super.add(key, value);
+    SortedColumn<?> scs = columnNameToSortedStore.get(key.getColumn());
+    if (scs != null) {
+      if (HLogEdit.isDeleted(value)) {
+        scs.remove(key);
+      } else {
+        scs.add(key, value);
+      }
+    }
+  }
+}
Index: src/java/org/apache/hadoop/hbase/regionserver/SortedColumn.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/SortedColumn.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/regionserver/SortedColumn.java	(revision 0)
@@ -0,0 +1,115 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.ColumnValueComparator;
+import org.apache.hadoop.hbase.HStoreKey;
+
+/**
+ * Maintains a sorted tree for a single column.
+ *
+ * @param <T> value type of the column
+ */
+class SortedColumn<T> {
+
+  private class SortedColumnValue implements Comparable<SortedColumnValue> {
+
+    private HStoreKey key;
+    private T value;
+
+    public SortedColumnValue(HStoreKey key, T value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    public int compareTo(SortedColumnValue o) {
+      // Order by column value first; break ties on the store key so rows
+      // with equal values still have a stable, unique ordering.
+      int result =
+        columnValueComparator.getComparator().compare(this.value, o.value);
+      if (result != 0) {
+        return result;
+      }
+      return key.compareTo(o.key);
+    }
+
+    /**
+     * Get the key.
+     * @return Return the key.
+     */
+    public HStoreKey getKey() {
+      return key;
+    }
+
+    /**
+     * Get the value.
+     * @return Return the value.
+     */
+    public T getValue() {
+      return value;
+    }
+  }
+
+  private ColumnValueComparator<T> columnValueComparator;
+  private NavigableSet<SortedColumnValue> sortedSet =
+    new TreeSet<SortedColumnValue>();
+  private Map<HStoreKey, SortedColumnValue> keyToValueMap = Collections
+    .synchronizedMap(new HashMap<HStoreKey, SortedColumnValue>());
+
+  SortedColumn(ColumnValueComparator<T> comparator) {
+    this.columnValueComparator = comparator;
+  }
+
+  public void add(HStoreKey key, byte[] bytes) {
+    T value = columnValueComparator.makeValue(bytes);
+    SortedColumnValue colValue = new SortedColumnValue(key, value);
+
+    // Replace any previous entry for this key.
+    SortedColumnValue oldColValue = keyToValueMap.get(key);
+    if (oldColValue != null) {
+      keyToValueMap.remove(key);
+      sortedSet.remove(oldColValue);
+    }
+
+    keyToValueMap.put(key, colValue);
+    sortedSet.add(colValue);
+  }
+
+  public void remove(HStoreKey key) {
+    SortedColumnValue value = keyToValueMap.get(key);
+    if (value == null) {
+      throw new RuntimeException("No value for key " + key.toString());
+    }
+    sortedSet.remove(value);
+    keyToValueMap.remove(key);
+  }
+
+  public Iterator<HStoreKey> getKeyIterator(boolean ascending) {
+    final Iterator<SortedColumnValue> iterator;
+    if (ascending) {
+      iterator = sortedSet.iterator();
+    } else {
+      iterator = sortedSet.descendingIterator();
+    }
+
+    return new Iterator<HStoreKey>() {
+      public boolean hasNext() {
+        return iterator.hasNext();
+      }
+
+      public HStoreKey next() {
+        return iterator.next().getKey();
+      }
+
+      public void remove() {
+        throw new UnsupportedOperationException("#remove not allowed!");
+      }
+    };
+  }
+}
Index: src/java/org/apache/hadoop/hbase/regionserver/OrderedRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/OrderedRegionServer.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/regionserver/OrderedRegionServer.java	(revision 0)
@@ -0,0 +1,248 @@
+/*
+ * $Id$
+ * Created on Apr 24, 2008
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LeaseListener;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.HOrderedRegionInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Progressable;
+
+/** RegionServer which provides ordered scanners. */
+public class OrderedRegionServer extends HRegionServer
+    implements HOrderedRegionInterface {
+
+  private Map<String, OrderedScanner> orderedScanners =
+    Collections.synchronizedMap(new HashMap<String, OrderedScanner>());
+
+  private final Random rand = new Random();
+
+  /**
+   * Instantiated as a scanner lease. If the lease times out, the scanner is
+   * closed.
+   */
+  private class ScannerListener implements LeaseListener {
+    private final String scannerName;
+
+    ScannerListener(final String n) {
+      this.scannerName = n;
+    }
+
+    /** {@inheritDoc} */
+    public void leaseExpired() {
+      LOG.info("Ordered Scanner " + this.scannerName + " lease expired");
+      synchronized (orderedScanners) {
+        orderedScanners.remove(this.scannerName);
+      }
+    }
+  }
+
+  public OrderedRegionServer(HBaseConfiguration conf) throws IOException {
+    this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
+      DEFAULT_REGIONSERVER_ADDRESS)), conf);
+  }
+
+  public OrderedRegionServer(HServerAddress serverAddress,
+      HBaseConfiguration conf) throws IOException {
+    super(serverAddress, conf);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getProtocolVersion(final String protocol,
+      @SuppressWarnings("unused") final long clientVersion)
+      throws IOException {
+    if (protocol.equals(HOrderedRegionInterface.class.getName())) {
+      return HOrderedRegionInterface.versionID;
+    }
+    throw new IOException("Unknown protocol: " + protocol);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+      throws IOException {
+    return new HOrderedRegion(HTableDescriptor.getTableDir(
+      super.getRootDir(), regionInfo.getTableDesc().getName()), this.log,
+      super.getFileSystem(), conf, regionInfo, null, this.cacheFlusher,
+      new Progressable() {
+        public void progress() {
+          addProcessingMessage(regionInfo);
+        }
+      });
+  }
+
+  /** {@inheritDoc} */
+  public long openOrderedScanner(byte[] regionName, byte[] orderByColumn,
+      boolean ascending, byte[][] columns, long timestamp,
+      RowFilterInterface filter) throws IOException {
+    super.checkOpen();
+    NullPointerException npe = null;
+    if (regionName == null) {
+      npe = new NullPointerException("regionName is null");
+    } else if (columns == null) {
+      npe = new NullPointerException("columns to scan is null");
+    }
+    if (npe != null) {
+      IOException io = new IOException("Invalid arguments to openScanner");
+      io.initCause(npe);
+      throw io;
+    }
+    super.getRequestCount().incrementAndGet();
+    try {
+      HOrderedRegion r = (HOrderedRegion) getRegion(regionName);
+      Iterator<HStoreKey> it = r.getSortedIterator(orderByColumn, ascending);
+      OrderedScanner scanner =
+        new OrderedScanner(regionName, it, columns, filter);
+      long scannerId = rand.nextLong();
+      String scannerName = String.valueOf(scannerId);
+      synchronized (orderedScanners) {
+        orderedScanners.put(scannerName, scanner);
+      }
+      super.getLeases()
+        .createLease(scannerName, new ScannerListener(scannerName));
+      return scannerId;
+    } catch (IOException e) {
+      LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
+        RemoteExceptionHandler.checkIOException(e));
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  /** {@inheritDoc} */
+  public RowResult nextOrdered(final long scannerId) throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      String scannerName = String.valueOf(scannerId);
+      OrderedScanner scanner = orderedScanners.get(scannerName);
+      if (scanner == null) {
+        throw new UnknownScannerException("Name: " + scannerName);
+      }
+      super.getLeases().renewLease(scannerName);
+      return scanner.next();
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void closeOrdered(long scannerId) throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      String scannerName = String.valueOf(scannerId);
+      OrderedScanner s = null;
+      synchronized (orderedScanners) {
+        s = orderedScanners.remove(scannerName);
+      }
+      if (s == null) {
+        throw new UnknownScannerException(scannerName);
+      }
+      super.getLeases().cancelLease(scannerName);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  private class OrderedScanner {
+    private byte[] regionName;
+    private Iterator<HStoreKey> iterator;
+    private byte[][] columns;
+    private RowFilterInterface filter;
+    private SortedMap<byte[], byte[]> sortedMap;
+    private List<byte[]> columnsToDrop = null;
+
+    public OrderedScanner(byte[] regionName, Iterator<HStoreKey> iterator,
+        byte[][] columns, RowFilterInterface filter) {
+      this.regionName = regionName;
+      this.iterator = iterator;
+      this.columns = columns;
+      this.filter = filter;
+      if (filter != null) {
+        sortedMap = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+        columnsToDrop = new LinkedList<byte[]>();
+      }
+    }
+
+    public RowResult next() throws IOException {
+      RowResult result = null;
+      boolean filtered;
+      do {
+        if (!iterator.hasNext()) {
+          LOG.trace("end of iterator");
+          result = null;
+          break;
+        }
+        HStoreKey nextKey = iterator.next();
+        byte[] rowKey = nextKey.getRow();
+        filtered = filter == null ? false : filter.filterRowKey(rowKey);
+
+        if (filtered) {
+          LOG.trace("filtered on row key");
+        }
+
+        if (!filtered) {
+          result = OrderedRegionServer.super.getRow(regionName, rowKey,
+            columns, nextKey.getTimestamp());
+          if (filter != null) {
+            // Apply the column filter, collecting columns to drop first so
+            // the result is not modified while being iterated.
+            sortedMap.clear();
+            columnsToDrop.clear();
+            for (Entry<byte[], Cell> entry : result.entrySet()) {
+              if (!filter.filterColumn(rowKey, entry.getKey(),
+                  entry.getValue().getValue())) {
+                sortedMap.put(entry.getKey(), entry.getValue().getValue());
+              } else {
+                columnsToDrop.add(entry.getKey());
+                LOG.trace("filtered column " + entry.getKey().toString());
+              }
+            }
+            for (byte[] colName : columnsToDrop) {
+              result.remove(colName);
+            }
+          }
+        }
+      } while (filtered);
+
+      return result;
+    }
+  }
+}
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 658783)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -849,15 +852,7 @@
     HRegion region = this.onlineRegions.get(mapKey);
     if (region == null) {
       try {
-        region = new HRegion(HTableDescriptor.getTableDir(rootDir,
-          regionInfo.getTableDesc().getName()),
-          this.log, this.fs, conf, regionInfo, null, this.cacheFlusher,
-          new Progressable() {
-            public void progress() {
-              addProcessingMessage(regionInfo);
-            }
-          }
-        );
+        region = instantiateRegion(regionInfo);
         // Startup a compaction early if one is needed.
         this.compactSplitThread.compactionRequested(region);
       } catch (IOException e) {
@@ -880,6 +875,17 @@
     reportOpen(regionInfo);
   }
 
+  protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+      throws IOException {
+    return new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
+      .getTableDesc().getName()), this.log, this.fs, conf, regionInfo, null,
+      this.cacheFlusher, new Progressable() {
+        public void progress() {
+          addProcessingMessage(regionInfo);
+        }
+      });
+  }
+
   /*
    * Add a MSG_REPORT_PROCESS_OPEN to the outbound queue.
    * This method is called while region is in the queue of regions to process
@@ -1364,7 +1370,7 @@
    *
    * @throws IOException
    */
-  private void checkOpen() throws IOException {
+  protected void checkOpen() throws IOException {
     if (this.stopRequested.get() || this.abortRequested) {
       throw new IOException("Server not running");
     }
@@ -1523,6 +1529,34 @@
    * @param args
    */
   public static void main(String [] args) {
-    doMain(args, HRegionServer.class);
+    Configuration conf = new HBaseConfiguration();
+    @SuppressWarnings("unchecked")
+    Class<? extends HRegionServer> regionServerClass =
+      (Class<? extends HRegionServer>) conf.getClass(
+        HConstants.REGION_SERVER_IMPL, HRegionServer.class);
+    doMain(args, regionServerClass);
   }
+
+  /**
+   * Get the leases.
+   * @return Return the leases.
+   */
+  protected Leases getLeases() {
+    return leases;
+  }
+
+  /**
+   * Get the rootDir.
+   * @return Return the rootDir.
+   */
+  protected Path getRootDir() {
+    return rootDir;
+  }
+
+  /**
+   * Get the fs.
+   * @return Return the fs.
+   */
+  protected FileSystem getFileSystem() {
+    return fs;
+  }
 }
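Outside of the test harness, both configuration properties above have to be set before the region server starts; a minimal sketch mirroring what the TestOrderedScan constructor does:

    HBaseConfiguration conf = new HBaseConfiguration();
    // Clients and servers agree on the extended RPC interface...
    conf.set(HConstants.REGION_SERVER_CLASS,
      HOrderedRegionInterface.class.getName());
    // ...and HRegionServer.main()/LocalHBaseCluster pick the implementation.
    conf.set(HConstants.REGION_SERVER_IMPL,
      OrderedRegionServer.class.getName());
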
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 658783)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -446,8 +446,7 @@
     // Load in all the HStores.
     long maxSeqId = -1;
     for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
-      HStore store = new HStore(this.basedir, this.regionInfo, c, this.fs,
-        oldLogFile, this.conf, reporter);
+      HStore store = instantiateHStore(this.basedir, c, oldLogFile, reporter);
       stores.put(Bytes.mapKey(c.getName()), store);
       long storeSeqId = store.getMaxSequenceId();
       if (storeSeqId > maxSeqId) {
@@ -492,6 +491,13 @@
     LOG.info("region " + this + "/" + this.regionInfo.getEncodedName()
       + " available");
   }
+
+  protected HStore instantiateHStore(Path baseDir,
+      HColumnDescriptor c, Path oldLogFile,
+      Progressable reporter) throws IOException {
+    return new HStore(baseDir, this.regionInfo, c, this.fs, oldLogFile,
+      this.conf, reporter);
+  }
 
   /**
    * @return Updates to this region need to have a sequence id that is >= to
@@ -1525,7 +1531,7 @@
    * @return Store that goes with the family on passed column.
    * TODO: Make this lookup faster.
    */
-  private HStore getStore(final byte [] column) {
+  protected HStore getStore(final byte [] column) {
     return this.stores.get(HStoreKey.getFamilyMapKey(column));
   }
Index: src/java/org/apache/hadoop/hbase/HTableDescriptor.java
===================================================================
--- src/java/org/apache/hadoop/hbase/HTableDescriptor.java	(revision 658783)
+++ src/java/org/apache/hadoop/hbase/HTableDescriptor.java	(working copy)
@@ -28,8 +28,10 @@
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
@@ -214,9 +216,10 @@
     out.writeBoolean(metaregion);
     Bytes.writeByteArray(out, name);
     out.writeInt(families.size());
+    Configuration conf = new HBaseConfiguration();
     for (Iterator<HColumnDescriptor> it = families.values().iterator();
         it.hasNext(); ) {
-      it.next().write(out);
+      // Record the concrete class so HColumnDescriptor subclasses round-trip.
+      ObjectWritable.writeObject(out, it.next(), HColumnDescriptor.class, conf);
     }
   }
 
@@ -228,9 +231,9 @@
     this.nameAsString = Bytes.toString(this.name);
     int numCols = in.readInt();
     this.families.clear();
+    Configuration conf = new HBaseConfiguration();
     for (int i = 0; i < numCols; i++) {
-      HColumnDescriptor c = new HColumnDescriptor();
-      c.readFields(in);
+      HColumnDescriptor c =
+        (HColumnDescriptor) ObjectWritable.readObject(in, conf);
       this.families.put(Bytes.mapKey(c.getName()), c);
     }
   }
Index: src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
===================================================================
--- src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java	(revision 658783)
+++ src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java	(working copy)
@@ -65,6 +65,7 @@
   /** 'local:' */
   public static final String LOCAL_COLON = LOCAL + ":";
   private final HBaseConfiguration conf;
+  private final Class<? extends HRegionServer> regionServerClass;
 
   /**
    * Constructor.
@@ -98,6 +99,7 @@
     // start/stop ports at different times during the life of the test.
     conf.set(REGIONSERVER_ADDRESS, DEFAULT_HOST + ":0");
     this.regionThreads = new ArrayList<RegionServerThread>();
+    regionServerClass = (Class<? extends HRegionServer>) conf.getClass(
+      HConstants.REGION_SERVER_IMPL, HRegionServer.class);
     for (int i = 0; i < noRegionServers; i++) {
       addRegionServer();
     }
@@ -112,7 +114,13 @@
    */
   public RegionServerThread addRegionServer() throws IOException {
     synchronized (regionThreads) {
-      RegionServerThread t = new RegionServerThread(new HRegionServer(conf),
+      HRegionServer server;
+      try {
+        // Reflectively construct whichever implementation is configured.
+        server = regionServerClass.getConstructor(HBaseConfiguration.class)
+          .newInstance(conf);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      RegionServerThread t = new RegionServerThread(server,
         this.regionThreads.size());
       this.regionThreads.add(t);
       return t;
Index: src/java/org/apache/hadoop/hbase/HOrderedColumnDescriptor.java
===================================================================
--- src/java/org/apache/hadoop/hbase/HOrderedColumnDescriptor.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/HOrderedColumnDescriptor.java	(revision 0)
@@ -0,0 +1,118 @@
+/*
+ * $Id$
+ * Created on May 21, 2008
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Column descriptor with info about the secondary indexes for ordered
+ * scans.
+ */
+public class HOrderedColumnDescriptor extends HColumnDescriptor {
+
+  // Columns for which we maintain a sort. Entries are column name ->
+  // comparator used to deserialize and order the column's values.
+  private Map<byte[], ColumnValueComparator<?>> sortedColumns =
+    new TreeMap<byte[], ColumnValueComparator<?>>(Bytes.BYTES_COMPARATOR);
+
+  public HOrderedColumnDescriptor() {
+    super();
+  }
+
+  public HOrderedColumnDescriptor(String columnName) {
+    super(columnName);
+  }
+
+  public HOrderedColumnDescriptor(Text columnName) {
+    super(columnName);
+  }
+
+  public HOrderedColumnDescriptor(byte[] columnName) {
+    super(columnName);
+  }
+
+  public HOrderedColumnDescriptor(byte[] columnName, int maxVersions,
+      CompressionType compression, boolean inMemory,
+      boolean blockCacheEnabled, int maxValueLength, int timeToLive,
+      BloomFilterDescriptor bloomFilter) {
+    super(columnName, maxVersions, compression, inMemory, blockCacheEnabled,
+      maxValueLength, timeToLive, bloomFilter);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void readFields(final DataInput in) throws IOException {
+    super.readFields(in);
+    HBaseConfiguration conf = new HBaseConfiguration();
+
+    int numEntries = in.readInt();
+    for (int i = 0; i < numEntries; i++) {
+      byte[] key = Bytes.readByteArray(in);
+      ColumnValueComparator<?> comparator = (ColumnValueComparator<?>)
+        ObjectWritable.readObject(in, conf);
+      sortedColumns.put(key, comparator);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    super.write(out);
+
+    Configuration conf = new HBaseConfiguration();
+    out.writeInt(sortedColumns.size());
+    for (Entry<byte[], ColumnValueComparator<?>> entry :
+        sortedColumns.entrySet()) {
+      Bytes.writeByteArray(out, entry.getKey());
+      ObjectWritable.writeObject(out, entry.getValue(),
+        ColumnValueComparator.class, conf);
+    }
+  }
+
+  /**
+   * Get the sortedColumns.
+   *
+   * @return Return a map of the sorted columns to the comparator instance.
+   */
+  public Map<byte[], ColumnValueComparator<?>> getSortedColumns() {
+    return sortedColumns;
+  }
+
+  /**
+   * Add a sorted column.
+   *
+   * @param columnName Qualified with family prefix. The family must be the
+   *          one this descriptor describes.
+   * @param comparator used to instantiate column values and compare
+   */
+  public void addSortedColumn(final byte[] columnName,
+      final ColumnValueComparator<?> comparator) {
+    sortedColumns.put(columnName, comparator);
+  }
+
+  /**
+   * Add a sorted column.
+   *
+   * @param columnName Qualified with family prefix. The family must be the
+   *          one this descriptor describes.
+   * @param comparator used to instantiate column values and compare
+   */
+  public void addSortedColumn(final String columnName,
+      final ColumnValueComparator<?> comparator) {
+    sortedColumns.put(columnName.getBytes(), comparator);
+  }
+}
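Putting the descriptor pieces together, declaring a sorted column at table-creation time looks like the following. This is adapted from setupTable() in the test above; LongComparator is the test's sample comparator:

    HTableDescriptor desc = new HTableDescriptor("test");
    HOrderedColumnDescriptor colDesc = new HOrderedColumnDescriptor("value:");
    // Maintain a secondary sort on value:long using LongComparator.
    colDesc.addSortedColumn("value:long".getBytes(), new LongComparator());
    desc.addFamily(colDesc);
    new HBaseAdmin(conf).createTable(desc);
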
Index: src/java/org/apache/hadoop/hbase/ipc/HOrderedRegionInterface.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/HOrderedRegionInterface.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/ipc/HOrderedRegionInterface.java	(revision 0)
@@ -0,0 +1,55 @@
+/*
+ * $Id$
+ * Created on Apr 24, 2008
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.io.Text;
+
+/** Version of HRegionInterface supplemented with ordered scanners. */
+public interface HOrderedRegionInterface extends HRegionInterface {
+  /** Interface version number */
+  public static final long versionID = 1L;
+
+  /**
+   * Open an ordered-by scanner.
+   *
+   * @param regionName name of region to scan
+   * @param orderByColumn column to order by (must be pre-declared in the
+   *          HColumnDescriptor)
+   * @param ascending if true, then low to high
+   * @param columns columns to scan
+   * @param timestamp only return values whose timestamp is <= this value
+   * @param filter RowFilter for filtering results at the row-level
+   * @return scannerId to pass to nextOrdered and closeOrdered
+   * @throws IOException
+   */
+  public long openOrderedScanner(byte[] regionName, byte[] orderByColumn,
+      boolean ascending, byte[][] columns, long timestamp,
+      RowFilterInterface filter) throws IOException;
+
+  /**
+   * Get the next row of an ordered-by scan.
+   *
+   * @param scannerId returned from openOrderedScanner
+   * @return rowResult
+   * @throws IOException
+   */
+  public RowResult nextOrdered(long scannerId) throws IOException;
+
+  /**
+   * Close an ordered scanner.
+   *
+   * @param scannerId returned from openOrderedScanner
+   * @throws IOException
+   */
+  public void closeOrdered(long scannerId) throws IOException;
+}
Index: src/java/org/apache/hadoop/hbase/client/OrderedTable.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/OrderedTable.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/client/OrderedTable.java	(revision 0)
@@ -0,0 +1,57 @@
+/*
+ * $Id$
+ * Created on May 21, 2008
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.io.Text;
+
+/** Adds column-ordered scanners to HTable. */
+public class OrderedTable extends HTable {
+
+  public OrderedTable(HBaseConfiguration conf, String tableName)
+      throws IOException {
+    super(conf, tableName);
+  }
+
+  public OrderedTable(HBaseConfiguration conf, Text tableName)
+      throws IOException {
+    super(conf, tableName);
+  }
+
+  public OrderedTable(HBaseConfiguration conf, byte[] tableName)
+      throws IOException {
+    super(conf, tableName);
+  }
+
+  /**
+   * Get a scanner on the current table. The rows are iterated ordered by
+   * the given column's value. The orderBy column must have been declared
+   * when the table was created.
+   *
+   * @param orderByColumn the column to order by (must be declared in table
+   *          metadata)
+   * @param ascending if true then order low to high
+   * @param columns columns to scan
+   * @param timestamp only return results whose timestamp <= this value
+   * @param filter a row filter using row-key regexp and/or column data
+   *          filter
+   * @return scanner
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public Scanner getOrderedScanner(byte[] orderByColumn, boolean ascending,
+      byte[][] columns, long timestamp, RowFilterInterface filter)
+      throws IOException {
+    // TODO, verify orderBy column
+    return new OrderedScanner(this, orderByColumn, ascending, columns,
+      timestamp, filter);
+  }
+}
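Client-side, an ordered scan then reduces to a few lines. This is adapted from scanRowsSorted() in the test; the for-each form assumes the Scanner interface extends Iterable<RowResult>, which the iterator() implementation in OrderedScanner below supports:

    OrderedTable table = new OrderedTable(conf, "test");
    Scanner scanner = table.getOrderedScanner(
      "value:long".getBytes(),                  // orderBy column
      true,                                     // ascending
      new byte[][] { "value:long".getBytes() }, // columns to fetch
      HConstants.LATEST_TIMESTAMP,
      null);                                    // no filter
    for (RowResult row : scanner) {
      // Rows arrive ordered by the value of value:long.
    }
    scanner.close();
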
Index: src/java/org/apache/hadoop/hbase/client/OrderedScanner.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/OrderedScanner.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/client/OrderedScanner.java	(revision 0)
@@ -0,0 +1,264 @@
+/*
+ * $Id$
+ * Created on Apr 24, 2008
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ColumnValueComparator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HOrderedColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.HOrderedRegionInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Scanner where results are returned ordered by a given column. Opens one
+ * server-side scanner per region and merges their streams client-side.
+ *
+ * @param <T> column value type
+ */
+class OrderedScanner<T> implements Scanner {
+
+  private static final Log LOG = LogFactory.getLog(OrderedScanner.class);
+
+  private class RegionScanner implements Comparable<RegionScanner> {
+    private long scannerId;
+    private HOrderedRegionInterface server;
+    private RowResult rowResult;
+    private T columnValue;
+
+    public RegionScanner(long scannerId, HOrderedRegionInterface server) {
+      this.scannerId = scannerId;
+      this.server = server;
+    }
+
+    public int compareTo(RegionScanner other) {
+      int colResult;
+      if (ascending) {
+        colResult = comparator.getComparator().compare(this.columnValue,
+          other.columnValue);
+      } else {
+        colResult = comparator.getComparator().compare(other.columnValue,
+          this.columnValue);
+      }
+      if (colResult != 0) {
+        return colResult;
+      }
+      // Tie-break on row key so equal values order deterministically.
+      if (ascending) {
+        return Bytes.compareTo(this.rowResult.getRow(),
+          other.rowResult.getRow());
+      }
+      return Bytes.compareTo(other.rowResult.getRow(),
+        this.rowResult.getRow());
+    }
+
+    public boolean next() throws IOException {
+      rowResult = server.nextOrdered(scannerId);
+      if (rowResult == null) {
+        return false;
+      }
+      Cell colCell = rowResult.get(sortColumn);
+      if (colCell == null) {
+        return false; // FIXME
+      }
+      byte[] colBytes = colCell.getValue();
+      if (colBytes == null) {
+        return false; // FIXME
+      }
+      columnValue = comparator.makeValue(colBytes);
+      return true;
+    }
+
+    public void close() throws IOException {
+      server.closeOrdered(scannerId);
+    }
+
+    /**
+     * Get the scannerId.
+     * @return Return the scannerId.
+     */
+    public long getScannerId() {
+      return scannerId;
+    }
+
+    /**
+     * Get the rowResult.
+     * @return Return the rowResult.
+     */
+    public RowResult getRowResult() {
+      return rowResult;
+    }
+  }
+
+  private HTable table;
+  private byte[] sortColumn;
+  private boolean ascending;
+  private ColumnValueComparator<T> comparator;
+  private RowFilterInterface filter;
+  // The current head of each region's stream, ordered by column value.
+  private SortedSet<RegionScanner> topRows = new TreeSet<RegionScanner>();
+  private SortedMap<byte[], byte[]> columnMap; // To send to filters
+
+  @SuppressWarnings("unchecked")
+  public OrderedScanner(HTable table, byte[] sortColumn, boolean ascending,
+      byte[][] columns, long timeStamp, RowFilterInterface filter)
+      throws IOException {
+    this.table = table;
+    this.sortColumn = sortColumn;
+    this.ascending = ascending;
+    this.filter = filter;
+
+    HOrderedColumnDescriptor colDesc = (HOrderedColumnDescriptor) table
+      .getMetadata().getFamily(HStoreKey.getFamily(sortColumn));
+
+    this.comparator = (ColumnValueComparator<T>) colDesc.getSortedColumns()
+      .get(sortColumn);
+    openScanners(columns, timeStamp);
+    if (filter != null) {
+      this.columnMap = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+    }
+  }
+
+  // TODO, don't open connections to all regions if we have a row prefix.
+  private List<HRegionLocation> getRegions() throws IOException {
+    List<HRegionLocation> regionLocations =
+      new LinkedList<HRegionLocation>();
+
+    byte[] nextRegionStartKey = HConstants.EMPTY_START_ROW;
+    HRegionLocation regionLocation;
+    do {
+      regionLocation = table.getRegionLocation(nextRegionStartKey);
+      regionLocations.add(regionLocation);
+      nextRegionStartKey = regionLocation.getRegionInfo().getEndKey();
+    } while (!Bytes.equals(regionLocation.getRegionInfo().getEndKey(),
+      HConstants.LAST_ROW));
+    return regionLocations;
+  }
+
+  private void openScanners(byte[][] columns, long timeStamp)
+      throws IOException {
+    for (HRegionLocation regionLoc : getRegions()) {
+      HOrderedRegionInterface server = (HOrderedRegionInterface)
+        table.connection.getHRegionConnection(regionLoc.getServerAddress());
+
+      long scannerId = server.openOrderedScanner(
+        regionLoc.getRegionInfo().getRegionName(), sortColumn, ascending,
+        columns, timeStamp, filter);
+      LOG.trace("Making scanner for region: "
+        + regionLoc.getRegionInfo().toString());
+      RegionScanner s = new RegionScanner(scannerId, server);
+      if (s.next()) {
+        LOG.trace("added to sorted set.");
+        topRows.add(s);
+      } else {
+        LOG.trace("No rows, closing");
+        s.close();
+      }
+    }
+  }
+
+  public void close() {
+    for (RegionScanner regionScanner : topRows) {
+      try {
+        regionScanner.close();
+      } catch (IOException e) {
+        // Ignore; close is best-effort.
+      }
+    }
+  }
+
+  public RowResult next() throws IOException {
+    boolean filtered;
+    RowResult result;
+    do {
+      filtered = false;
+
+      if (topRows.size() == 0) {
+        return null;
+      }
+
+      // Pull the smallest head; advance that region and re-insert it.
+      RegionScanner top = topRows.first();
+      LOG.trace("Got top row [" + Bytes.toString(top.rowResult.getRow())
+        + "] with value [" + top.columnValue.toString() + "]");
+      topRows.remove(top);
+      result = top.getRowResult();
+      if (top.next()) {
+        LOG.trace("putting back into sorted set");
+        topRows.add(top);
+      } else {
+        LOG.trace("nothing left, closing");
+        top.close();
+      }
+
+      if (filter != null) {
+        columnMap.clear();
+        for (Entry<byte[], Cell> entry : result.entrySet()) {
+          columnMap.put(entry.getKey(), entry.getValue().getValue());
+        }
+        filtered = filter.filterRow(columnMap);
+        if (filtered) {
+          LOG.trace("Filtered on assembled row");
+        }
+      }
+    } while (filtered);
+    return result;
+  }
+
+  // This implementation copy-pasted from HTable#ClientScanner
+  public Iterator<RowResult> iterator() {
+    return new Iterator<RowResult>() {
+      // The next RowResult, possibly pre-read
+      RowResult next = null;
+
+      // return true if there is another item pending, false if there isn't.
+      // this method is where the actual advancing takes place, but you need
+      // to call next() to consume it. hasNext() will only advance if there
+      // isn't a pending next().
+      public boolean hasNext() {
+        if (next == null) {
+          try {
+            next = OrderedScanner.this.next();
+            return next != null;
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        return true;
+      }
+
+      // get the pending next item and advance the iterator. returns null if
+      // there is no next item.
+      public RowResult next() {
+        // since hasNext() does the real advancing, we call this to determine
        // if there is a next before proceeding.
+        if (!hasNext()) {
+          return null;
+        }
+
+        // if we get to here, then hasNext() has given us an item to return.
+        // we want to return the item and then null out the next pointer, so
+        // we use a temporary variable.
+        RowResult temp = next;
+        next = null;
+        return temp;
+      }
+
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+}