Index: src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
===================================================================
--- src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java	(revision 700621)
+++ src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java	(working copy)
@@ -475,7 +475,7 @@
    * @return Returns zero-prefixed 10-byte wide decimal version of passed
    * number (Does absolute in case number is negative).
    */
-  static byte [] format(final int number) {
+  public static byte [] format(final int number) {
     byte [] b = new byte[10];
     int d = Math.abs(number);
     for (int i = b.length - 1; i >= 0; i--) {
Index: src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java
===================================================================
--- src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java	(revision 0)
+++ src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java	(revision 0)
@@ -0,0 +1,139 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.IOException;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.client.tableindexed.IndexNotFoundException;
+import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
+import org.apache.hadoop.hbase.client.tableindexed.IndexedTable;
+import org.apache.hadoop.hbase.client.tableindexed.IndexedTableAdmin;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegion;
+import org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class TestIndexedTable extends HBaseClusterTestCase {
+
+  private static final Log LOG = LogFactory.getLog(TestIndexedTable.class);
+
+  private static final String TABLE_NAME = "table1";
+
+  private static final byte[] FAMILY = Bytes.toBytes("family:");
+  private static final byte[] COL_A = Bytes.toBytes("family:a");
+  private static final byte[] COL_B = Bytes.toBytes("family:b");
+  private static final String INDEX_COL_A_ASC = "A-Ascending";
+
+  private static final int NUM_ROWS = 10;
+  private static final int MAX_VAL = 10000;
+
+  private IndexedTableAdmin admin;
+  private IndexedTable table;
+  private Random random = new Random();
+
+  /** constructor */
+  public TestIndexedTable() {
+    conf.set(HConstants.REGION_SERVER_IMPL, IndexedRegionServer.class.getName());
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+
+    // Create a new index that does lexicographic ordering on COL_A
+    IndexSpecification colAIndex = new IndexSpecification(INDEX_COL_A_ASC,
+        COL_A, true);
+    desc.addIndex(colAIndex);
+
+    admin = new IndexedTableAdmin(conf);
+    admin.createTable(desc);
+    table = new IndexedTable(conf, desc.getName());
+  }
+
+  private void writeInitialRows() throws IOException {
+    for (int i = 0; i < NUM_ROWS; i++) {
+      BatchUpdate update = new BatchUpdate(PerformanceEvaluation.format(i));
+      byte[] colA = PerformanceEvaluation.format(random.nextInt(MAX_VAL));
+      update.put(COL_A, colA);
+      table.commit(update);
+      LOG.info("Inserted row [" + Bytes.toString(update.getRow()) + "] val: ["
+          + Bytes.toString(colA) + "]");
+    }
+  }
+
+  public void testInitialWrites() throws IOException {
+    writeInitialRows();
+    assertRowsInOrder(NUM_ROWS);
+  }
+
+  private void assertRowsInOrder(int numRowsExpected)
+      throws IndexNotFoundException, IOException {
+    Scanner scanner = table.getIndexedScanner(INDEX_COL_A_ASC,
+        HConstants.EMPTY_START_ROW, null, null, null);
+    int numRows = 0;
+    byte[] lastColA = null;
+    for (RowResult rowResult : scanner) {
+      byte[] colA = rowResult.get(COL_A).getValue();
+      LOG.info("index scan : row [" + Bytes.toString(rowResult.getRow())
+          + "] value [" + Bytes.toString(colA) + "]");
+      if (lastColA != null) {
+        Assert.assertTrue(Bytes.compareTo(lastColA, colA) <= 0);
+      }
+      lastColA = colA;
+      numRows++;
+    }
+    Assert.assertEquals(numRowsExpected, numRows);
+  }
+
+  public void testMultipleWrites() throws IOException {
+    writeInitialRows();
+    writeInitialRows(); // Update the rows.
+    assertRowsInOrder(NUM_ROWS);
+  }
+
+  public void testDelete() throws IOException {
+    writeInitialRows();
+    // Delete the first row.
+    table.deleteAll(PerformanceEvaluation.format(0));
+
+    assertRowsInOrder(NUM_ROWS - 1);
+  }
+
+}

Property changes on: src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java
___________________________________________________________________
Name: svn:keywords
   + Revision Date Id

Index: src/java/org/apache/hadoop/hbase/HStoreKey.java
===================================================================
--- src/java/org/apache/hadoop/hbase/HStoreKey.java	(revision 700621)
+++ src/java/org/apache/hadoop/hbase/HStoreKey.java	(working copy)
@@ -517,6 +517,9 @@
       if(rowCompare == 0)
         rowCompare = Bytes.compareTo(keysA[1], keysB[1]);
       return rowCompare;
+    }
+    if (regionInfo != null && regionInfo.getTableDesc().getRowKeyComparator() != null) {
+      return regionInfo.getTableDesc().getRowKeyComparator().compare(rowA, rowB);
     }
     return Bytes.compareTo(rowA, rowB);
   }
Index: src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java	(revision 0)
@@ -0,0 +1,294 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.tableindexed;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
+import org.apache.hadoop.hbase.client.tableindexed.IndexedTable;
+import org.apache.hadoop.hbase.io.BatchOperation;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HLog;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class IndexedRegion extends HRegion {
+
+  private static final Log LOG = LogFactory.getLog(IndexedRegion.class);
+
+  private final HBaseConfiguration conf;
+  private Map<IndexSpecification, HTable> indexSpecToTable =
+      new HashMap<IndexSpecification, HTable>();
+
+  public IndexedRegion(final Path basedir, final HLog log, final FileSystem fs,
+      final HBaseConfiguration conf, final HRegionInfo regionInfo,
+      final FlushRequester flushListener) {
+    super(basedir, log, fs, conf, regionInfo, flushListener);
+    this.conf = conf;
+  }
+
+  private synchronized HTable getIndexTable(IndexSpecification index)
+      throws IOException {
+    HTable indexTable = indexSpecToTable.get(index);
+    if (indexTable == null) {
+      indexTable = new HTable(conf, index.getIndexedTableName(super
+          .getRegionInfo().getTableDesc().getName()));
+      indexSpecToTable.put(index, indexTable);
+    }
+    return indexTable;
+  }
+
+  private Collection<IndexSpecification> getIndexes() {
+    return super.getRegionInfo().getTableDesc().getIndexes();
+  }
+
+  /**
+   * @param b
+   * @param lockid
+   * @param writeToWAL if true, then we write this update to the log
+   * @throws IOException
+   */
+  @Override
+  public void batchUpdate(BatchUpdate b, Integer lockid, boolean writeToWAL)
+      throws IOException {
+    updateIndexes(b); // Do this first because we will want to see the old row
+    super.batchUpdate(b, lockid, writeToWAL);
+  }
+
+  private void updateIndexes(BatchUpdate b) throws IOException {
+    List<IndexSpecification> indexesToUpdate =
+        new LinkedList<IndexSpecification>();
+
+    // Find the indexes we need to update
+    for (IndexSpecification index : getIndexes()) {
+      if (possiblyAppliesToIndex(index, b)) {
+        indexesToUpdate.add(index);
+      }
+    }
+
+    if (indexesToUpdate.size() == 0) {
+      return;
+    }
+
+    // See what columns we will need for the update
+    Set<byte[]> neededColumns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    for (IndexSpecification indexSpec : indexesToUpdate) {
+      for (byte[] col : indexSpec.getAllColumns()) {
+        neededColumns.add(col);
+      }
+    }
+
+    SortedMap<byte[], byte[]> newColumnValues = getColumnsFromBatchUpdate(b);
+    SortedMap<byte[], byte[]> oldColumnValues = getCurrentColumns(b.getRow(),
+        neededColumns);
+
+    // Add the old values to the new if they are not there
+    for (Entry<byte[], byte[]> oldEntry : oldColumnValues.entrySet()) {
+      if (!newColumnValues.containsKey(oldEntry.getKey())) {
+        newColumnValues.put(oldEntry.getKey(), oldEntry.getValue());
+      }
+    }
+
+    Iterator<IndexSpecification> indexIterator = indexesToUpdate.iterator();
+    while (indexIterator.hasNext()) {
+      IndexSpecification indexSpec = indexIterator.next();
+      if (!doesApplyToIndex(indexSpec, newColumnValues)) {
+        indexIterator.remove();
+      }
+    }
+
+    for (IndexSpecification indexSpec : indexesToUpdate) {
+      removeOldIndexEntry(indexSpec, b.getRow(), oldColumnValues);
+      updateIndex(indexSpec, b.getRow(), newColumnValues);
+    }
+  }
+
+  private void removeOldIndexEntry(IndexSpecification indexSpec, byte[] row,
+      SortedMap<byte[], byte[]> oldColumnValues) throws IOException {
+    for (byte[] indexedCol : indexSpec.getIndexedColumns()) {
+      if (!oldColumnValues.containsKey(indexedCol)) {
+        LOG.debug("Index [" + indexSpec.getIndexId()
+            + "] not trying to remove old entry for row ["
+            + Bytes.toString(row) + "] because col ["
+            + Bytes.toString(indexedCol) + "] is missing");
+        return;
+      }
+    }
+
+    byte[] oldIndexRow = indexSpec.getKeyGenerator().createIndexKey(row,
+        oldColumnValues);
+    LOG.debug("Index [" + indexSpec.getIndexId() + "] removing old entry ["
+        + Bytes.toString(oldIndexRow) + "]");
+    getIndexTable(indexSpec).deleteAll(oldIndexRow);
+  }
+
+  private SortedMap<byte[], byte[]> getCurrentColumns(byte[] row,
+      Set<byte[]> neededColumns) throws IOException {
+
+    SortedMap<byte[], byte[]> columnValues = new TreeMap<byte[], byte[]>(
+        Bytes.BYTES_COMPARATOR);
+
+    Map<byte[], Cell> rest = super.getFull(row, neededColumns,
+        HConstants.LATEST_TIMESTAMP, null);
+
+    for (Entry<byte[], Cell> entry : rest.entrySet()) {
+      columnValues.put(entry.getKey(), entry.getValue().getValue());
+    }
+
+    return columnValues;
+  }
+
+  private SortedMap<byte[], byte[]> getColumnsFromBatchUpdate(BatchUpdate b) {
+    SortedMap<byte[], byte[]> columnValues = new TreeMap<byte[], byte[]>(
+        Bytes.BYTES_COMPARATOR);
+    for (BatchOperation op : b) {
+      if (op.isPut()) {
+        columnValues.put(op.getColumn(), op.getValue());
+      }
+    }
+    return columnValues;
+  }
+
+  /** Ask if this update *could* apply to the index. It may actually apply if
+   * some of the columns needed are missing.
+   *
+   * @param indexSpec
+   * @param b
+   * @return true if the update could possibly apply to the index.
+   */
+  private boolean possiblyAppliesToIndex(IndexSpecification indexSpec,
+      BatchUpdate b) {
+    for (BatchOperation op : b) {
+      if (indexSpec.containsColumn(op.getColumn())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Ask if this update *does* apply to the index, i.e. whether all of the
+   * columns the index needs are present in the given column values.
+   *
+   * @param indexSpec
+   * @param columnValues
+   * @return true if the update applies to the index.
+   */
+  private boolean doesApplyToIndex(IndexSpecification indexSpec,
+      SortedMap<byte[], byte[]> columnValues) {
+
+    for (byte[] neededCol : indexSpec.getIndexedColumns()) {
+      if (!columnValues.containsKey(neededCol)) {
+        LOG.debug("Index [" + indexSpec.getIndexId()
+            + "] can't be updated because ["
+            + Bytes.toString(neededCol) + "] is missing");
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private void updateIndex(IndexSpecification indexSpec, byte[] row,
+      SortedMap<byte[], byte[]> columnValues) throws IOException {
+    BatchUpdate indexUpdate = createIndexUpdate(indexSpec, row, columnValues);
+    getIndexTable(indexSpec).commit(indexUpdate);
+    LOG.debug("Index [" + indexSpec.getIndexId() + "] adding new entry ["
+        + Bytes.toString(indexUpdate.getRow()) + "] for row ["
+        + Bytes.toString(row) + "]");
+  }
+
+  private BatchUpdate createIndexUpdate(IndexSpecification indexSpec,
+      byte[] row, SortedMap<byte[], byte[]> columnValues) {
+    byte[] indexRow = indexSpec.getKeyGenerator().createIndexKey(row,
+        columnValues);
+    BatchUpdate update = new BatchUpdate(indexRow);
+
+    update.put(IndexedTable.INDEX_BASE_ROW_COLUMN, row);
+
+    for (byte[] col : indexSpec.getIndexedColumns()) {
+      byte[] val = columnValues.get(col);
+      if (val == null) {
+        throw new RuntimeException("Unexpected missing column value. ["
+            + Bytes.toString(col) + "]");
+      }
+      update.put(col, val);
+    }
+
+    for (byte[] col : indexSpec.getAdditionalColumns()) {
+      byte[] val = columnValues.get(col);
+      if (val != null) {
+        update.put(col, val);
+      }
+    }
+
+    return update;
+  }
+
+  @Override
+  public void deleteAll(final byte[] row, final long ts, final Integer lockid)
+      throws IOException {
+
+    if (getIndexes().size() != 0) {
+
+      Set<byte[]> neededColumns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+      for (IndexSpecification indexSpec : getIndexes()) {
+        for (byte[] col : indexSpec.getAllColumns()) {
+          neededColumns.add(col);
+        }
+      }
+
+      SortedMap<byte[], byte[]> oldColumnValues = getCurrentColumns(row,
+          neededColumns);
+
+      for (IndexSpecification indexSpec : getIndexes()) {
+        removeOldIndexEntry(indexSpec, row, oldColumnValues);
+      }
+
+      // TODO, handle if there is still a version visible.
+      if (ts != HConstants.LATEST_TIMESTAMP) {
+        LOG.warn("deleteAll for only some versions is not yet implemented."
+            + " Index will be out of sync!");
+      }
+    }
+    super.deleteAll(row, ts, lockid);
+  }
+
+  @Override
+  public void deleteAll(final byte[] row, byte[] column, final long ts,
+      final Integer lockid) throws IOException {
+    LOG.warn("deleteAll for a single column is not yet implemented."
+        + " Index will be out of sync!");
+    super.deleteAll(row, column, ts, lockid);
+  }
+
+}

Property changes on: src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java
___________________________________________________________________
Name: svn:keywords
   + Revision Date Id

Index: src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java	(revision 0)
@@ -0,0 +1,64 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.tableindexed;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ipc.IndexedRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * RegionServer which maintains secondary indexes.
+ */
+public class IndexedRegionServer extends HRegionServer implements
+    IndexedRegionInterface {
+
+  public IndexedRegionServer(HBaseConfiguration conf) throws IOException {
+    this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
+        DEFAULT_REGIONSERVER_ADDRESS)), conf);
+  }
+
+  public IndexedRegionServer(HServerAddress serverAddress,
+      HBaseConfiguration conf) throws IOException {
+    super(serverAddress, conf);
+  }
+
+  @Override
+  protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+      throws IOException {
+    HRegion r = new IndexedRegion(HTableDescriptor.getTableDir(super
+        .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
+        .getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
+    r.initialize(null, new Progressable() {
+      public void progress() {
+        addProcessingMessage(regionInfo);
+      }
+    });
+    return r;
+  }
+
+}

Property changes on: src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java
___________________________________________________________________
Name: svn:keywords
   + Revision Date Id

Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java	(revision 700621)
+++ src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java	(working copy)
@@ -53,6 +53,7 @@
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegion;
 import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.Progressable;
@@ -73,7 +74,7 @@
  * will have to consult the transaction log to determine the final decision of
  * the transaction. This is not yet implemented.
 */
-class TransactionalRegion extends HRegion {
+class TransactionalRegion extends IndexedRegion {
 
   private static final String LEASE_TIME = "hbase.transaction.leaseTime";
   private static final int DEFAULT_LEASE_TIME = 60 * 1000;
Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java	(revision 700621)
+++ src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java	(working copy)
@@ -42,8 +42,8 @@
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.MapWritable;
@@ -54,7 +54,7 @@
  * region level, so we mostly just delegate to the appropriate
  * TransactionalRegion.
  */
-public class TransactionalRegionServer extends HRegionServer implements
+public class TransactionalRegionServer extends IndexedRegionServer implements
     TransactionalRegionInterface {
   static final Log LOG = LogFactory.getLog(TransactionalRegionServer.class);
Index: src/java/org/apache/hadoop/hbase/HTableDescriptor.java
===================================================================
--- src/java/org/apache/hadoop/hbase/HTableDescriptor.java	(revision 700621)
+++ src/java/org/apache/hadoop/hbase/HTableDescriptor.java	(working copy)
@@ -19,8 +19,12 @@
  */
 package org.apache.hadoop.hbase;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -29,8 +33,10 @@
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
@@ -57,7 +63,8 @@
 
   // Changes prior to version 3 were not recorded here.
   // Version 3 adds metadata as a map where keys and values are byte[].
-  public static final byte TABLE_DESCRIPTOR_VERSION = 3;
+  // Version 4 adds indexes
+  public static final byte TABLE_DESCRIPTOR_VERSION = 4;
 
   private byte [] name = HConstants.EMPTY_BYTE_ARRAY;
   private String nameAsString = "";
@@ -75,6 +82,7 @@
   public static final String MEMCACHE_FLUSHSIZE = "MEMCACHE_FLUSHSIZE";
   public static final String IS_ROOT = "IS_ROOT";
   public static final String IS_META = "IS_META";
+  public static final String ROW_KEY_COMPARATOR = "ROW_KEY_COMPARATOR";
 
   public static final boolean DEFAULT_IN_MEMORY = false;
 
@@ -90,6 +98,10 @@
   private final Map<Integer, HColumnDescriptor> families =
     new HashMap<Integer, HColumnDescriptor>();
 
+  // Key is indexId
+  private final Map<String, IndexSpecification> indexes =
+    new HashMap<String, IndexSpecification>();
+
   /**
    * Private constructor used internally creating table descriptors for
    * catalog tables: e.g. .META. and -ROOT-.
   */
@@ -107,12 +119,16 @@
    * catalog tables: e.g. .META. and -ROOT-.
   */
  protected HTableDescriptor(final byte [] name, HColumnDescriptor[] families,
+      Collection<IndexSpecification> indexes,
       Map<ImmutableBytesWritable,ImmutableBytesWritable> values) {
     this.name = name.clone();
     setMetaFlags(name);
     for(HColumnDescriptor descriptor : families) {
       this.families.put(Bytes.mapKey(descriptor.getName()), descriptor);
     }
+    for(IndexSpecification index : indexes) {
+      this.indexes.put(index.getIndexId(), index);
+    }
     for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> entry:
         values.entrySet()) {
       this.values.put(entry.getKey(), entry.getValue());
@@ -376,13 +392,59 @@
       return Integer.valueOf(value);
     return DEFAULT_MEMCACHE_FLUSH_SIZE;
   }
-  
+  
   /**
    * @param memcacheFlushSize memory cache flush size for each hregion
    */
   public void setMemcacheFlushSize(int memcacheFlushSize) {
     setValue(MEMCACHE_FLUSHSIZE, Integer.toString(memcacheFlushSize));
   }
+
+  public void setRowKeyComparator(WritableComparator comparator) {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    HBaseConfiguration conf = new HBaseConfiguration();
+    try {
+      ObjectWritable.writeObject(dos, comparator, WritableComparator.class, conf);
+      dos.close();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    setValue(ROW_KEY_COMPARATOR.getBytes(), bos.toByteArray());
+  }
+
+  private WritableComparator comparator = null;
+
+  public WritableComparator getRowKeyComparator() {
+    if (comparator != null) {
+      return comparator;
+    }
+    byte[] bytes = getValue(ROW_KEY_COMPARATOR.getBytes());
+    if (bytes == null) {
+      return null;
+    }
+    ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+    DataInputStream in = new DataInputStream(bis);
+    HBaseConfiguration conf = new HBaseConfiguration();
+    try {
+      comparator = (WritableComparator) ObjectWritable.readObject(in, conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return comparator;
+  }
+
+  public Collection<IndexSpecification> getIndexes() {
+    return indexes.values();
+  }
+
+  public IndexSpecification getIndex(String indexId) {
+    return indexes.get(indexId);
+  }
+
+  public void addIndex(IndexSpecification index) {
+    indexes.put(index.getIndexId(), index);
+  }
 
   /**
    * Adds a column family.
@@ -490,6 +552,16 @@
       c.readFields(in);
       families.put(Bytes.mapKey(c.getName()), c);
     }
+    indexes.clear();
+    if (version < 4) {
+      return;
+    }
+    int numIndexes = in.readInt();
+    for (int i = 0; i < numIndexes; i++) {
+      IndexSpecification index = new IndexSpecification();
+      index.readFields(in);
+      addIndex(index);
+    }
   }
 
   public void write(DataOutput out) throws IOException {
@@ -509,6 +581,10 @@
       HColumnDescriptor family = it.next();
       family.write(out);
     }
+    out.writeInt(indexes.size());
+    for(IndexSpecification index : indexes.values()) {
+      index.write(out);
+    }
   }
 
   // Comparable
Index: src/java/org/apache/hadoop/hbase/WritableComparator.java
===================================================================
--- src/java/org/apache/hadoop/hbase/WritableComparator.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/WritableComparator.java	(revision 0)
@@ -0,0 +1,28 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.io.Writable;
+
+public interface WritableComparator extends Writable, Comparator<byte[]> {
+  // No methods, just bring the two interfaces together
+}
Index: src/java/org/apache/hadoop/hbase/ipc/IndexedRegionInterface.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/IndexedRegionInterface.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/ipc/IndexedRegionInterface.java	(revision 0)
@@ -0,0 +1,11 @@
+/*
+ * $Id$
+ * Created on Sep 10, 2008
+ *
+ */
+package org.apache.hadoop.hbase.ipc;
+
+/** Interface for the indexed region server. */
+public interface IndexedRegionInterface extends HRegionInterface {
+  // No methods for now...
+}
Index: src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java	(revision 700621)
+++ src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java	(working copy)
@@ -28,7 +28,7 @@
  * Interface for transactional region servers.
  *
  */
-public interface TransactionalRegionInterface extends HRegionInterface {
+public interface TransactionalRegionInterface extends IndexedRegionInterface {
   /** Interface version number */
   public static final long versionID = 1L;
Index: src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java	(revision 0)
@@ -0,0 +1,99 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.ColumnNameParseException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Extension of HBaseAdmin that creates indexed tables.
+ */
+public class IndexedTableAdmin extends HBaseAdmin {
+
+  /**
+   * Constructor
+   *
+   * @param conf Configuration object
+   * @throws MasterNotRunningException
+   */
+  public IndexedTableAdmin(HBaseConfiguration conf)
+      throws MasterNotRunningException {
+    super(conf);
+  }
+
+  /**
+   * Creates a new table, along with one index table per declared index.
+   *
+   * @param desc table descriptor for table
+   *
+   * @throws IllegalArgumentException if the table name is reserved
+   * @throws MasterNotRunningException if master is not running
+   * @throws TableExistsException if table already exists (If concurrent
+   * threads, the table may have been created between test-for-existence and
+   * attempt-at-creation).
+   * @throws IOException
+   */
+  @Override
+  public void createTable(HTableDescriptor desc) throws IOException {
+    super.createTable(desc);
+    this.createIndexTables(desc);
+  }
+
+  private void createIndexTables(HTableDescriptor tableDesc) throws IOException {
+    byte[] baseTableName = tableDesc.getName();
+    for (IndexSpecification indexSpec : tableDesc.getIndexes()) {
+      HTableDescriptor indexTableDesc = createIndexTableDesc(baseTableName,
+          indexSpec);
+      super.createTable(indexTableDesc);
+    }
+  }
+
+  private HTableDescriptor createIndexTableDesc(byte[] baseTableName,
+      IndexSpecification indexSpec) throws ColumnNameParseException {
+    HTableDescriptor indexTableDesc = new HTableDescriptor(indexSpec
+        .getIndexedTableName(baseTableName));
+    Set<byte[]> families = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    families.add(IndexedTable.INDEX_COL_FAMILY);
+    for (byte[] column : indexSpec.getAllColumns()) {
+      families.add(Bytes.add(HStoreKey.getFamily(column),
+          new byte[] { HStoreKey.COLUMN_FAMILY_DELIMITER }));
+    }
+
+    for (byte[] colFamily : families) {
+      indexTableDesc.addFamily(new HColumnDescriptor(colFamily));
+    }
+
+    indexTableDesc.setRowKeyComparator(indexSpec.getKeyComparator());
+
+    return indexTableDesc;
+  }
+}

Property changes on: src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java
___________________________________________________________________
Name: svn:keywords
   + Revision Date Id

Index: src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java	(revision 0)
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/** Creates index row keys from a single column. */
+public class SimpleIndexKeyGenerator implements IndexKeyGenerator {
+
+  private byte [] column;
+
+  public SimpleIndexKeyGenerator(byte [] column) {
+    this.column = column;
+  }
+
+  public SimpleIndexKeyGenerator() {
+    // For Writable
+  }
+
+  /** {@inheritDoc} */
+  public byte[] createIndexKey(byte[] rowKey, Map<byte[], byte[]> columns) {
+    return Bytes.add(columns.get(column), rowKey);
+  }
+
+  /** {@inheritDoc} */
+  public void readFields(DataInput in) throws IOException {
+    column = Bytes.readByteArray(in);
+  }
+
+  /** {@inheritDoc} */
+  public void write(DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, column);
+  }
+
+}

Property changes on: src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java
___________________________________________________________________
Name: svn:keywords
   + Revision Date Id

Index: src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java	(revision 0)
@@ -0,0 +1,45 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.IOException;
+
+/**
+ * Thrown when asking for an index that does not exist.
+ */
+public class IndexNotFoundException extends IOException {
+
+  public IndexNotFoundException() {
+    super();
+  }
+
+  public IndexNotFoundException(String arg0) {
+    super(arg0);
+  }
+
+  public IndexNotFoundException(Throwable arg0) {
+    super(arg0);
+  }
+
+  public IndexNotFoundException(String arg0, Throwable arg1) {
+    super(arg0, arg1);
+  }
+
+}

Property changes on: src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java
___________________________________________________________________
Name: svn:keywords
   + Revision Date Id

Index: src/java/org/apache/hadoop/hbase/client/tableindexed/ReverseByteArrayComparator.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/tableindexed/ReverseByteArrayComparator.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/client/tableindexed/ReverseByteArrayComparator.java	(revision 0)
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.WritableComparator;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class ReverseByteArrayComparator implements WritableComparator {
+
+  /** {@inheritDoc} */
+  public int compare(byte[] o1, byte[] o2) {
+    return Bytes.compareTo(o2, o1);
+  }
+
+  /** {@inheritDoc} */
+  public void readFields(DataInput arg0) throws IOException {
+    // Nothing
+  }
+
+  /** {@inheritDoc} */
+  public void write(DataOutput arg0) throws IOException {
+    // Nothing
+  }
+}

Property changes on: src/java/org/apache/hadoop/hbase/client/tableindexed/ReverseByteArrayComparator.java
___________________________________________________________________
Name: svn:keywords
   + Revision Date Id

Index: src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java	(revision 0)
@@ -0,0 +1,29 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+public interface IndexKeyGenerator extends Writable {
+
+  byte [] createIndexKey(byte [] rowKey, Map<byte[], byte[]> columns);
+}

Property changes on: src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java
___________________________________________________________________
Name: svn:keywords
   + Revision Date Id

Index: src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java	(revision 0)
@@ -0,0 +1,200 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.WritableComparator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+
+/** Holds the specification for a single secondary index. */
+public class IndexSpecification implements Writable {
+
+  // Columns that are indexed (part of the index row key)
+  private byte[][] indexedColumns;
+
+  // Constructs the index row key from the original row key and columns
+  private IndexKeyGenerator keyGenerator;
+
+  private WritableComparator keyComparator;
+
+  // Additional columns mapped into the indexed row. These will be available
+  // for filters when scanning the index.
+  private byte[][] additionalColumns;
+
+  private byte[][] allColumns;
+
+  // Id of this index, unique within a table.
+  private String indexId;
+
+  /** Construct a "simple" index spec for a single column. */
+  public IndexSpecification(String indexId, byte[] indexedColumn,
+      boolean ascending) {
+    this(indexId, new byte[][] { indexedColumn }, null,
+        new SimpleIndexKeyGenerator(indexedColumn), ascending ? null
+            : new ReverseByteArrayComparator());
+  }
+
+  /**
+   * Construct an index spec by specifying everything.
+   *
+   * @param indexId
+   * @param indexedColumns
+   * @param additionalColumns
+   * @param keyGenerator
+   * @param keyComparator
+   */
+  public IndexSpecification(String indexId, byte[][] indexedColumns,
+      byte[][] additionalColumns, IndexKeyGenerator keyGenerator,
+      WritableComparator keyComparator) {
+    this.indexId = indexId;
+    this.indexedColumns = indexedColumns;
+    this.additionalColumns = additionalColumns;
+    this.keyGenerator = keyGenerator;
+    this.keyComparator = keyComparator;
+    this.makeAllColumns();
+  }
+
+  public IndexSpecification() {
+    // For writable
+  }
+
+  private void makeAllColumns() {
+    this.allColumns = new byte[indexedColumns.length
+        + (additionalColumns == null ? 0 : additionalColumns.length)][];
+    System.arraycopy(indexedColumns, 0, allColumns, 0, indexedColumns.length);
+    if (additionalColumns != null) {
+      System.arraycopy(additionalColumns, 0, allColumns, indexedColumns.length,
+          additionalColumns.length);
+    }
+  }
+
+  /**
+   * Get the indexedColumns.
+   *
+   * @return Return the indexedColumns.
+   */
+  public byte[][] getIndexedColumns() {
+    return indexedColumns;
+  }
+
+  /**
+   * Get the keyGenerator.
+   *
+   * @return Return the keyGenerator.
+   */
+  public IndexKeyGenerator getKeyGenerator() {
+    return keyGenerator;
+  }
+
+  /**
+   * Get the keyComparator.
+   *
+   * @return Return the keyComparator.
+   */
+  public WritableComparator getKeyComparator() {
+    return keyComparator;
+  }
+
+  /**
+   * Get the additionalColumns.
+   *
+   * @return Return the additionalColumns.
+   */
+  public byte[][] getAdditionalColumns() {
+    return additionalColumns;
+  }
+
+  /**
+   * Get the indexId.
+   *
+   * @return Return the indexId.
+   */
+  public String getIndexId() {
+    return indexId;
+  }
+
+  public byte[][] getAllColumns() {
+    return allColumns;
+  }
+
+  public boolean containsColumn(byte[] column) {
+    for (byte[] col : allColumns) {
+      if (Bytes.equals(column, col)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public byte[] getIndexedTableName(byte[] baseTableName) {
+    return Bytes.add(baseTableName, Bytes.toBytes("-" + indexId));
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    indexId = in.readUTF();
+    int numIndexedCols = in.readInt();
+    indexedColumns = new byte[numIndexedCols][];
+    for (int i = 0; i < numIndexedCols; i++) {
+      indexedColumns[i] = Bytes.readByteArray(in);
+    }
+    int numAdditionalCols = in.readInt();
+    additionalColumns = new byte[numAdditionalCols][];
+    for (int i = 0; i < numAdditionalCols; i++) {
+      additionalColumns[i] = Bytes.readByteArray(in);
+    }
+    makeAllColumns();
+    HBaseConfiguration conf = new HBaseConfiguration();
+    keyGenerator = (IndexKeyGenerator) ObjectWritable.readObject(in, conf);
+    keyComparator = (WritableComparator) ObjectWritable.readObject(in, conf);
+  }
+
+  /** {@inheritDoc} */
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(indexId);
+    out.writeInt(indexedColumns.length);
+    for (byte[] col : indexedColumns) {
+      Bytes.writeByteArray(out, col);
+    }
+    if (additionalColumns != null) {
+      out.writeInt(additionalColumns.length);
+      for (byte[] col : additionalColumns) {
+        Bytes.writeByteArray(out, col);
+      }
+    } else {
+      out.writeInt(0);
+    }
+    HBaseConfiguration conf = new HBaseConfiguration();
+    ObjectWritable
+        .writeObject(out, keyGenerator, IndexKeyGenerator.class, conf);
+    ObjectWritable.writeObject(out, keyComparator, WritableComparator.class,
+        conf);
+  }
+
+}

Property changes on: src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java
___________________________________________________________________
Name: svn:keywords
   + Revision Date Id

Index: src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java	(revision 0)
@@ -0,0 +1,211 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/** HTable extended with indexed support. */
+public class IndexedTable extends HTable {
+
+  // FIXME, these belong elsewhere
+  public static final byte[] INDEX_COL_FAMILY_NAME = Bytes.toBytes("__INDEX__");
+  public static final byte[] INDEX_COL_FAMILY = Bytes.add(
+      INDEX_COL_FAMILY_NAME, new byte[] { HStoreKey.COLUMN_FAMILY_DELIMITER });
+  public static final byte[] INDEX_BASE_ROW_COLUMN = Bytes.add(
+      INDEX_COL_FAMILY, Bytes.toBytes("ROW"));
+
+  private static final Log LOG = LogFactory.getLog(IndexedTable.class);
+
+  private Map<String, HTable> indexIdToTable = new HashMap<String, HTable>();
+
+  public IndexedTable(final HBaseConfiguration conf, final byte[] tableName)
+      throws IOException {
+    super(conf, tableName);
+
+    for (IndexSpecification spec : super.getTableDescriptor().getIndexes()) {
+      indexIdToTable.put(spec.getIndexId(), new HTable(conf, spec
+          .getIndexedTableName(tableName)));
+    }
+  }
+
+  /**
+   * Open up an indexed scanner. Results will come back in the indexed order,
+   * but will contain RowResults from the original table.
+   *
+   * @param indexId the id of the index to use
+   * @param indexStartRow (created from the IndexKeyGenerator)
+   * @param indexColumns in the index table
+   * @param indexFilter filter to run on the indexed table. This can only use
+   * columns that have been added to the index.
+   * @param baseColumns from the original table
+   * @return scanner
+   * @throws IOException
+   * @throws IndexNotFoundException
+   */
+  public Scanner getIndexedScanner(String indexId, final byte[] indexStartRow,
+      byte[][] indexColumns, final RowFilterInterface indexFilter,
+      final byte[][] baseColumns) throws IOException, IndexNotFoundException {
+    IndexSpecification indexSpec = super.getTableDescriptor().getIndex(indexId);
+    if (indexSpec == null) {
+      throw new IndexNotFoundException("Index " + indexId
+          + " not defined in table "
+          + super.getTableDescriptor().getNameAsString());
+    }
+    verifyIndexColumns(indexColumns, indexSpec);
+    // TODO, verify/remove index columns from baseColumns
+
+    HTable indexTable = indexIdToTable.get(indexId);
+
+    // Add the IndexRowColumn
+    byte[][] allIndexColumns = new byte[(indexColumns == null ? 0
+        : indexColumns.length) + 1][];
+    if (indexColumns != null) {
+      System
+          .arraycopy(indexColumns, 0, allIndexColumns, 0, indexColumns.length);
+      allIndexColumns[indexColumns.length] = INDEX_BASE_ROW_COLUMN;
+    } else {
+      allIndexColumns[0] = INDEX_BASE_ROW_COLUMN;
+    }
+
+    Scanner indexScanner = indexTable.getScanner(allIndexColumns,
+        indexStartRow, indexFilter);
+
+    return new ScannerWrapper(indexScanner, baseColumns);
+  }
+
+  private void verifyIndexColumns(byte[][] requestedColumns,
+      IndexSpecification indexSpec) {
+    if (requestedColumns == null) {
+      return;
+    }
+    for (byte[] requestedColumn : requestedColumns) {
+      boolean found = false;
+      for (byte[] indexColumn : indexSpec.getAllColumns()) {
+        if (Bytes.equals(requestedColumn, indexColumn)) {
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        throw new RuntimeException("Column [" + Bytes.toString(requestedColumn)
+            + "] not in index " + indexSpec.getIndexId());
+      }
+    }
+  }
+
+  private class ScannerWrapper implements Scanner {
+
+    private Scanner indexScanner;
+    private byte[][] columns;
+
+    public ScannerWrapper(Scanner indexScanner, byte[][] columns) {
+      this.indexScanner = indexScanner;
+      this.columns = columns;
+    }
+
+    /** {@inheritDoc} */
+    public RowResult next() throws IOException {
+      RowResult indexResult = indexScanner.next();
+      if (indexResult == null) {
+        return null;
+      }
+
+      byte[] baseRow = indexResult.get(INDEX_BASE_ROW_COLUMN).getValue();
+
+      LOG.debug("next index row [" + Bytes.toString(indexResult.getRow())
+          + "] -> base row [" + Bytes.toString(baseRow) + "]");
+
+      HbaseMapWritable<byte[], Cell> colValues =
+          new HbaseMapWritable<byte[], Cell>();
+
+      RowResult baseResult = IndexedTable.this.getRow(baseRow, columns);
+      colValues.putAll(baseResult);
+
+      for (Entry<byte[], Cell> entry : indexResult.entrySet()) {
+        byte[] col = entry.getKey();
+        if (HStoreKey.matchingFamily(INDEX_COL_FAMILY_NAME, col)) {
+          continue;
+        }
+        // FIXME, only put if they asked for it?
+        colValues.put(col, entry.getValue());
+      }
+
+      return new RowResult(baseRow, colValues);
+    }
+
+    /** {@inheritDoc} */
+    public void close() {
+      indexScanner.close();
+    }
+
+    /** {@inheritDoc} */
+    public Iterator<RowResult> iterator() {
+      // FIXME, copied from HTable.ClientScanner. Extract this to a common
+      // base class?
+      return new Iterator<RowResult>() {
+        RowResult next = null;
+
+        public boolean hasNext() {
+          if (next == null) {
+            try {
+              next = ScannerWrapper.this.next();
+              return next != null;
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+          return true;
+        }
+
+        public RowResult next() {
+          if (!hasNext()) {
+            return null;
+          }
+          RowResult temp = next;
+          next = null;
+          return temp;
+        }
+
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    }
+
+  }
+}

Property changes on: src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java
___________________________________________________________________
Name: svn:keywords
   + Revision Date Id

Index: src/java/org/apache/hadoop/hbase/client/tableindexed/package.html
===================================================================
--- src/java/org/apache/hadoop/hbase/client/tableindexed/package.html	(revision 0)
+++ src/java/org/apache/hadoop/hbase/client/tableindexed/package.html	(revision 0)
@@ -0,0 +1,47 @@
+<html>
+<body>
+
+This package provides support for secondary indexing by maintaining a
+separate, "index", table for each index.
+
+The IndexSpecification class provides the metadata for the index. This
+includes:
+<ul>
+  <li>the columns that contribute to the index key,</li>
+  <li>additional columns to put in the index table (which are thus made
+      available to filters when scanning the index table),</li>
+  <li>an IndexKeyGenerator which constructs the index-row-key from the
+      indexed column(s) and the original row key, and</li>
+  <li>(optionally) a custom key comparator for the indexed table. This can
+      allow an index on a deserialized column value.</li>
+</ul>
+
+IndexSpecifications can be added to a table's metadata (HTableDescriptor)
+before the table is constructed. Afterwards, updates and deletes to the
+original table will trigger the updates in the index, and the indexes can be
+scanned using the API on IndexedTable.
+
+For a simple example, look at the unit test in
+org.apache.hadoop.hbase.client.tableindexed.
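+
+<p>
+A minimal usage sketch, paraphrasing the unit test above (the table, family,
+and index names here are illustrative, not part of the API):
+</p>
+<pre>
+HBaseConfiguration conf = new HBaseConfiguration();
+
+HTableDescriptor desc = new HTableDescriptor("myTable");
+desc.addFamily(new HColumnDescriptor(Bytes.toBytes("family:")));
+// Index the column "family:a", ascending.
+desc.addIndex(new IndexSpecification("byColA", Bytes.toBytes("family:a"), true));
+
+IndexedTableAdmin admin = new IndexedTableAdmin(conf);
+admin.createTable(desc); // also creates the index table "myTable-byColA"
+
+IndexedTable table = new IndexedTable(conf, Bytes.toBytes("myTable"));
+BatchUpdate update = new BatchUpdate(Bytes.toBytes("row1"));
+update.put(Bytes.toBytes("family:a"), Bytes.toBytes("someValue"));
+table.commit(update);
+
+// Scan the base table in the sort order of the index.
+Scanner scanner = table.getIndexedScanner("byColA",
+    HConstants.EMPTY_START_ROW, null, null, null);
+for (RowResult result : scanner) {
+  // RowResults contain rows of the base table, ordered by the indexed column.
+}
+</pre>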

+<p>
+To enable the indexing, modify hbase-site.xml to turn on the
+IndexedRegionServer. This is done by setting
+<i>hbase.regionserver.class</i> to
+<i>org.apache.hadoop.hbase.ipc.IndexedRegionInterface</i> and
+<i>hbase.regionserver.impl</i> to
+<i>org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer</i>.
+</p>
+
+</body>
+</html>
Index: src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java	(revision 700621)
+++ src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java	(working copy)
@@ -28,17 +28,19 @@
 import org.apache.hadoop.hbase.client.Scanner;
 import org.apache.hadoop.hbase.client.ScannerCallable;
 import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.client.tableindexed.IndexedTable;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * Table with transactional support.
  *
 */
-public class TransactionalTable extends HTable {
+public class TransactionalTable extends IndexedTable {
 
   /**
    * @param conf
@@ -47,7 +49,7 @@
    */
   public TransactionalTable(final HBaseConfiguration conf,
       final String tableName) throws IOException {
-    super(conf, tableName);
+    this(conf, Bytes.toBytes(tableName));
   }
 
   /**
Index: src/java/org/apache/hadoop/hbase/client/UnmodifyableHTableDescriptor.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/UnmodifyableHTableDescriptor.java	(revision 700621)
+++ src/java/org/apache/hadoop/hbase/client/UnmodifyableHTableDescriptor.java	(working copy)
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
 
 /**
  * Read-only table descriptor.
@@ -37,7 +38,7 @@
    * @param desc
    */
   UnmodifyableHTableDescriptor(final HTableDescriptor desc) {
-    super(desc.getName(), getUnmodifyableFamilies(desc), desc.getValues());
+    super(desc.getName(), getUnmodifyableFamilies(desc), desc.getIndexes(), desc.getValues());
   }
 
   /*
@@ -108,4 +109,9 @@
   public void setMemcacheFlushSize(int memcacheFlushSize) {
     throw new UnsupportedOperationException("HTableDescriptor is read-only");
   }
+
+  @Override
+  public void addIndex(IndexSpecification index) {
+    throw new UnsupportedOperationException("HTableDescriptor is read-only");
+  }
 }
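
Note on declaring richer indexes (a sketch based on the IndexSpecification API
above, not part of the patch itself): besides the single-column ascending form
used in TestIndexedTable, the full constructor lets an index sort descending
and carry extra columns into the index table. The identifiers "byColADesc",
COL_A, COL_B, and desc below are illustrative only; desc is assumed to be the
base table's HTableDescriptor:

    IndexSpecification spec = new IndexSpecification(
        "byColADesc",                        // indexId, unique within the table
        new byte[][] { COL_A },              // indexed columns (become part of the index row key)
        new byte[][] { COL_B },              // additional columns copied into the index table
        new SimpleIndexKeyGenerator(COL_A),  // index row key = indexed value + base row key
        new ReverseByteArrayComparator());   // sorts the index table in descending order
    desc.addIndex(spec);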