Index: src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java (revision 0) @@ -0,0 +1,306 @@ +/** + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.TimestampsFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + + + +/** + * Run tests related to {@link TimestampsFilter} using HBase client APIs. + * Sets up the HBase mini cluster once at start. Each creates a table + * named for the method and does its stuff against that. 
+ */ +public class TestTimestampsFilter { + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(3); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + // Nothing to do. + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + // Nothing to do. + } + + /** + * Test from client side for TimestampsFilter. + * + * The TimestampsFilter provides the ability to request cells (KeyValues) + * whose timestamp/version is in the specified list of timestamps/version. + * + * @throws Exception + */ + @Test + public void testTimestampsFilter() throws Exception { + byte [] TABLE = Bytes.toBytes("testTimestampsFilter"); + byte [] FAMILY = Bytes.toBytes("event_log"); + byte [][] FAMILIES = new byte[][] { FAMILY }; + KeyValue kvs[]; + + // create table; set versions to max... + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE); + + for (int rowIdx = 0; rowIdx < 5; rowIdx++) { + for (int colIdx = 0; colIdx < 5; colIdx++) { + // insert versions 201..300 + putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300); + // insert versions 1..100 + putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100); + } + } + + // do some verification before flush + verifyInsertedValues(ht, FAMILY); + + TEST_UTIL.flush(); + // give the flush some time since it is asynchronous... + try { + Thread.sleep(2000); + } catch (InterruptedException i) { + //ignore + } + + // do some verification after flush + verifyInsertedValues(ht, FAMILY); + + // Insert some more versions after flush. These should be in memstore. 
+ // After this we should have data in both memstore & HFiles. + for (int rowIdx = 0; rowIdx < 5; rowIdx++) { + for (int colIdx = 0; colIdx < 5; colIdx++) { + putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400); + putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200); + } + } + + for (int rowIdx = 0; rowIdx < 5; rowIdx++) { + for (int colIdx = 0; colIdx < 5; colIdx++) { + kvs = getNVersions(ht, FAMILY, rowIdx, colIdx, + Arrays.asList(505L, 5L, 105L, 305L, 205L)); + assertEquals(4, kvs.length); + assertOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305); + assertOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205); + assertOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105); + assertOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5); + } + } + + // Request an empty list of versions using the Timestamps filter; + // Should return none. + kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList()); + assertEquals(0, kvs.length); + + // + // Test the filter using a Scan operation + // Scan rows 0..4. For each row, get all its columns, but only + // those versions of the columns with the specified timestamps. + Result[] results = scanNVersions(ht, FAMILY, 0, 4, + Arrays.asList(6L, 106L, 306L)); + assertEquals("# of rows returned from scan", 5, results.length); + for (int rowIdx = 0; rowIdx < 5; rowIdx++) { + kvs = results[rowIdx].raw(); + // each row should have 5 columns. + // And we have requested 3 versions for each. + assertEquals("Number of KeyValues in result for row:" + rowIdx, + 3*5, kvs.length); + for (int colIdx = 0; colIdx < 5; colIdx++) { + int offset = colIdx * 3; + assertOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306); + assertOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106); + assertOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6); + } + } + } + + private void verifyInsertedValues(HTable ht, byte[] cf) throws IOException { + for (int rowIdx = 0; rowIdx < 5; rowIdx++) { + for (int colIdx = 0; colIdx < 5; colIdx++) { + // ask for versions that exist. 
+ KeyValue[] kvs = getNVersions(ht, cf, rowIdx, colIdx, + Arrays.asList(5L, 300L, 6L, 80L)); + assertEquals(4, kvs.length); + assertOneCell(kvs[0], cf, rowIdx, colIdx, 300); + assertOneCell(kvs[1], cf, rowIdx, colIdx, 80); + assertOneCell(kvs[2], cf, rowIdx, colIdx, 6); + assertOneCell(kvs[3], cf, rowIdx, colIdx, 5); + + // ask for versions that do not exist. + kvs = getNVersions(ht, cf, rowIdx, colIdx, + Arrays.asList(101L, 102L)); + assertEquals(0, kvs.length); + + // ask for some versions that exist and some that do not. + kvs = getNVersions(ht, cf, rowIdx, colIdx, + Arrays.asList(1L, 300L, 105L, 70L, 115L)); + assertEquals(3, kvs.length); + assertOneCell(kvs[0], cf, rowIdx, colIdx, 300); + assertOneCell(kvs[1], cf, rowIdx, colIdx, 70); + assertOneCell(kvs[2], cf, rowIdx, colIdx, 1); + } + } + } + + /** + * Assert that the passed in KeyValue has expected contents for the + * specified row, column & timestamp. + */ + private void assertOneCell(KeyValue kv, byte[] cf, + int rowIdx, int colIdx, long ts) { + + String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts; + + assertEquals("Row mismatch while checking: " + ctx, + "row:"+ rowIdx, Bytes.toString(kv.getRow())); + + assertEquals("ColumnFamily mismatch while checking: " + ctx, + Bytes.toString(cf), Bytes.toString(kv.getFamily())); + + assertEquals("Column qualifier mismatch while checking: " + ctx, + "column:" + colIdx, + Bytes.toString(kv.getQualifier())); + + assertEquals("Timestamp mismatch while checking: " + ctx, + ts, kv.getTimestamp()); + + assertEquals("Value mismatch while checking: " + ctx, + "value-version-" + ts, Bytes.toString(kv.getValue())); + } + + /** + * Uses the TimestampsFilter on a Get to request a specified list of + * versions for the row/column specified by rowIdx & colIdx. 
+ * + */ + private KeyValue[] getNVersions(HTable ht, byte[] cf, int rowIdx, + int colIdx, List versions) + throws IOException { + byte row[] = Bytes.toBytes("row:" + rowIdx); + byte column[] = Bytes.toBytes("column:" + colIdx); + Filter filter = new TimestampsFilter(versions); + Get get = new Get(row); + get.addColumn(cf, column); + get.setFilter(filter); + get.setMaxVersions(); + Result result = ht.get(get); + + return result.raw(); + } + + /** + * Uses the TimestampsFilter on a Scan to request a specified list of + * versions for the rows from startRowIdx to endRowIdx (both inclusive). + */ + private Result[] scanNVersions(HTable ht, byte[] cf, int startRowIdx, + int endRowIdx, List versions) + throws IOException { + byte startRow[] = Bytes.toBytes("row:" + startRowIdx); + byte endRow[] = Bytes.toBytes("row:" + (endRowIdx + 1)); // exclusive + Filter filter = new TimestampsFilter(versions); + Scan scan = new Scan(startRow, endRow); + scan.setFilter(filter); + scan.setMaxVersions(); + ResultScanner scanner = ht.getScanner(scan); + return scanner.next(endRowIdx - startRowIdx + 1); + } + + /** + * Insert in specific row/column versions with timestamps + * versionStart..versionEnd. 
+ */ + private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx, + long versionStart, long versionEnd) + throws IOException { + byte row[] = Bytes.toBytes("row:" + rowIdx); + byte column[] = Bytes.toBytes("column:" + colIdx); + Put put = new Put(row); + + for (long idx = versionStart; idx <= versionEnd; idx++) { + put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx)); + } + + ht.put(put); + } + +} + Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 960691) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy) @@ -22,9 +22,12 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.TimestampsFilter; import org.apache.hadoop.hbase.filter.Filter.ReturnCode; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; +import java.io.IOException; import java.util.NavigableSet; /** @@ -53,6 +56,18 @@ this.startKey = KeyValue.createFirstOnRow(scan.getStartRow()); this.filter = scan.getFilter(); + if (this.filter instanceof TimestampsFilter) { + // optimize by updating/narrowing down the time range + // to be based on the min/max timestamp in requested list + // of timestamps. + TimestampsFilter tsFilter = (TimestampsFilter)(this.filter); + try { + this.tr = new TimeRange(tsFilter.getMin(), tsFilter.getMax()); + } catch (IOException e) { + // should never happen because we have arranged max to be >= min. + } + } + // Single branch to deal with two types of reads (columns vs all in family) if (columns == null || columns.size() == 0) { // use a specialized scan for wildcard column tracker. 
@@ -173,8 +188,10 @@ if (filterResponse == ReturnCode.SKIP) return MatchCode.SKIP; + else if (filterResponse == ReturnCode.NEXT_COL) + return MatchCode.SEEK_NEXT_COL; + // else if (filterResponse == ReturnCode.NEXT_ROW) - // else if (filterResponse == ReturnCode.NEXT_ROW) stickyNextRow = true; return MatchCode.SEEK_NEXT_ROW; } Index: src/main/java/org/apache/hadoop/hbase/filter/Filter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/filter/Filter.java (revision 960691) +++ src/main/java/org/apache/hadoop/hbase/filter/Filter.java (working copy) @@ -102,6 +102,10 @@ */ SKIP, /** + * Skip this column. Go to the next column in this row. + */ + NEXT_COL, + /** * Done with columns, skip to next row. Note that filterRow() will * still be called. */ Index: src/main/java/org/apache/hadoop/hbase/filter/TimestampsFilter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/filter/TimestampsFilter.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/filter/TimestampsFilter.java (revision 0) @@ -0,0 +1,102 @@ +package org.apache.hadoop.hbase.filter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.KeyValue; + +/** + * Filter that returns only cells whose timestamp (version) is + * in the specified list of timestamps (versions). + *

+ * Note: Use of this filter overrides any time range/time stamp + * options specified using {@link Get#setTimeRange(long, long)}, + * {@link Scan#setTimeRange(long, long)}, {@link Get#setTimeStamp(long)}, + * or {@link Scan#setTimeStamp(long)}. + */ +public class TimestampsFilter extends FilterBase { + + TreeSet timestamps; + + // defaults: if Filter is used but no timestamps are requested, + // everything should be excluded. + long minTimeStamp = Long.MAX_VALUE; + long maxTimeStamp = Long.MAX_VALUE; + + /** + * Used during deserialization. Do not use otherwise. + */ + public TimestampsFilter() { + super(); + } + + /** + * Constructor for filter that retains only those + * cells whose timestamp (version) is in the specified + * list of timestamps. + * + * @param timestamps + */ + public TimestampsFilter(List timestamps) { + this.timestamps = new TreeSet(timestamps); + initMinMaxTimestamps(); + } + + private void initMinMaxTimestamps() { + if (this.timestamps.size() > 0) { + minTimeStamp = this.timestamps.first(); + maxTimeStamp = this.timestamps.last() + 1; + } + } + + /** + * Gets the minimum timestamp requested by filter. + * @return minimum timestamp requested by filter. + */ + public long getMin() { + return minTimeStamp; + } + + /** + * Gets the maximum timestamp requested by filter. + * + * @return maximum timestamp requested by filter. + */ + public long getMax() { + return maxTimeStamp; + } + + @Override + public ReturnCode filterKeyValue(KeyValue v) { + if (this.timestamps.contains(v.getTimestamp())) { + return ReturnCode.INCLUDE; + } else if (v.getTimestamp() < minTimeStamp) { + // The remaining versions of this column are guaranteed + // to be lesser than all of the other values. 
+ return ReturnCode.NEXT_COL; + } + return ReturnCode.SKIP; + } + + @Override + public void readFields(DataInput in) throws IOException { + int numTimestamps = in.readInt(); + this.timestamps = new TreeSet(); + for (int idx = 0; idx < numTimestamps; idx++) { + this.timestamps.add(in.readLong()); + } + initMinMaxTimestamps(); + } + + @Override + public void write(DataOutput out) throws IOException { + int numTimestamps = this.timestamps.size(); + out.writeInt(numTimestamps); + for (Long timestamp : this.timestamps) { + out.writeLong(timestamp); + } + } +}