Index: src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java (revision 993156) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java (working copy) @@ -42,9 +42,11 @@ * @param bytes * @param offset * @param length + * @param timestamp * @return The match code instance. */ - public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, int length); + public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, + int length, long timestamp); /** * Updates internal variables in between files Index: src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (revision 993156) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (working copy) @@ -23,6 +23,7 @@ import java.util.List; import java.util.NavigableSet; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Bytes; /** @@ -51,6 +52,9 @@ private final List columnsToReuse; private int index; private ColumnCount column; + /** Keeps track of the latest timestamp included for current column. + * Used to eliminate duplicates. */ + private long latestTSOfCurrentColumn; /** * Default constructor. @@ -84,51 +88,63 @@ * @param bytes KeyValue buffer * @param offset offset to the start of the qualifier * @param length length of the qualifier + * @param timestamp timestamp of the key being checked * @return MatchCode telling ScanQueryMatcher what action to take */ - public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, int length) { + public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset, + int length, long timestamp) { do { // No more columns left, we are done with this query if(this.columns.size() == 0) { - return ScanQueryMatcher.MatchCode.DONE; // done_row + return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row } // No more columns to match against, done with storefile if(this.column == null) { - return ScanQueryMatcher.MatchCode.NEXT; // done_row + return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row } // Compare specific column to current column int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(), column.getLength(), bytes, offset, length); - // Matches, decrement versions left and include + // Column Matches. If it is not a duplicate key, decrement versions left + // and include. if(ret == 0) { + //If column matches, check if it is a duplicate timestamp + if (sameAsPreviousTS(timestamp)) { + //If duplicate, skip this Key + return ScanQueryMatcher.MatchCode.SKIP; + } if(this.column.decrement() == 0) { // Done with versions for this column this.columns.remove(this.index); + resetTS(); if(this.columns.size() == this.index) { // Will not hit any more columns in this storefile this.column = null; } else { this.column = this.columns.get(this.index); } + } else { + setTS(timestamp); } return ScanQueryMatcher.MatchCode.INCLUDE; } + resetTS(); if (ret > 0) { - // Specified column is smaller than the current, skip to next column. - return ScanQueryMatcher.MatchCode.SKIP; + // Specified column is smaller than the current, skip to next column. 
+ return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; } // Specified column is bigger than current column // Move down current column and check again if(ret <= -1) { - if(++this.index == this.columns.size()) { + if(++this.index >= this.columns.size()) { // No more to match, do not include, done with storefile - return ScanQueryMatcher.MatchCode.NEXT; // done_row + return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row } // This is the recursive case. this.column = this.columns.get(this.index); @@ -154,8 +170,21 @@ buildColumnList(); this.index = 0; this.column = this.columns.get(this.index); + resetTS(); } + private void resetTS() { + latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP; + } + + private void setTS(long timestamp) { + latestTSOfCurrentColumn = timestamp; + } + + private boolean sameAsPreviousTS(long timestamp) { + return timestamp == latestTSOfCurrentColumn; + } + private void buildColumnList() { this.columns.clear(); this.columns.addAll(this.columnsToReuse); Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (revision 993156) +++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (working copy) @@ -87,7 +87,7 @@ } else { KeyValueScanner topScanner = this.heap.peek(); if (topScanner == null || - this.comparator.compare(kvNext, topScanner.peek()) > 0) { + this.comparator.compare(kvNext, topScanner.peek()) >= 0) { this.heap.add(this.current); this.current = this.heap.poll(); } @@ -153,7 +153,22 @@ this.kvComparator = kvComparator; } public int compare(KeyValueScanner left, KeyValueScanner right) { - return compare(left.peek(), right.peek()); + int comparison = compare(left.peek(), right.peek()); + if (comparison != 0) { + return comparison; + } else { + // Since both the keys are exactly the same, we break the tie in favor + // of the key which came latest. + long leftSequenceID = left.getSequenceID(); + long rightSequenceID = right.getSequenceID(); + if (leftSequenceID > rightSequenceID) { + return -1; + } else if (leftSequenceID < rightSequenceID) { + return 1; + } else { + return 0; + } + } } /** * Compares two KeyValue @@ -253,4 +268,9 @@ public PriorityQueue getHeap() { return this.heap; } + + @Override + public long getSequenceID() { + return 0; + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (revision 993156) +++ src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (working copy) @@ -57,6 +57,14 @@ public boolean reseek(KeyValue key) throws IOException; /** + * Get the sequence id associated with this KeyValueScanner. This is required + * for comparing multiple files to find out which one has the latest data. + * The default implementation for this would be to return 0. A file having + * lower sequence id will be considered to be the older one. + */ + public long getSequenceID(); + + /** * Close the KeyValue scanner. 
*/ public void close(); Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 993156) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy) @@ -668,6 +668,15 @@ this.kvsetIt = null; this.snapshotIt = null; } + + /** + * MemStoreScanner returns max value as sequence id because it will + * always have the latest data among all files. + */ + @Override + public long getSequenceID() { + return Long.MAX_VALUE; + } } public final static long FIXED_OVERHEAD = ClassSize.align( Index: src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java (revision 993156) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java (working copy) @@ -130,4 +130,9 @@ public void close() { heap.close(); } + + @Override + public long getSequenceID() { + return 0; + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (revision 993156) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (working copy) @@ -199,19 +199,17 @@ } } - MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength); - // if SKIP -> SEEK_NEXT_COL - // if (NEXT,DONE) -> SEEK_NEXT_ROW - // if (INCLUDE) -> INCLUDE - if (colChecker == MatchCode.SKIP) { - return MatchCode.SEEK_NEXT_COL; - } else if (colChecker == MatchCode.NEXT || colChecker == MatchCode.DONE) { + MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, timestamp); + /* + * According to current implementation, colChecker can only be + * SEEK_NEXT_COL, SEEK_NEXT_ROW, SKIP or INCLUDE. Therefore, always return + * the MatchCode. If it is SEEK_NEXT_ROW, also set stickyNextRow. + */ + if (colChecker == MatchCode.SEEK_NEXT_ROW) { stickyNextRow = true; - return MatchCode.SEEK_NEXT_ROW; } + return colChecker; - return MatchCode.INCLUDE; - } public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset, Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (revision 993156) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (working copy) @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; import org.apache.hadoop.hbase.util.Bytes; @@ -36,6 +37,9 @@ private int columnLength = 0; private int currentCount = 0; private int maxVersions; + /* Keeps track of the latest timestamp included for current column. + * Used to eliminate duplicates. */ + private long latestTSOfCurrentColumn; /** * Return maxVersions of every row. @@ -53,10 +57,12 @@ * @param bytes * @param offset * @param length + * @param timestamp * @return The match code instance. 
*/ @Override - public MatchCode checkColumn(byte[] bytes, int offset, int length) { + public MatchCode checkColumn(byte[] bytes, int offset, int length, + long timestamp) { if (columnBuffer == null) { // first iteration. columnBuffer = bytes; @@ -64,18 +70,28 @@ columnLength = length; currentCount = 0; - if (++currentCount > maxVersions) - return ScanQueryMatcher.MatchCode.SKIP; + if (++currentCount > maxVersions) { + return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; + } + setTS(timestamp); return ScanQueryMatcher.MatchCode.INCLUDE; } int cmp = Bytes.compareTo(bytes, offset, length, columnBuffer, columnOffset, columnLength); if (cmp == 0) { - if (++currentCount > maxVersions) - return ScanQueryMatcher.MatchCode.SKIP; // skip to next col + //If column matches, check if it is a duplicate timestamp + if (sameAsPreviousTS(timestamp)) { + return ScanQueryMatcher.MatchCode.SKIP; + } + if (++currentCount > maxVersions) { + return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col + } + setTS(timestamp); return ScanQueryMatcher.MatchCode.INCLUDE; } + resetTS(); + // new col > old col if (cmp > 0) { // switched columns, lets do something.x @@ -84,7 +100,8 @@ columnLength = length; currentCount = 0; if (++currentCount > maxVersions) - return ScanQueryMatcher.MatchCode.SKIP; + return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; + setTS(timestamp); return ScanQueryMatcher.MatchCode.INCLUDE; } @@ -101,8 +118,10 @@ columnOffset = offset; columnLength = length; currentCount = 0; - if (++currentCount > maxVersions) - return ScanQueryMatcher.MatchCode.SKIP; + if (++currentCount > maxVersions) { + return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; + } + setTS(timestamp); return ScanQueryMatcher.MatchCode.INCLUDE; } @@ -116,8 +135,21 @@ @Override public void reset() { columnBuffer = null; + resetTS(); } + private void resetTS() { + latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP; + } + + private void setTS(long timestamp) { + latestTSOfCurrentColumn = timestamp; + } + + private boolean sameAsPreviousTS(long timestamp) { + return timestamp == latestTSOfCurrentColumn; + } + /** * Used by matcher and scan/get to get a hint of the next column * to seek to after checkColumn() returns SKIP. Returns the next interesting Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 993156) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -184,7 +184,7 @@ } this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10); - this.storefiles = ImmutableList.copyOf(loadStoreFiles()); + this.storefiles = sortAndClone(loadStoreFiles()); } public HColumnDescriptor getFamily() { @@ -219,7 +219,7 @@ } /* - * Creates a series of StoreFile loaded from the given directory. + * Creates an unsorted list of StoreFile loaded from the given directory. 
* @throws IOException */ private List loadStoreFiles() @@ -256,7 +256,6 @@ } results.add(curfile); } - Collections.sort(results, StoreFile.Comparators.FLUSH_TIME); return results; } @@ -357,7 +356,7 @@ try { ArrayList newFiles = new ArrayList(storefiles); newFiles.add(sf); - this.storefiles = ImmutableList.copyOf(newFiles); + this.storefiles = sortAndClone(newFiles); notifyChangedReadersObservers(); } finally { this.lock.writeLock().unlock(); @@ -511,7 +510,7 @@ try { ArrayList newList = new ArrayList(storefiles); newList.add(sf); - storefiles = ImmutableList.copyOf(newList); + storefiles = sortAndClone(newList); this.memstore.clearSnapshot(set); // Tell listeners of the change in readers. @@ -900,7 +899,7 @@ newStoreFiles.add(result); } - this.storefiles = ImmutableList.copyOf(newStoreFiles); + this.storefiles = sortAndClone(newStoreFiles); // Tell observers that list of StoreFiles has changed. notifyChangedReadersObservers(); @@ -931,6 +930,12 @@ return result; } + public ImmutableList sortAndClone(List storeFiles) { + Collections.sort(storeFiles, StoreFile.Comparators.FLUSH_TIME); + ImmutableList newList = ImmutableList.copyOf(storeFiles); + return newList; + } + // //////////////////////////////////////////////////////////////////////////// // Accessors. // (This is the only section that is directly useful!) Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (revision 993156) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (working copy) @@ -394,6 +394,7 @@ } } } + this.reader.setSequenceID(this.sequenceid); b = metadataMap.get(MAJOR_COMPACTION_KEY); if (b != null) { @@ -866,6 +867,7 @@ protected BloomType bloomFilterType; private final HFile.Reader reader; protected TimeRangeTracker timeRangeTracker = null; + protected long sequenceID = -1; public Reader(FileSystem fs, Path path, BlockCache blockCache, boolean inMemory) throws IOException { @@ -1048,6 +1050,14 @@ public BloomType getBloomFilterType() { return this.bloomFilterType; } + + public long getSequenceID() { + return sequenceID; + } + + public void setSequenceID(long sequenceID) { + this.sequenceID = sequenceID; + } } /** Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (revision 993156) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (working copy) @@ -163,4 +163,9 @@ public boolean shouldSeek(Scan scan, final SortedSet columns) { return reader.shouldSeek(scan, columns); } + + @Override + public long getSequenceID() { + return reader.getSequenceID(); + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (revision 993156) +++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (working copy) @@ -368,4 +368,9 @@ //guarantees that heap will never be null before this call. 
return this.heap.reseek(kv); } + + @Override + public long getSequenceID() { + return 0; + } } Index: src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (revision 993156) +++ src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (working copy) @@ -35,6 +35,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.NavigableMap; import java.util.UUID; import org.apache.commons.logging.Log; @@ -2844,7 +2845,7 @@ return Bytes.equals(left, right); } - @Ignore @Test + @Test public void testDuplicateVersions() throws Exception { byte [] TABLE = Bytes.toBytes("testDuplicateVersions"); @@ -3044,21 +3045,201 @@ get.setMaxVersions(Integer.MAX_VALUE); result = ht.get(get); assertNResult(result, ROW, FAMILY, QUALIFIER, - new long [] {STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[8], STAMPS[9], STAMPS[13], STAMPS[15]}, - new byte[][] {VALUES[3], VALUES[14], VALUES[5], VALUES[6], VALUES[8], VALUES[9], VALUES[13], VALUES[15]}, - 0, 7); + new long [] {STAMPS[1], STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[8], STAMPS[9], STAMPS[13], STAMPS[15]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3], VALUES[14], VALUES[5], VALUES[6], VALUES[8], VALUES[9], VALUES[13], VALUES[15]}, + 0, 9); scan = new Scan(ROW); scan.addColumn(FAMILY, QUALIFIER); scan.setMaxVersions(Integer.MAX_VALUE); result = getSingleScanResult(ht, scan); assertNResult(result, ROW, FAMILY, QUALIFIER, - new long [] {STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[8], STAMPS[9], STAMPS[13], STAMPS[15]}, - new byte[][] {VALUES[3], VALUES[14], VALUES[5], VALUES[6], VALUES[8], VALUES[9], VALUES[13], VALUES[15]}, - 0, 7); + new long [] {STAMPS[1], STAMPS[2], STAMPS[3], STAMPS[4], STAMPS[5], STAMPS[6], STAMPS[8], STAMPS[9], STAMPS[13], STAMPS[15]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3], VALUES[14], VALUES[5], VALUES[6], VALUES[8], VALUES[9], VALUES[13], VALUES[15]}, + 0, 9); } @Test + public void testUpdates() throws Exception { + + byte [] TABLE = Bytes.toBytes("testUpdates"); + HTable hTable = TEST_UTIL.createTable(TABLE, FAMILY, 10); + + // Write a column with values at timestamp 1, 2 and 3 + byte[] row = Bytes.toBytes("row1"); + byte[] qualifier = Bytes.toBytes("myCol"); + Put put = new Put(row); + put.add(FAMILY, qualifier, 1L, Bytes.toBytes("AAA")); + hTable.put(put); + + put = new Put(row); + put.add(FAMILY, qualifier, 2L, Bytes.toBytes("BBB")); + hTable.put(put); + + put = new Put(row); + put.add(FAMILY, qualifier, 3L, Bytes.toBytes("EEE")); + hTable.put(put); + + Get get = new Get(row); + get.addColumn(FAMILY, qualifier); + get.setMaxVersions(); + + // Check that the column indeed has the right values at timestamps 1 and + // 2 + Result result = hTable.get(get); + NavigableMap navigableMap = + result.getMap().get(FAMILY).get(qualifier); + assertEquals("AAA", Bytes.toString(navigableMap.get(1L))); + assertEquals("BBB", Bytes.toString(navigableMap.get(2L))); + + // Update the value at timestamp 1 + put = new Put(row); + put.add(FAMILY, qualifier, 1L, Bytes.toBytes("CCC")); + hTable.put(put); + + // Update the value at timestamp 2 + put = new Put(row); + put.add(FAMILY, qualifier, 2L, Bytes.toBytes("DDD")); + hTable.put(put); + + // Check that the values at timestamp 2 and 1 got updated + result = hTable.get(get); + navigableMap = result.getMap().get(FAMILY).get(qualifier); + assertEquals("CCC", 
Bytes.toString(navigableMap.get(1L))); + assertEquals("DDD", Bytes.toString(navigableMap.get(2L))); + } + + @Test + public void testUpdatesWithMajorCompaction() throws Exception { + + String tableName = "testUpdatesWithMajorCompaction"; + byte [] TABLE = Bytes.toBytes(tableName); + HTable hTable = TEST_UTIL.createTable(TABLE, FAMILY, 10); + HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + + // Write a column with values at timestamp 1, 2 and 3 + byte[] row = Bytes.toBytes("row2"); + byte[] qualifier = Bytes.toBytes("myCol"); + Put put = new Put(row); + put.add(FAMILY, qualifier, 1L, Bytes.toBytes("AAA")); + hTable.put(put); + + put = new Put(row); + put.add(FAMILY, qualifier, 2L, Bytes.toBytes("BBB")); + hTable.put(put); + + put = new Put(row); + put.add(FAMILY, qualifier, 3L, Bytes.toBytes("EEE")); + hTable.put(put); + + Get get = new Get(row); + get.addColumn(FAMILY, qualifier); + get.setMaxVersions(); + + // Check that the column indeed has the right values at timestamps 1 and + // 2 + Result result = hTable.get(get); + NavigableMap navigableMap = + result.getMap().get(FAMILY).get(qualifier); + assertEquals("AAA", Bytes.toString(navigableMap.get(1L))); + assertEquals("BBB", Bytes.toString(navigableMap.get(2L))); + + // Trigger a major compaction + admin.flush(tableName); + admin.majorCompact(tableName); + Thread.sleep(6000); + + // Update the value at timestamp 1 + put = new Put(row); + put.add(FAMILY, qualifier, 1L, Bytes.toBytes("CCC")); + hTable.put(put); + + // Update the value at timestamp 2 + put = new Put(row); + put.add(FAMILY, qualifier, 2L, Bytes.toBytes("DDD")); + hTable.put(put); + + // Trigger a major compaction + admin.flush(tableName); + admin.majorCompact(tableName); + Thread.sleep(6000); + + // Check that the values at timestamp 2 and 1 got updated + result = hTable.get(get); + navigableMap = result.getMap().get(FAMILY).get(qualifier); + assertEquals("CCC", Bytes.toString(navigableMap.get(1L))); + assertEquals("DDD", Bytes.toString(navigableMap.get(2L))); + } + + @Test + public void testMajorCompactionBetweenTwoUpdates() throws Exception { + + String tableName = "testMajorCompactionBetweenTwoUpdates"; + byte [] TABLE = Bytes.toBytes(tableName); + HTable hTable = TEST_UTIL.createTable(TABLE, FAMILY, 10); + HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + + // Write a column with values at timestamp 1, 2 and 3 + byte[] row = Bytes.toBytes("row3"); + byte[] qualifier = Bytes.toBytes("myCol"); + Put put = new Put(row); + put.add(FAMILY, qualifier, 1L, Bytes.toBytes("AAA")); + hTable.put(put); + + put = new Put(row); + put.add(FAMILY, qualifier, 2L, Bytes.toBytes("BBB")); + hTable.put(put); + + put = new Put(row); + put.add(FAMILY, qualifier, 3L, Bytes.toBytes("EEE")); + hTable.put(put); + + Get get = new Get(row); + get.addColumn(FAMILY, qualifier); + get.setMaxVersions(); + + // Check that the column indeed has the right values at timestamps 1 and + // 2 + Result result = hTable.get(get); + NavigableMap navigableMap = + result.getMap().get(FAMILY).get(qualifier); + assertEquals("AAA", Bytes.toString(navigableMap.get(1L))); + assertEquals("BBB", Bytes.toString(navigableMap.get(2L))); + + // Trigger a major compaction + admin.flush(tableName); + admin.majorCompact(tableName); + Thread.sleep(6000); + + // Update the value at timestamp 1 + put = new Put(row); + put.add(FAMILY, qualifier, 1L, Bytes.toBytes("CCC")); + hTable.put(put); + + // Trigger a major compaction + admin.flush(tableName); + admin.majorCompact(tableName); + 
Thread.sleep(6000); + + // Update the value at timestamp 2 + put = new Put(row); + put.add(FAMILY, qualifier, 2L, Bytes.toBytes("DDD")); + hTable.put(put); + + // Trigger a major compaction + admin.flush(tableName); + admin.majorCompact(tableName); + Thread.sleep(6000); + + // Check that the values at timestamp 2 and 1 got updated + result = hTable.get(get); + navigableMap = result.getMap().get(FAMILY).get(qualifier); + + assertEquals("CCC", Bytes.toString(navigableMap.get(1L))); + assertEquals("DDD", Bytes.toString(navigableMap.get(2L))); + } + + @Test public void testGet_EmptyTable() throws IOException { HTable table = TEST_UTIL.createTable(Bytes.toBytes("testGet_EmptyTable"), FAMILY); Get get = new Get(ROW); Index: src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java (revision 993156) +++ src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java (working copy) @@ -103,4 +103,9 @@ public void close() { // noop. } + + @Override + public long getSequenceID() { + return 0; + } } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestColumnSeeking.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestColumnSeeking.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestColumnSeeking.java (revision 0) @@ -0,0 +1,289 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueTestUtil; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; + +public class TestColumnSeeking { + + private final static HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + static final Log LOG = LogFactory.getLog(TestColumnSeeking.class); + + @SuppressWarnings("unchecked") + @Test + public void testDuplicateVersions() throws IOException { + String family = "Family"; + byte[] familyBytes = Bytes.toBytes("Family"); + String table = "TestDuplicateVersions"; + + HColumnDescriptor hcd = + new HColumnDescriptor(familyBytes, 1000, + HColumnDescriptor.DEFAULT_COMPRESSION, + HColumnDescriptor.DEFAULT_IN_MEMORY, + HColumnDescriptor.DEFAULT_BLOCKCACHE, + HColumnDescriptor.DEFAULT_TTL, + HColumnDescriptor.DEFAULT_BLOOMFILTER); + HTableDescriptor htd = new HTableDescriptor(table); + htd.addFamily(hcd); + HRegionInfo info = new HRegionInfo(htd, null, null, false); + HRegion region = + HRegion.createHRegion(info, HBaseTestingUtility.getTestDir(), TEST_UTIL + .getConfiguration()); + + List rows = generateRandomWords(10, "row"); + List allColumns = generateRandomWords(10, "column"); + List values = generateRandomWords(100, "value"); + + long maxTimestamp = 2; + double selectPercent = 0.5; + int numberOfTests = 5; + double flushPercentage = 0.2; + double minorPercentage = 0.2; + double majorPercentage = 0.2; + double putPercentage = 0.2; + + HashMap allKVMap = new HashMap(); + + HashMap[] kvMaps = new HashMap[numberOfTests]; + ArrayList[] columnLists = new ArrayList[numberOfTests]; + + for (int i = 0; i < numberOfTests; i++) { + kvMaps[i] = new HashMap(); + columnLists[i] = new ArrayList(); + for (String column : allColumns) { + if (Math.random() < selectPercent) { + columnLists[i].add(column); + } + } + } + + for (String value : values) { + for (String row : rows) { + Put p = new Put(Bytes.toBytes(row)); + for (String column : allColumns) { + for (long timestamp = 1; timestamp <= maxTimestamp; timestamp++) { + KeyValue kv = + KeyValueTestUtil.create(row, family, column, timestamp, value); + if (Math.random() < putPercentage) { + p.add(kv); + allKVMap.put(kv.getKeyString(), kv); + for (int i = 0; i < numberOfTests; i++) { + if (columnLists[i].contains(column)) { + kvMaps[i].put(kv.getKeyString(), kv); + } + } + } + } + } + region.put(p); + if (Math.random() < flushPercentage) { + LOG.info("Flushing... "); + region.flushcache(); + } + + if (Math.random() < minorPercentage) { + LOG.info("Minor compacting... "); + region.compactStores(false); + } + + if (Math.random() < majorPercentage) { + LOG.info("Major compacting... 
"); + region.compactStores(true); + } + } + } + + for (int i = 0; i < numberOfTests + 1; i++) { + Collection kvSet; + Scan scan = new Scan(); + scan.setMaxVersions(); + if (i < numberOfTests) { + kvSet = kvMaps[i].values(); + for (String column : columnLists[i]) { + scan.addColumn(familyBytes, Bytes.toBytes(column)); + } + LOG.info("ExplicitColumns scanner"); + LOG.info("Columns: " + columnLists[i].size() + " Keys: " + + kvSet.size()); + } else { + kvSet = allKVMap.values(); + LOG.info("Wildcard scanner"); + LOG.info("Columns: " + allColumns.size() + " Keys: " + kvSet.size()); + + } + InternalScanner scanner = region.getScanner(scan); + List results = new ArrayList(); + while (scanner.next(results)) + ; + assertEquals(kvSet.size(), results.size()); + assertTrue(results.containsAll(kvSet)); + } + } + + @SuppressWarnings("unchecked") + @Test + public void testReseeking() throws IOException { + String family = "Family"; + byte[] familyBytes = Bytes.toBytes("Family"); + String table = "TestSingleVersions"; + + HTableDescriptor htd = new HTableDescriptor(table); + htd.addFamily(new HColumnDescriptor(family)); + HRegionInfo info = new HRegionInfo(htd, null, null, false); + HRegion region = + HRegion.createHRegion(info, HBaseTestingUtility.getTestDir(), TEST_UTIL + .getConfiguration()); + + List rows = generateRandomWords(10, "row"); + List allColumns = generateRandomWords(100, "column"); + + long maxTimestamp = 2; + double selectPercent = 0.5; + int numberOfTests = 5; + double flushPercentage = 0.2; + double minorPercentage = 0.2; + double majorPercentage = 0.2; + double putPercentage = 0.2; + + HashMap allKVMap = new HashMap(); + + HashMap[] kvMaps = new HashMap[numberOfTests]; + ArrayList[] columnLists = new ArrayList[numberOfTests]; + String valueString = "Value"; + + for (int i = 0; i < numberOfTests; i++) { + kvMaps[i] = new HashMap(); + columnLists[i] = new ArrayList(); + for (String column : allColumns) { + if (Math.random() < selectPercent) { + columnLists[i].add(column); + } + } + } + + for (String row : rows) { + Put p = new Put(Bytes.toBytes(row)); + for (String column : allColumns) { + for (long timestamp = 1; timestamp <= maxTimestamp; timestamp++) { + KeyValue kv = + KeyValueTestUtil.create(row, family, column, timestamp, + valueString); + if (Math.random() < putPercentage) { + p.add(kv); + allKVMap.put(kv.getKeyString(), kv); + for (int i = 0; i < numberOfTests; i++) { + if (columnLists[i].contains(column)) { + kvMaps[i].put(kv.getKeyString(), kv); + } + } + } + + } + } + region.put(p); + if (Math.random() < flushPercentage) { + LOG.info("Flushing... "); + region.flushcache(); + } + + if (Math.random() < minorPercentage) { + LOG.info("Minor compacting... "); + region.compactStores(false); + } + + if (Math.random() < majorPercentage) { + LOG.info("Major compacting... 
"); + region.compactStores(true); + } + } + + for (int i = 0; i < numberOfTests + 1; i++) { + Collection kvSet; + Scan scan = new Scan(); + scan.setMaxVersions(); + if (i < numberOfTests) { + kvSet = kvMaps[i].values(); + for (String column : columnLists[i]) { + scan.addColumn(familyBytes, Bytes.toBytes(column)); + } + LOG.info("ExplicitColumns scanner"); + LOG.info("Columns: " + columnLists[i].size() + " Keys: " + + kvSet.size()); + } else { + kvSet = allKVMap.values(); + LOG.info("Wildcard scanner"); + LOG.info("Columns: " + allColumns.size() + " Keys: " + kvSet.size()); + + } + InternalScanner scanner = region.getScanner(scan); + List results = new ArrayList(); + while (scanner.next(results)) + ; + assertEquals(kvSet.size(), results.size()); + assertTrue(results.containsAll(kvSet)); + } + } + + List generateRandomWords(int numberOfWords, String suffix) { + Set wordSet = new HashSet(); + for (int i = 0; i < numberOfWords; i++) { + int lengthOfWords = (int) (Math.random() * 5) + 1; + char[] wordChar = new char[lengthOfWords]; + for (int j = 0; j < wordChar.length; j++) { + wordChar[j] = (char) (Math.random() * 26 + 97); + } + String word; + if (suffix == null) { + word = new String(wordChar); + } else { + word = new String(wordChar) + suffix; + } + wordSet.add(word); + } + List wordList = new ArrayList(wordSet); + return wordList; + } +} Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java (revision 993156) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java (working copy) @@ -50,9 +50,10 @@ //Initialize result List result = new ArrayList(); + long timestamp = 0; //"Match" for(byte [] col : scannerColumns){ - result.add(exp.checkColumn(col, 0, col.length)); + result.add(exp.checkColumn(col, 0, col.length, ++timestamp)); } assertEquals(expected.size(), result.size()); @@ -76,11 +77,11 @@ columns.add(col2); columns.add(col4); List expected = new ArrayList(); - expected.add(ScanQueryMatcher.MatchCode.SKIP); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); expected.add(ScanQueryMatcher.MatchCode.INCLUDE); - expected.add(ScanQueryMatcher.MatchCode.SKIP); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); expected.add(ScanQueryMatcher.MatchCode.INCLUDE); - expected.add(ScanQueryMatcher.MatchCode.DONE); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); int maxVersions = 1; //Create "Scanner" @@ -106,25 +107,25 @@ columns.add(col4); List expected = new ArrayList(); - expected.add(ScanQueryMatcher.MatchCode.SKIP); - expected.add(ScanQueryMatcher.MatchCode.SKIP); - expected.add(ScanQueryMatcher.MatchCode.SKIP); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); expected.add(ScanQueryMatcher.MatchCode.INCLUDE); expected.add(ScanQueryMatcher.MatchCode.INCLUDE); - expected.add(ScanQueryMatcher.MatchCode.SKIP); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); - expected.add(ScanQueryMatcher.MatchCode.SKIP); - expected.add(ScanQueryMatcher.MatchCode.SKIP); - expected.add(ScanQueryMatcher.MatchCode.SKIP); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); expected.add(ScanQueryMatcher.MatchCode.INCLUDE); 
expected.add(ScanQueryMatcher.MatchCode.INCLUDE); - expected.add(ScanQueryMatcher.MatchCode.DONE); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); - expected.add(ScanQueryMatcher.MatchCode.DONE); - expected.add(ScanQueryMatcher.MatchCode.DONE); - expected.add(ScanQueryMatcher.MatchCode.DONE); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW); int maxVersions = 2; //Create "Scanner" @@ -163,13 +164,13 @@ ColumnTracker explicit = new ExplicitColumnTracker(columns, maxVersions); for (int i = 0; i < 100000; i+=2) { byte [] col = Bytes.toBytes("col"+i); - explicit.checkColumn(col, 0, col.length); + explicit.checkColumn(col, 0, col.length, 1); } explicit.update(); for (int i = 1; i < 100000; i+=2) { byte [] col = Bytes.toBytes("col"+i); - explicit.checkColumn(col, 0, col.length); + explicit.checkColumn(col, 0, col.length, 1); } } @@ -184,8 +185,8 @@ new byte[][] { col1, col4 }); List expected = Arrays.asList( new ScanQueryMatcher.MatchCode[] { - ScanQueryMatcher.MatchCode.SKIP, - ScanQueryMatcher.MatchCode.SKIP }); + ScanQueryMatcher.MatchCode.SEEK_NEXT_COL, + ScanQueryMatcher.MatchCode.SEEK_NEXT_COL }); runTest(1, columns, scanner, expected); } } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java (revision 993156) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java (working copy) @@ -259,6 +259,11 @@ public boolean reseek(KeyValue key) throws IOException { return seek(key); } + + @Override + public long getSequenceID() { + return 0; + } } } Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java (revision 993156) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java (working copy) @@ -52,7 +52,8 @@ List actual = new ArrayList(); for(byte [] qualifier : qualifiers) { - ScanQueryMatcher.MatchCode mc = tracker.checkColumn(qualifier, 0, qualifier.length); + ScanQueryMatcher.MatchCode mc = tracker.checkColumn(qualifier, 0, + qualifier.length, 1); actual.add(mc); } @@ -77,13 +78,15 @@ List expected = new ArrayList(); expected.add(ScanQueryMatcher.MatchCode.INCLUDE); expected.add(ScanQueryMatcher.MatchCode.INCLUDE); - expected.add(ScanQueryMatcher.MatchCode.SKIP); + expected.add(ScanQueryMatcher.MatchCode.SEEK_NEXT_COL); expected.add(ScanQueryMatcher.MatchCode.INCLUDE); List actual = new ArrayList(); + long timestamp = 0; for(byte [] qualifier : qualifiers) { - MatchCode mc = tracker.checkColumn(qualifier, 0, qualifier.length); + MatchCode mc = tracker.checkColumn(qualifier, 0, qualifier.length, + ++timestamp); actual.add(mc); } @@ -106,7 +109,7 @@ try { for(byte [] qualifier : qualifiers) { - tracker.checkColumn(qualifier, 0, qualifier.length); + tracker.checkColumn(qualifier, 0, qualifier.length, 1); } } catch (Exception e) { ok = true;
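
One of the central changes in this patch is the tie-break added to the KeyValueHeap scanner comparator, so a minimal, self-contained sketch of that ordering rule follows. The names here (TieBreakDemo, SimpleScanner) are hypothetical stand-ins rather than HBase classes; only the rule itself mirrors the patch: when two scanners sit on byte-identical keys, the scanner with the higher sequence id (the newer store file, or the memstore at Long.MAX_VALUE) must be polled first, so the freshest copy of a rewritten cell reaches the column tracker ahead of the stale one.

import java.util.Comparator;
import java.util.PriorityQueue;

public class TieBreakDemo {

  // A scanner reduced to the key it currently points at plus the
  // sequence id of the source it reads from.
  static final class SimpleScanner {
    final String currentKey;
    final long sequenceId;
    SimpleScanner(String currentKey, long sequenceId) {
      this.currentKey = currentKey;
      this.sequenceId = sequenceId;
    }
  }

  public static void main(String[] args) {
    Comparator<SimpleScanner> cmp = (left, right) -> {
      int c = left.currentKey.compareTo(right.currentKey);
      if (c != 0) {
        return c;                       // different keys: normal key order
      }
      // Identical keys: the higher sequence id (newer data) sorts first.
      return Long.compare(right.sequenceId, left.sequenceId);
    };

    PriorityQueue<SimpleScanner> heap = new PriorityQueue<>(cmp);
    heap.add(new SimpleScanner("row1/Family:myCol/1", 5L));              // older store file
    heap.add(new SimpleScanner("row1/Family:myCol/1", Long.MAX_VALUE));  // memstore

    // The memstore scanner is polled first, so the updated value wins;
    // the duplicate-timestamp check added to the column trackers then
    // returns SKIP for the stale copy with the same qualifier and timestamp.
    System.out.println(heap.poll().sequenceId);   // 9223372036854775807
  }
}

Together with the duplicate-timestamp bookkeeping in ExplicitColumnTracker and ScanWildcardColumnTracker, and the flush-time sort kept up to date by Store.sortAndClone, this ordering is what lets a Put that rewrites an existing timestamp (as exercised by testUpdates and the major-compaction tests above) return the new value instead of a duplicate or stale version.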