Index: src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/tableindexed/TestRowBasedIndexedTable.java =================================================================== --- src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/tableindexed/TestRowBasedIndexedTable.java (revision 0) +++ src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/tableindexed/TestRowBasedIndexedTable.java (revision 0) @@ -0,0 +1,252 @@ +/** + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.tableindexed; + +import java.io.IOException; +import java.util.Random; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseClusterTestCase; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.PerformanceEvaluation; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RowLock; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.tableindexed.RowBasedIndexedRegionServer; +import org.apache.hadoop.hbase.util.Bytes; + +public class TestRowBasedIndexedTable extends HBaseClusterTestCase { + + private static final Log LOG = LogFactory.getLog(TestRowBasedIndexedTable.class); + + private static final String TABLE_NAME = "table1"; + + private static final byte[] FAMILY_COLON = Bytes.toBytes("family:"); + private static final byte[] FAMILY = Bytes.toBytes("family"); + private static final byte[] QUAL_A = Bytes.toBytes("a"); + private static final byte[] COL_A = Bytes.toBytes("family:a"); + private static final byte[] VALUE_A = Bytes.toBytes("0000009962"); + private static final String INDEX_COL_A = "A"; + + private static final int NUM_ROWS = 10; + private static final int MAX_VAL = 10000; + + private IndexedTableAdmin admin; + private RowBasedIndexedTable table; + private Random random = new Random(); + private HTableDescriptor desc; + + /** constructor */ + public TestRowBasedIndexedTable() { + conf + .set(HConstants.REGION_SERVER_IMPL, RowBasedIndexedRegionServer.class.getName()); + conf.setInt("hbase.master.info.port", -1); + conf.setInt("hbase.regionserver.info.port", -1); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + + desc = new HTableDescriptor(TABLE_NAME); + desc.addFamily(new HColumnDescriptor(FAMILY_COLON)); + + IndexedTableDescriptor indexDesc = new 
IndexedTableDescriptor(desc); + // Create a new index that does lexicographic ordering on COL_A + RowBasedIndexSpecification colAIndex = new RowBasedIndexSpecification(INDEX_COL_A, COL_A); + indexDesc.addIndex(colAIndex); + + admin = new IndexedTableAdmin(conf); + admin.createIndexedTable(indexDesc); + table = new RowBasedIndexedTable(conf, desc.getName()); + } + + private void writeInitialRows() throws IOException { + for (int i = 0; i < NUM_ROWS; i++) { + Put update = new Put(PerformanceEvaluation.format(i)); + update.add(FAMILY, QUAL_A, VALUE_A); + table.put(update); + LOG.info("Inserted row [" + Bytes.toString(update.getRow()) + "] val: [" + + Bytes.toString(VALUE_A) + "]"); + } + } + + + public void testInitialWrites() throws IOException { + writeInitialRows(); + assertRowsInOrder(NUM_ROWS); + } + + private void assertRowsInOrder(int numRowsExpected) + throws IndexNotFoundException, IOException { + ResultScanner scanner = table.getIndexedScanner(INDEX_COL_A, VALUE_A, COL_A, null); + int numRows = 0; + byte[] lastColA = null; + for (Result rowResult : scanner) { + byte[] colA = rowResult.getValue(COL_A); + LOG.info("index scan : row [" + Bytes.toString(rowResult.getRow()) + + "] value [" + Bytes.toString(colA) + "]"); + if (lastColA != null) { + Assert.assertTrue(Bytes.compareTo(lastColA, colA) <= 0); + } + lastColA = colA; + numRows++; + } + scanner.close(); + Assert.assertEquals(numRowsExpected, numRows); + } + + private void assertRowUpdated(int updatedRow, int expectedRowValue) + throws IndexNotFoundException, IOException { + byte[] expectedRowValueBytes = PerformanceEvaluation.format(expectedRowValue); + ResultScanner scanner = table.getIndexedScanner(INDEX_COL_A, expectedRowValueBytes, COL_A, null); + byte[] persistedRowValue = null; + for (Result rowResult : scanner) { + byte[] row = rowResult.getRow(); + byte[] value = rowResult.getValue(COL_A); + if (Bytes.toString(row).equals(Bytes.toString(PerformanceEvaluation.format(updatedRow)))) { + persistedRowValue = value; + LOG.info("update found: row [" + Bytes.toString(row) + + "] value [" + Bytes.toString(value) + "]"); + } else { + LOG.info("updated index scan : row [" + Bytes.toString(row) + + "] value [" + Bytes.toString(value) + "]"); + } + } + scanner.close(); + + Assert.assertEquals(Bytes.toString(expectedRowValueBytes), + Bytes.toString(persistedRowValue)); + } + + private void assertRowDeleted(int numRowsExpected) + throws IndexNotFoundException, IOException { + // Check the size of the primary table + ResultScanner scanner = table.getScanner(new Scan()); + int numRows = 0; + for (Result rowResult : scanner) { + byte[] colA = rowResult.getValue(FAMILY, QUAL_A); + LOG.info("primary scan : row [" + Bytes.toString(rowResult.getRow()) + + "] value [" + Bytes.toString(colA) + "]"); + numRows++; + } + scanner.close(); + Assert.assertEquals(numRowsExpected, numRows); + + // Check the size of the index tables + assertRowsInOrder(numRowsExpected); + } + + private void updateRow(int row, int newValue) throws IOException { + Put update = new Put(PerformanceEvaluation.format(row)); + byte[] valueA = PerformanceEvaluation.format(newValue); + update.add(FAMILY, QUAL_A, valueA); + table.put(update); + LOG.info("Updated row [" + Bytes.toString(update.getRow()) + "] val: [" + + Bytes.toString(valueA) + "]"); + } + + private void updateLockedRow(int row, int newValue) throws IOException { + RowLock lock = table.lockRow(PerformanceEvaluation.format(row)); + Put update = new Put(PerformanceEvaluation.format(row), lock); + byte[] valueA = 
PerformanceEvaluation.format(newValue); + update.add(FAMILY, QUAL_A, valueA); + LOG.info("Updating row [" + Bytes.toString(update.getRow()) + "] val: [" + + Bytes.toString(valueA) + "]"); + table.put(update); + LOG.info("Updated row [" + Bytes.toString(update.getRow()) + "] val: [" + + Bytes.toString(valueA) + "]"); + table.unlockRow(lock); + } + + private void updateLockedRowNoAutoFlush(int row, int newValue) throws IOException { + table.flushCommits(); + table.setAutoFlush(false); + RowLock lock = table.lockRow(PerformanceEvaluation.format(row)); + Put update = new Put(PerformanceEvaluation.format(row), lock); + byte[] valueA = PerformanceEvaluation.format(newValue); + update.add(FAMILY, QUAL_A, valueA); + LOG.info("Updating row [" + Bytes.toString(update.getRow()) + "] val: [" + + Bytes.toString(valueA) + "]"); + table.put(update); + LOG.info("Updated row [" + Bytes.toString(update.getRow()) + "] val: [" + + Bytes.toString(valueA) + "]"); + table.flushCommits(); + table.close(); + table = new RowBasedIndexedTable(conf, desc.getName()); + } + + public void testMultipleWrites() throws IOException { + writeInitialRows(); + writeInitialRows(); // Update the rows. + assertRowsInOrder(NUM_ROWS); + } + + public void testDelete() throws IOException { + writeInitialRows(); + // Delete the first row. + table.deleteAll(PerformanceEvaluation.format(0)); + + assertRowsInOrder(NUM_ROWS - 1); + } + + public void testRowUpdate() throws IOException { + writeInitialRows(); + int row = NUM_ROWS - 2; + int value = MAX_VAL + 111; + updateRow(row, value); + assertRowUpdated(row, value); + } + + public void testLockedRowUpdate() throws IOException { + writeInitialRows(); + int row = NUM_ROWS - 2; + int value = MAX_VAL + 111; + updateLockedRow(row, value); + assertRowUpdated(row, value); + } + + public void testLockedRowUpdateNoAutoFlush() throws IOException { + writeInitialRows(); + int row = NUM_ROWS - 4; + int value = MAX_VAL + 2222; + updateLockedRowNoAutoFlush(row, value); + assertRowUpdated(row, value); + } + + public void testLockedRowDelete() throws IOException { + writeInitialRows(); + // Delete the first row. + byte[] row = PerformanceEvaluation.format(0); + RowLock lock = table.lockRow(row); + table.delete(new Delete(row, HConstants.LATEST_TIMESTAMP, lock)); + table.unlockRow(lock); + + assertRowDeleted(NUM_ROWS - 1); + } +} Index: src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java =================================================================== --- src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java (revision 931965) +++ src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java (working copy) @@ -53,8 +53,8 @@ static final Log LOG = LogFactory.getLog(IndexedTable.class); - private final IndexedTableDescriptor indexedTableDescriptor; - private Map<String, HTable> indexIdToTable = new HashMap<String, HTable>(); + protected final IndexedTableDescriptor indexedTableDescriptor; + protected Map<String, HTable> indexIdToTable = new HashMap<String, HTable>(); public IndexedTable(final HBaseConfiguration conf, final byte[] tableName) throws IOException { @@ -126,7 +126,7 @@ return new ScannerWrapper(indexScanner, baseColumns); } - private void verifyIndexColumns(byte[][] requestedColumns, + protected void verifyIndexColumns(byte[][] requestedColumns, IndexSpecification indexSpec) { if (requestedColumns == null) { return; Index: src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/RowBasedIndexKeyGenerator.java 
=================================================================== --- src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/RowBasedIndexKeyGenerator.java (revision 0) +++ src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/RowBasedIndexKeyGenerator.java (revision 0) @@ -0,0 +1,61 @@ +/** + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.tableindexed; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Creates index keys for secondary indices defined using + * {@link RowBasedIndexSpecification}. RowBasedIndexKeyGenerator constructs the + * index-row-key from the indexed column value in the original row. + */ +public class RowBasedIndexKeyGenerator implements IndexKeyGenerator { + + private byte [] column; + + public RowBasedIndexKeyGenerator(byte [] column) { + this.column = column; + } + + public RowBasedIndexKeyGenerator() { + // For Writable + } + + /** {@inheritDoc} */ + public byte[] createIndexKey(byte[] rowKey, Map<byte[], byte[]> columns) { + return columns.get(column); + } + + /** {@inheritDoc} */ + public void readFields(DataInput in) throws IOException { + column = Bytes.readByteArray(in); + } + + /** {@inheritDoc} */ + public void write(DataOutput out) throws IOException { + Bytes.writeByteArray(out, column); + } + +} Index: src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/RowBasedIndexedTable.java =================================================================== --- src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/RowBasedIndexedTable.java (revision 0) +++ src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/RowBasedIndexedTable.java (revision 0) @@ -0,0 +1,196 @@ +package org.apache.hadoop.hbase.client.tableindexed; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; + +public class RowBasedIndexedTable extends IndexedTable { + + public RowBasedIndexedTable(HBaseConfiguration conf, byte[] tableName) throws IOException { + super(conf, tableName); + } + + /** + * Open up an indexed scanner on a {@link RowBasedIndexSpecification} type + * index. Results come back in indexed order but contain rows from the + * original table. The indexRow parameter is always required and corresponds + * to the value of the indexed base table column on which we wish to filter. + * This value is used to limit the secondary index table scan to the specific + * row containing the indexed values. + * + * @param indexId the id of the index to scan + * @param indexRow + * Required: the value in the base table column on which we want to filter. + * @param indexColumn + * Required so that we verify that we are querying against the correct index. + * @param baseColumns the base table columns to return; when null or empty, all families of the base table are fetched + * @return a scanner over the matching base table rows, in indexed order + * @throws IOException + * @throws IndexNotFoundException + * @throws IllegalArgumentException + */ + public ResultScanner getIndexedScanner(String indexId, final byte[] indexRow, + byte[] indexColumn, final byte[][] baseColumns) throws IOException, IndexNotFoundException, IllegalArgumentException { + if (indexRow == null) { + throw new IllegalArgumentException("the index row parameter cannot be null"); + } + if (indexColumn == null) { + throw new IllegalArgumentException("the index column parameter cannot be null"); + } + IndexSpecification indexSpec = this.indexedTableDescriptor.getIndex(indexId); + if (indexSpec == null) { + throw new IndexNotFoundException("Index " + indexId + + " not defined in table " + + super.getTableDescriptor().getNameAsString()); + } + verifyIndexColumns(new byte[][] { indexColumn }, indexSpec); + // TODO, verify/remove index columns from baseColumns + + HTable indexTable = indexIdToTable.get(indexId); + + // Start and stop on the same row. The stop row is exclusive, so append a + // zero byte to make the scan cover exactly the index row, regardless of + // what bytes the indexed value contains. + Scan indexScan = new Scan(); + indexScan.setStartRow(indexRow); + indexScan.setStopRow(Bytes.add(indexRow, new byte[] { 0 })); + ResultScanner indexScanner = indexTable.getScanner(indexScan); + + return new RowBasedScannerWrapper(indexScanner, baseColumns, indexRow); + } + + private class RowBasedScannerWrapper implements ResultScanner { + + private ResultScanner indexScanner; + private byte[][] columns; + private Result indexResult; + private Iterator<byte[]> qualifiers; + private byte[] indexRow; + + public RowBasedScannerWrapper(ResultScanner indexScanner, byte[][] columns, final byte[] indexRow) { + this.indexScanner = indexScanner; + this.columns = columns; + this.indexRow = indexRow; + } + + /** {@inheritDoc} */ + public Result next() throws IOException { + Result[] result = next(1); + if (result == null || result.length < 1) + return null; + return result[0]; + } + + /** {@inheritDoc} */ + public Result[] next(int nbRows) throws IOException { + if (this.indexResult == null) { + indexResult = indexScanner.next(); + if (indexResult == null) { + LOG.debug("Missing index row for indexed key: [" + Bytes.toString(this.indexRow) + "]"); + return null; + } + // Each qualifier on the index row is the key of one matching base table row. + qualifiers = indexResult.getFamilyMap(INDEX_COL_FAMILY_NAME).navigableKeySet().iterator(); + } + List<Result> resultsArray = new ArrayList<Result>(); + int resultCount = nbRows; + while (qualifiers.hasNext() && resultCount > 0) { + byte[] baseRow = qualifiers.next(); + LOG.debug("next index column [" + Bytes.toString(baseRow) + "]"); + Result baseResult = null; + if (columns == null || columns.length == 0) { + // No explicit base columns were requested: default to every family of the base table. + Set<byte[]> families = RowBasedIndexedTable.this.getTableDescriptor().getFamiliesKeys(); + columns = families.toArray(new byte[0][0]); + } + LOG.debug("Going to base table for remaining columns"); + Get baseGet = new Get(baseRow); + baseGet.addColumns(columns); + baseResult = RowBasedIndexedTable.this.get(baseGet); + + List<KeyValue> results = new ArrayList<KeyValue>(); + if (baseResult != null) { + List<KeyValue> list = baseResult.list(); + if (list != null) { + results.addAll(list); + } + } + + resultsArray.add(new Result(results)); + resultCount--; + } + + return resultsArray.toArray(new Result[]{}); + } + + /** {@inheritDoc} */ + public void close() { + indexScanner.close(); + } + + // Copied from HTable.ClientScanner.iterator() + public Iterator<Result> iterator() { + return new Iterator<Result>() { + // The next Result, possibly pre-read + Result next = null; + + // return true if there is another item pending, false if there isn't. + // this method is where the actual advancing takes place, but you need + // to call next() to consume it. hasNext() will only advance if there + // isn't a pending next(). + public boolean hasNext() { + if (next == null) { + try { + next = RowBasedScannerWrapper.this.next(); + return next != null; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return true; + } + + // get the pending next item and advance the iterator. returns null if + // there is no next item. + public Result next() { + // since hasNext() does the real advancing, we call this to determine + // if there is a next before proceeding. + if (!hasNext()) { + return null; + } + + // if we get to here, then hasNext() has given us an item to return. + // we want to return the item and then null out the next pointer, so + // we use a temporary variable. + Result temp = next; + next = null; + return temp; + } + + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + } +} Index: src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/RowBasedIndexSpecification.java =================================================================== --- src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/RowBasedIndexSpecification.java (revision 0) +++ src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/RowBasedIndexSpecification.java (revision 0) @@ -0,0 +1,27 @@ +package org.apache.hadoop.hbase.client.tableindexed; + + +/** + * Specialized {@link IndexSpecification} that creates row-based secondary index + * tables. Base table rows with the same indexed column value have their row + * keys stored as column qualifiers on the same secondary index table row. The + * key for that row is the indexed column value from the base table. This avoids + * expensive secondary index table scans and provides faster access for + * applications such as foreign key indexing or queries such as + * "find all table A rows whose familyA:columnB value is X". + */ +public class RowBasedIndexSpecification extends IndexSpecification { + + /** + * Construct a row-based index spec for a single column using a + * {@link RowBasedIndexKeyGenerator} under the covers. 
+ * + * @param indexId + * @param indexedColumn + */ + public RowBasedIndexSpecification(String indexId, byte[] indexedColumn) { + super(indexId, new byte[][] { indexedColumn }, null, + new RowBasedIndexKeyGenerator(indexedColumn)); + } + +} Index: src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html =================================================================== --- src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html (revision 931965) +++ src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html (working copy) @@ -31,11 +31,11 @@
and
  • an IndexKeyGenerator which constructs the index-row-key from the indexed column(s) and the original row. -IndexesSpecifications can be added to a table's metadata (HTableDescriptor) before the table is constructed. +IndexSpecifications can be added to a table's metadata (HTableDescriptor) before the table is constructed. Afterwards, updates and deletes to the original table will trigger the updates in the index, and -the indexes can be scanned using the API on IndexedTable. +the indices can be scanned using the API on IndexedTable. -For a simple example, look at the unit test in org.apache.hadoop.hbase.client.tableIndexed. +For a simple example, look at the TestIndexedTable unit test in org.apache.hadoop.hbase.client.tableindexed.
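+
+A minimal sketch of that flow, modeled on the setUp() methods of the unit
+tests in this patch. The two-argument IndexSpecification constructor below is
+an assumption; the patch itself only exercises the four-argument form
+(indexId, indexedColumns, additionalColumns, keyGenerator) through
+RowBasedIndexSpecification.
+
+  HBaseConfiguration conf = new HBaseConfiguration();
+  HTableDescriptor desc = new HTableDescriptor("table1");
+  desc.addFamily(new HColumnDescriptor(Bytes.toBytes("family:")));
+  IndexedTableDescriptor indexDesc = new IndexedTableDescriptor(desc);
+  // Declare index "A", keyed on the value of family:a, before table creation.
+  indexDesc.addIndex(new IndexSpecification("A", Bytes.toBytes("family:a")));
+  // Creates the base table plus one table per declared index.
+  new IndexedTableAdmin(conf).createIndexedTable(indexDesc);
+  IndexedTable table = new IndexedTable(conf, desc.getName());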

    To enable the indexing, modify hbase-site.xml to turn on the IndexedRegionServer. This is done by setting @@ -43,6 +43,31 @@ org.apache.hadoop.hbase.ipc.IndexedRegionInterface and hbase.regionserver.impl to org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer - + +RowBasedIndexSpecification is a specialized IndexSpecification class for +creating row-based secondary index tables. Base table rows with the same +indexed column value have their row keys stored as column qualifiers on +the same secondary index table row. The key for that row is the indexed +column value from the base table. This avoids expensive +secondary index table scans and provides faster access for applications +such as foreign key indexing or queries such as "find all table A rows +whose familyA:columnB value is X". RowBasedIndexSpecification indices +can be scanned using the API on RowBasedIndexedTable. The metadata for +a RowBasedIndexSpecification differs from that of an IndexSpecification in that: +

  • only a single base table column can be indexed per RowBasedIndexSpecification. +No additional columns are put in the index table. +
    and +
  • RowBasedIndexKeyGenerator, which constructs the index-row-key from the indexed +column value in the original row, is always used. + +For a simple RowBasedIndexSpecification example, look at the TestRowBasedIndexedTable unit test in org.apache.hadoop.hbase.client.tableindexed. + +
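+In code, the row-based variant follows the same pattern. This sketch mirrors
+the TestRowBasedIndexedTable setup above (base table "table1", index "A" on
+the family:a column) and uses the getIndexedScanner signature added by this
+patch:
+
+  IndexedTableDescriptor indexDesc = new IndexedTableDescriptor(desc);
+  indexDesc.addIndex(new RowBasedIndexSpecification("A", Bytes.toBytes("family:a")));
+  new IndexedTableAdmin(conf).createIndexedTable(indexDesc);
+
+  RowBasedIndexedTable table = new RowBasedIndexedTable(conf, desc.getName());
+  // All base rows whose family:a value is "0000009962" hang off a single index
+  // row, so the lookup is one index-row read rather than an index table scan.
+  ResultScanner scanner = table.getIndexedScanner("A",
+      Bytes.toBytes("0000009962"), Bytes.toBytes("family:a"), null);
+  for (Result result : scanner) {
+    // each Result is a base table row, returned in indexed order
+  }
+  scanner.close();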

    To enable RowBasedIndexSpecification indexing, modify hbase-site.xml to turn on the +RowBasedIndexedRegionServer. This is done by setting +hbase.regionserver.class to +org.apache.hadoop.hbase.ipc.IndexedRegionInterface and +hbase.regionserver.impl to +org.apache.hadoop.hbase.regionserver.tableindexed.RowBasedIndexedRegionServer Index: src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/RowBasedIndexedRegionServer.java =================================================================== --- src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/RowBasedIndexedRegionServer.java (revision 0) +++ src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/RowBasedIndexedRegionServer.java (revision 0) @@ -0,0 +1,44 @@ +package org.apache.hadoop.hbase.regionserver.tableindexed; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.tableindexed.RowBasedIndexSpecification; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.util.Progressable; + +/** + * Specialized {@link IndexedRegionServer} instantiating + * {@link RowBasedIndexedRegion}s so that secondary indexes defined through + * {@link RowBasedIndexSpecification} can be used. + * + *

    + * To enable secondary indexes defined using {@link RowBasedIndexSpecification}, + * modify hbase-site.xml to turn on the RowBasedIndexedRegionServer. This is + * done by setting hbase.regionserver.class to + * org.apache.hadoop.hbase.ipc.IndexedRegionInterface and + * hbase.regionserver.impl to + * org.apache.hadoop.hbase.regionserver.tableindexed.RowBasedIndexedRegionServer + **/ +public class RowBasedIndexedRegionServer extends IndexedRegionServer { + + public RowBasedIndexedRegionServer(HBaseConfiguration conf) throws IOException { + super(conf); + } + + @Override + protected HRegion instantiateRegion(final HRegionInfo regionInfo) + throws IOException { + HRegion r = new RowBasedIndexedRegion(HTableDescriptor.getTableDir(super + .getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super + .getFileSystem(), super.conf, regionInfo, super.getFlushRequester(), super.getTransactionalLeases()); + r.initialize(null, new Progressable() { + public void progress() { + addProcessingMessage(regionInfo); + } + }); + return r; + } +} Index: src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/RowBasedIndexedRegion.java =================================================================== --- src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/RowBasedIndexedRegion.java (revision 0) +++ src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/RowBasedIndexedRegion.java (revision 0) @@ -0,0 +1,73 @@ +package org.apache.hadoop.hbase.regionserver.tableindexed; + +import java.io.IOException; +import java.util.SortedMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Leases; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification; +import org.apache.hadoop.hbase.client.tableindexed.IndexedTable; +import org.apache.hadoop.hbase.client.tableindexed.RowBasedIndexKeyGenerator; +import org.apache.hadoop.hbase.client.tableindexed.RowBasedIndexSpecification; +import org.apache.hadoop.hbase.regionserver.FlushRequester; +import org.apache.hadoop.hbase.regionserver.HLog; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Specialized {@link IndexedRegion} sub-class designed to work exclusively with + * {@link RowBasedIndexSpecification} and {@link RowBasedIndexKeyGenerator} by + * performing tailored secondary index table updates and deletes. 
+ */ +public class RowBasedIndexedRegion extends IndexedRegion { + + private static final Log LOG = LogFactory.getLog(RowBasedIndexedRegion.class); + + public RowBasedIndexedRegion(Path basedir, HLog log, FileSystem fs, + HBaseConfiguration conf, HRegionInfo regionInfo, + FlushRequester flushListener, Leases trxLeases) throws IOException { + super(basedir, log, fs, conf, regionInfo, flushListener, trxLeases); + } + + @Override + protected Delete makeDeleteToRemoveOldIndexEntry(IndexSpecification indexSpec, byte[] row, + SortedMap<byte[], byte[]> oldColumnValues) throws IOException { + for (byte[] indexedCol : indexSpec.getIndexedColumns()) { + if (!oldColumnValues.containsKey(indexedCol)) { + LOG.debug("Index [" + indexSpec.getIndexId() + + "] not trying to remove old entry for row [" + + Bytes.toString(row) + "] because col [" + + Bytes.toString(indexedCol) + "] is missing"); + return null; + } + } + + byte[] oldIndexRow = indexSpec.getKeyGenerator().createIndexKey(row, + oldColumnValues); + LOG.debug("Index [" + indexSpec.getIndexId() + "] removing old entry [" + + Bytes.toString(oldIndexRow) + "]"); + // Remove only this base row's qualifier from the old index row. + Delete del = new Delete(oldIndexRow); + del.deleteColumns(IndexedTable.INDEX_COL_FAMILY_NAME, row); + return del; + } + + @Override + protected Put makeIndexUpdate(IndexSpecification indexSpec, byte[] row, + SortedMap<byte[], byte[]> columnValues) throws IOException { + byte[] indexRow = indexSpec.getKeyGenerator().createIndexKey(row, columnValues); + Put indexUpdate = new Put(indexRow); + // The base row key becomes a qualifier on the index row; the cell value is a placeholder. + indexUpdate.add(IndexedTable.INDEX_COL_FAMILY_NAME, row, new byte[]{1}); + LOG.debug("Index [" + indexSpec.getIndexId() + "] adding new entry [" + + Bytes.toString(indexUpdate.getRow()) + "] for row [" + + Bytes.toString(row) + "]"); + + return indexUpdate; + } +} Index: src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java =================================================================== --- src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java (revision 931965) +++ src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java (working copy) @@ -180,7 +180,7 @@ return neededColumns; } - private Delete makeDeleteToRemoveOldIndexEntry(IndexSpecification indexSpec, byte[] row, + protected Delete makeDeleteToRemoveOldIndexEntry(IndexSpecification indexSpec, byte[] row, SortedMap<byte[], byte[]> oldColumnValues) throws IOException { for (byte[] indexedCol : indexSpec.getIndexedColumns()) { if (!oldColumnValues.containsKey(indexedCol)) { @@ -227,7 +227,7 @@ return false; } - private Put makeIndexUpdate(IndexSpecification indexSpec, byte[] row, + protected Put makeIndexUpdate(IndexSpecification indexSpec, byte[] row, SortedMap<byte[], byte[]> columnValues) throws IOException { Put indexUpdate = IndexMaintenanceUtils.createIndexUpdate(indexSpec, row, columnValues); LOG.debug("Index [" + indexSpec.getIndexId() + "] adding new entry ["