diff --git a/src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java b/src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java index 32d39b4..15d7355 100644 --- a/src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java +++ b/src/contrib/indexed/src/java/org/apache/hadoop/hbase/regionserver/IdxRegion.java @@ -400,7 +400,7 @@ public class IdxRegion extends HRegion { List scanners = new ArrayList(); // this seems a bit pointless because the memstore only ever returns an // array with only one element, but just incase... - KeyValueScanner[] memstorescanners = store.memstore.getScanners(); + KeyValueScanner[] memstorescanners = store.memstore.getScanners(null); // to make sure we don't provide rows that the scan is not interested in // we seekTo the scan's startRow KeyValue seekTo = KeyValue.createFirstOnRow(startRow); diff --git a/src/java/org/apache/hadoop/hbase/KeyValue.java b/src/java/org/apache/hadoop/hbase/KeyValue.java index 49a643e..049a0bd 100644 --- a/src/java/org/apache/hadoop/hbase/KeyValue.java +++ b/src/java/org/apache/hadoop/hbase/KeyValue.java @@ -106,6 +106,11 @@ public class KeyValue implements Writable, HeapSize { * {@link KeyValue} keys. */ public static KeyComparator ROOT_KEY_COMPARATOR = new RootKeyComparator(); + /** + * Stands for an 'unset' KeyValue update id - e.g. an id which was never + * assigned. + */ + public static final int UNSET_UPDATE_ID = Integer.MIN_VALUE; /** * Get the appropriate row comparator for the specified table. @@ -199,6 +204,11 @@ public class KeyValue implements Writable, HeapSize { private byte [] bytes = null; private int offset = 0; private int length = 0; + /** + * A transient field used by memstore to keep scans coherent. + * Needn't be written out/in. + */ + private transient int updateId = UNSET_UPDATE_ID; /** Writable Constructor -- DO NOT USE */ public KeyValue() {} @@ -632,6 +642,14 @@ public class KeyValue implements Writable, HeapSize { return length; } + public int getUpdateId() { + return updateId; + } + + public void setUpdateId(int updateId) { + this.updateId = updateId; + } + //--------------------------------------------------------------------------- // // Length and Offset Calculators @@ -1553,7 +1571,18 @@ public class KeyValue implements Writable, HeapSize { final byte [] q, final long ts) { return new KeyValue(row, f, q, ts, Type.Maximum); } - + + /** + * Creates a 'last on row' key value. Target usage is get-scans. + * + * @param row row + * @return last possible key on passed row and family + */ + public static KeyValue createLastOnRow(final byte[] row) { + // need to take care not to create a variant with a non-empty column + return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum); + } + /** * @param b * @param o @@ -1723,11 +1752,15 @@ public class KeyValue implements Writable, HeapSize { // a row. byte ltype = left[loffset + (llength - 1)]; + byte rtype = right[roffset + (rlength - 1)]; - if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) { - return 1; // left is bigger. + if (ltype != rtype) { + if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) { + return 1; // left is bigger. + } else if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) { + return -1; + } } - compare = Bytes.compareTo(left, lcolumnoffset, lcolumnlength, right, rcolumnoffset, rcolumnlength); if (compare != 0) { @@ -1750,8 +1783,7 @@ public class KeyValue implements Writable, HeapSize { // Compare types. 
Let the delete types sort ahead of puts; i.e. types // of higher numbers sort before those of lesser numbers - // ltype is defined above - byte rtype = right[roffset + (rlength - 1)]; + // ltype and rtype are defined above return (0xff & rtype) - (0xff & ltype); } return 0; @@ -1790,8 +1822,8 @@ public class KeyValue implements Writable, HeapSize { // HeapSize public long heapSize() { return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE + - ClassSize.align(ClassSize.ARRAY + length) + - (2 * Bytes.SIZEOF_INT)); + ClassSize.align(ClassSize.ARRAY + length) + + (3 * Bytes.SIZEOF_INT)); } // Writable diff --git a/src/java/org/apache/hadoop/hbase/client/Scan.java b/src/java/org/apache/hadoop/hbase/client/Scan.java index 0972ce8..eaceef9 100644 --- a/src/java/org/apache/hadoop/hbase/client/Scan.java +++ b/src/java/org/apache/hadoop/hbase/client/Scan.java @@ -101,7 +101,7 @@ public class Scan implements Writable { // additional data for the scan protected Map values = new HashMap(); - + /** * Create a Scan operation across all rows. */ @@ -163,6 +163,20 @@ public class Scan implements Writable { } /** + * Construct a scan matching a Get. + * + * @param get the get to match + */ + public Scan(Get get) { + this.startRow = get.getRow(); + this.stopRow = get.getRow(); + this.filter = get.getFilter(); + this.maxVersions = get.getMaxVersions(); + this.tr = get.getTimeRange(); + this.familyMap = get.getFamilyMap(); + } + + /** * Get all columns from the specified family. *

* Overrides previous calls to addColumn for this family. @@ -332,7 +346,12 @@ public class Scan implements Writable { this.stopRow = stopRow; return this; } - + + public boolean isGetScan() { + return this.startRow != null && this.startRow.length > 0 && + Bytes.equals(this.startRow, this.stopRow); + } + /** * Get all available versions. */ @@ -580,7 +599,7 @@ public class Scan implements Writable { public void remove(final byte [] key) { values.remove(new ImmutableBytesWritable(key)); } - + /** * @return String */ diff --git a/src/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java b/src/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java index dfb3026..a29e8f3 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java @@ -19,7 +19,7 @@ */ package org.apache.hadoop.hbase.regionserver; -import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode; +import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; /** * Implementing classes of this interface will be used for the tracking @@ -29,10 +29,10 @@ import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode; * Currently there are two different types of Store/Family-level queries. *

  • {@link ExplicitColumnTracker} is used when the query specifies * one or more column qualifiers to return in the family. - *
  • {@link WildcardColumnTracker} is used when the query asks for all + *
  • {@link ScanWildcardColumnTracker} is used when the query asks for all * qualifiers within the family. *

    - * This class is utilized by {@link QueryMatcher} through two methods: + * This class is utilized by {@link ScanQueryMatcher} through two methods: *

    • {@link #checkColumn} is called when a Put satisfies all other * conditions of the query. This method returns a {@link MatchCode} to define * what action should be taken. diff --git a/src/java/org/apache/hadoop/hbase/regionserver/DeleteCompare.java b/src/java/org/apache/hadoop/hbase/regionserver/DeleteCompare.java deleted file mode 100644 index 29a645e..0000000 --- a/src/java/org/apache/hadoop/hbase/regionserver/DeleteCompare.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 2009 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.util.Bytes; - - -/** - * Class that provides static method needed when putting deletes into memstore - */ -public class DeleteCompare { - - /** - * Return codes from deleteCompare. - */ - enum DeleteCode { - /** - * Do nothing. Move to next KV in memstore - */ - SKIP, - - /** - * Add to the list of deletes. - */ - DELETE, - - /** - * Stop looking at KVs in memstore. Finalize. 
- */ - DONE - } - - /** - * Method used when putting deletes into memstore to remove all the previous - * entries that are affected by this Delete - * @param mem - * @param deleteBuffer - * @param deleteRowOffset - * @param deleteRowLength - * @param deleteQualifierOffset - * @param deleteQualifierLength - * @param deleteTimeOffset - * @param deleteType - * @param comparator - * @return SKIP if current KeyValue should not be deleted, DELETE if - * current KeyValue should be deleted and DONE when the current KeyValue is - * out of the Deletes range - */ - public static DeleteCode deleteCompare(KeyValue mem, byte [] deleteBuffer, - int deleteRowOffset, short deleteRowLength, int deleteQualifierOffset, - int deleteQualifierLength, int deleteTimeOffset, byte deleteType, - KeyValue.KeyComparator comparator) { - - //Parsing new KeyValue - byte [] memBuffer = mem.getBuffer(); - int memOffset = mem.getOffset(); - - //Getting key lengths - int memKeyLen = Bytes.toInt(memBuffer, memOffset); - memOffset += Bytes.SIZEOF_INT; - - //Skipping value lengths - memOffset += Bytes.SIZEOF_INT; - - //Getting row lengths - short memRowLen = Bytes.toShort(memBuffer, memOffset); - memOffset += Bytes.SIZEOF_SHORT; - int res = comparator.compareRows(memBuffer, memOffset, memRowLen, - deleteBuffer, deleteRowOffset, deleteRowLength); - if(res > 0) { - return DeleteCode.DONE; - } else if(res < 0){ - return DeleteCode.SKIP; - } - - memOffset += memRowLen; - - //Getting family lengths - byte memFamLen = memBuffer[memOffset]; - memOffset += Bytes.SIZEOF_BYTE + memFamLen; - - //Get column lengths - int memQualifierLen = memKeyLen - memRowLen - memFamLen - - Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG - - Bytes.SIZEOF_BYTE; - - //Compare timestamp - int tsOffset = memOffset + memQualifierLen; - int timeRes = Bytes.compareTo(memBuffer, tsOffset, Bytes.SIZEOF_LONG, - deleteBuffer, deleteTimeOffset, Bytes.SIZEOF_LONG); - - if (deleteType == KeyValue.Type.DeleteFamily.getCode()) { - if (timeRes <= 0) { - return DeleteCode.DELETE; - } - return DeleteCode.SKIP; - } - - //Compare columns - res = Bytes.compareTo(memBuffer, memOffset, memQualifierLen, - deleteBuffer, deleteQualifierOffset, deleteQualifierLength); - if (res < 0) { - return DeleteCode.SKIP; - } else if(res > 0) { - return DeleteCode.DONE; - } - // same column, compare the time. - if (timeRes == 0) { - return DeleteCode.DELETE; - } else if (timeRes < 0) { - if (deleteType == KeyValue.Type.DeleteColumn.getCode()) { - return DeleteCode.DELETE; - } - return DeleteCode.DONE; - } else { - return DeleteCode.SKIP; - } - } -} diff --git a/src/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java b/src/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java index 7f6ade4..76ff638 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java @@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.regionserver; import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; -import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode; +import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; import org.apache.hadoop.hbase.util.Bytes; /** @@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.util.Bytes; * between rows. * *

      - * This class is utilized by {@link QueryMatcher} through two methods: + * This class is utilized by {@link ScanQueryMatcher} through two methods: *

      • {@link #checkColumn} is called when a Put satisfies all other * conditions of the query. This method returns a {@link MatchCode} to define * what action should be taken. diff --git a/src/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java b/src/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java index ecd44f7..9a05d75 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes; /** * State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}. - * Like {@link GetDeleteTracker} and {@link ScanDeleteTracker} but does not + * Like {@link ScanDeleteTracker} but does not * implement the {@link DeleteTracker} interface since state spans rows (There * is no update nor reset method). */ diff --git a/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java b/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java deleted file mode 100644 index 4dd4f89..0000000 --- a/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java +++ /dev/null @@ -1,404 +0,0 @@ -/* - * Copyright 2009 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * This class is responsible for the tracking and enforcement of Deletes - * during the course of a Get operation. - *

        - * This class is utilized through three methods: - *

        • {@link #add} when encountering a Delete - *
        • {@link #isDeleted} when checking if a Put KeyValue has been deleted - *
        • {@link #update} when reaching the end of a StoreFile - *

          - * This class is NOT thread-safe as queries are never multi-threaded - */ -public class GetDeleteTracker implements DeleteTracker { - private static long UNSET = -1L; - private long familyStamp = UNSET; - protected List deletes = null; - private List newDeletes = new ArrayList(); - private Iterator iterator; - private Delete delete = null; - - /** - * Constructor - */ - public GetDeleteTracker() {} - - /** - * Add the specified KeyValue to the list of deletes to check against for - * this row operation. - *

          - * This is called when a Delete is encountered in a StoreFile. - * @param buffer - * @param qualifierOffset - * @param qualifierLength - * @param timestamp - * @param type - */ - @Override - public void add(byte [] buffer, int qualifierOffset, int qualifierLength, - long timestamp, byte type) { - if (type == KeyValue.Type.DeleteFamily.getCode()) { - if(timestamp > familyStamp) { - familyStamp = timestamp; - } - return; - } - if(timestamp > familyStamp) { - this.newDeletes.add(new Delete(buffer, qualifierOffset, qualifierLength, - type, timestamp)); - } - } - - /** - * Check if the specified KeyValue buffer has been deleted by a previously - * seen delete. - * @param buffer KeyValue buffer - * @param qualifierOffset column qualifier offset - * @param qualifierLength column qualifier length - * @param timestamp timestamp - * @return true is the specified KeyValue is deleted, false if not - */ - @Override - public boolean isDeleted(byte [] buffer, int qualifierOffset, - int qualifierLength, long timestamp) { - // Check against DeleteFamily - if (timestamp <= familyStamp) { - return true; - } - - // Check if there are other deletes - if (this.delete == null) { - return false; - } - - // Check column - int ret = Bytes.compareTo(buffer, qualifierOffset, qualifierLength, - this.delete.buffer, this.delete.qualifierOffset, - this.delete.qualifierLength); - while (ret != 0) { - if (ret <= -1) { - // Have not reached the next delete yet - return false; - } else if (ret >= 1) { - // Deletes an earlier column, need to move down deletes - if (this.iterator.hasNext()) { - this.delete = this.iterator.next(); - } else { - this.delete = null; - return false; - } - ret = Bytes.compareTo(buffer, qualifierOffset, qualifierLength, - this.delete.buffer, this.delete.qualifierOffset, - this.delete.qualifierLength); - - } - } - - // Check Timestamp - if(timestamp > this.delete.timestamp) { - return false; - } - - // Check Type - switch(KeyValue.Type.codeToType(this.delete.type)) { - case Delete: - boolean equal = timestamp == this.delete.timestamp; - - if(this.iterator.hasNext()) { - this.delete = this.iterator.next(); - } else { - this.delete = null; - } - - if(equal){ - return true; - } - // timestamp < this.delete.timestamp - // Delete of an explicit column newer than current - return isDeleted(buffer, qualifierOffset, qualifierLength, timestamp); - case DeleteColumn: - return true; - } - - // should never reach this - return false; - } - - @Override - public boolean isEmpty() { - return this.familyStamp == UNSET && this.delete == null && - this.newDeletes.isEmpty(); - } - - @Override - public void reset() { - this.deletes = null; - this.delete = null; - this.newDeletes = new ArrayList(); - this.familyStamp = UNSET; - this.iterator = null; - } - - /** - * Called at the end of every StoreFile. - *

          - * Many optimized implementations of Trackers will require an update at - * when the end of each StoreFile is reached. - */ - @Override - public void update() { - // If no previous deletes, use new deletes and return - if (this.deletes == null || this.deletes.size() == 0) { - finalize(this.newDeletes); - return; - } - - // If no new delete, retain previous deletes and return - if(this.newDeletes.size() == 0) { - return; - } - - // Merge previous deletes with new deletes - List mergeDeletes = - new ArrayList(this.newDeletes.size()); - int oldIndex = 0; - int newIndex = 0; - - Delete newDelete = newDeletes.get(oldIndex); - Delete oldDelete = deletes.get(oldIndex); - while(true) { - switch(compareDeletes(oldDelete,newDelete)) { - case NEXT_NEW: { - if(++newIndex == newDeletes.size()) { - // Done with new, add the rest of old to merged and return - mergeDown(mergeDeletes, deletes, oldIndex); - finalize(mergeDeletes); - return; - } - newDelete = this.newDeletes.get(newIndex); - break; - } - - case INCLUDE_NEW_NEXT_NEW: { - mergeDeletes.add(newDelete); - if(++newIndex == newDeletes.size()) { - // Done with new, add the rest of old to merged and return - mergeDown(mergeDeletes, deletes, oldIndex); - finalize(mergeDeletes); - return; - } - newDelete = this.newDeletes.get(newIndex); - break; - } - - case INCLUDE_NEW_NEXT_BOTH: { - mergeDeletes.add(newDelete); - ++oldIndex; - ++newIndex; - if(oldIndex == deletes.size()) { - if(newIndex == newDeletes.size()) { - finalize(mergeDeletes); - return; - } - mergeDown(mergeDeletes, newDeletes, newIndex); - finalize(mergeDeletes); - return; - } else if(newIndex == newDeletes.size()) { - mergeDown(mergeDeletes, deletes, oldIndex); - finalize(mergeDeletes); - return; - } - oldDelete = this.deletes.get(oldIndex); - newDelete = this.newDeletes.get(newIndex); - break; - } - - case INCLUDE_OLD_NEXT_BOTH: { - mergeDeletes.add(oldDelete); - ++oldIndex; - ++newIndex; - if(oldIndex == deletes.size()) { - if(newIndex == newDeletes.size()) { - finalize(mergeDeletes); - return; - } - mergeDown(mergeDeletes, newDeletes, newIndex); - finalize(mergeDeletes); - return; - } else if(newIndex == newDeletes.size()) { - mergeDown(mergeDeletes, deletes, oldIndex); - finalize(mergeDeletes); - return; - } - oldDelete = this.deletes.get(oldIndex); - newDelete = this.newDeletes.get(newIndex); - break; - } - - case INCLUDE_OLD_NEXT_OLD: { - mergeDeletes.add(oldDelete); - if(++oldIndex == deletes.size()) { - mergeDown(mergeDeletes, newDeletes, newIndex); - finalize(mergeDeletes); - return; - } - oldDelete = this.deletes.get(oldIndex); - break; - } - - case NEXT_OLD: { - if(++oldIndex == deletes.size()) { - // Done with old, add the rest of new to merged and return - mergeDown(mergeDeletes, newDeletes, newIndex); - finalize(mergeDeletes); - return; - } - oldDelete = this.deletes.get(oldIndex); - } - } - } - } - - private void finalize(List mergeDeletes) { - this.deletes = mergeDeletes; - this.newDeletes = new ArrayList(); - if(this.deletes.size() > 0){ - this.iterator = deletes.iterator(); - this.delete = iterator.next(); - } - } - - private void mergeDown(List mergeDeletes, List srcDeletes, - int srcIndex) { - int index = srcIndex; - while(index < srcDeletes.size()) { - mergeDeletes.add(srcDeletes.get(index++)); - } - } - - - protected DeleteCompare compareDeletes(Delete oldDelete, Delete newDelete) { - - // Compare columns - // Just compairing qualifier portion, can keep on using Bytes.compareTo(). 
- int ret = Bytes.compareTo(oldDelete.buffer, oldDelete.qualifierOffset, - oldDelete.qualifierLength, newDelete.buffer, newDelete.qualifierOffset, - newDelete.qualifierLength); - - if(ret <= -1) { - return DeleteCompare.INCLUDE_OLD_NEXT_OLD; - } else if(ret >= 1) { - return DeleteCompare.INCLUDE_NEW_NEXT_NEW; - } - - // Same column - - // Branches below can be optimized. Keeping like this until testing - // is complete. - if(oldDelete.type == newDelete.type) { - // the one case where we can merge 2 deletes -> 1 delete. - if(oldDelete.type == KeyValue.Type.Delete.getCode()){ - if(oldDelete.timestamp > newDelete.timestamp) { - return DeleteCompare.INCLUDE_OLD_NEXT_OLD; - } else if(oldDelete.timestamp < newDelete.timestamp) { - return DeleteCompare.INCLUDE_NEW_NEXT_NEW; - } else { - return DeleteCompare.INCLUDE_OLD_NEXT_BOTH; - } - } - if(oldDelete.timestamp < newDelete.timestamp) { - return DeleteCompare.INCLUDE_NEW_NEXT_BOTH; - } - return DeleteCompare.INCLUDE_OLD_NEXT_BOTH; - } - - // old delete is more specific than the new delete. - // if the olddelete is newer then the newdelete, we have to - // keep it - if(oldDelete.type < newDelete.type) { - if(oldDelete.timestamp > newDelete.timestamp) { - return DeleteCompare.INCLUDE_OLD_NEXT_OLD; - } else if(oldDelete.timestamp < newDelete.timestamp) { - return DeleteCompare.NEXT_OLD; - } else { - return DeleteCompare.NEXT_OLD; - } - } - - // new delete is more specific than the old delete. - if(oldDelete.type > newDelete.type) { - if(oldDelete.timestamp > newDelete.timestamp) { - return DeleteCompare.NEXT_NEW; - } else if(oldDelete.timestamp < newDelete.timestamp) { - return DeleteCompare.INCLUDE_NEW_NEXT_NEW; - } else { - return DeleteCompare.NEXT_NEW; - } - } - - // Should never reach, - // throw exception for assertion? - throw new RuntimeException("GetDeleteTracker:compareDelete reached terminal state"); - } - - /** - * Internal class used to store the necessary information for a Delete. - *

          - * Rather than reparsing the KeyValue, or copying fields, this class points - * to the underlying KeyValue buffer with pointers to the qualifier and fields - * for type and timestamp. No parsing work is done in DeleteTracker now. - *

          - * Fields are public because they are accessed often, directly, and only - * within this class. - */ - protected static class Delete { - byte [] buffer; - int qualifierOffset; - int qualifierLength; - byte type; - long timestamp; - /** - * Constructor - * @param buffer - * @param qualifierOffset - * @param qualifierLength - * @param type - * @param timestamp - */ - public Delete(byte [] buffer, int qualifierOffset, int qualifierLength, - byte type, long timestamp) { - this.buffer = buffer; - this.qualifierOffset = qualifierOffset; - this.qualifierLength = qualifierLength; - this.type = type; - this.timestamp = timestamp; - } - } -} diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 0f199a8..c14cf17 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -64,7 +64,6 @@ package org.apache.hadoop.hbase.regionserver; import java.util.TreeMap; import java.util.TreeSet; import java.util.HashMap; - import java.util.HashSet; import java.util.Random; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -150,14 +149,16 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ final Path regiondir; private final Path regionCompactionDir; KeyValue.KVComparator comparator; + // Used to track updates + private RegionUpdateTracker updateTracker; - /* + /* * Set this when scheduling compaction if want the next compaction to be a * major compaction. Cleared each time through compaction code. */ private volatile boolean forceMajorCompaction = false; - /* + /* * Data structure of write state flags used coordinating flushes, * compactions and closes. */ @@ -289,6 +290,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ this.memstoreFlushSize = flushSize; this.blockingMemStoreSize = this.memstoreFlushSize * conf.getLong("hbase.hregion.memstore.block.multiplier", 2); + this.updateTracker = new RegionUpdateTracker(); } /** @@ -940,6 +942,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ long sequenceId = -1L; long completeSequenceId = -1L; this.updatesLock.writeLock().lock(); + this.newScannerLock.writeLock().lock(); // Get current size of memstores. final long currentMemStoreSize = this.memstoreSize.get(); List storeFlushers = new ArrayList(); @@ -955,7 +958,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ for (StoreFlusher flusher: storeFlushers) { flusher.prepare(); } + // finally reset the update tracker so that the counter + // won't overflow + this.updateTracker = new RegionUpdateTracker(); } finally { + this.newScannerLock.writeLock().unlock(); this.updatesLock.writeLock().unlock(); } @@ -1099,9 +1106,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } // This will get all results for this store. TODO: Do we need to do this? 
Get get = new Get(key.getRow()); - List results = new ArrayList(); - store.get(get, null, results); - return new Result(results); + get.addFamily(family); + return get(get, null); } finally { splitsAndClosesLock.readLock().unlock(); } @@ -1163,8 +1169,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ checkReadOnly(); checkResources(); Integer lid = null; - newScannerLock.writeLock().lock(); splitsAndClosesLock.readLock().lock(); + updatesLock.readLock().lock(); try { byte [] row = delete.getRow(); // If we did not pass an existing row lock, obtain a new one @@ -1184,18 +1190,23 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ checkFamily(family); } } - - for(Map.Entry> e: delete.getFamilyMap().entrySet()) { - byte [] family = e.getKey(); - delete(family, e.getValue(), writeToWAL); + + int uid = updateTracker.assignUpdateIds(delete); + try { + for (Map.Entry> e : delete.getFamilyMap().entrySet()) { + byte[] family = e.getKey(); + delete(family, e.getValue(), writeToWAL, lid); + } + } finally { + updateTracker.commit(uid); } } finally { if(lockid == null) releaseRowLock(lid); + updatesLock.readLock().unlock(); splitsAndClosesLock.readLock().unlock(); - newScannerLock.writeLock().unlock(); } } - + /** * @param family @@ -1203,52 +1214,44 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * @param writeToWAL * @throws IOException */ - public void delete(byte [] family, List kvs, boolean writeToWAL) + void delete(byte [] family, List kvs, boolean writeToWAL, Integer rowLock) throws IOException { long now = System.currentTimeMillis(); byte [] byteNow = Bytes.toBytes(now); boolean flush = false; - this.updatesLock.readLock().lock(); - try { - if (writeToWAL) { - this.log.append(regionInfo.getRegionName(), - regionInfo.getTableDesc().getName(), kvs, now); - } - long size = 0; - Store store = getStore(family); - for (KeyValue kv: kvs) { - // Check if time is LATEST, change to time of most recent addition if so - // This is expensive. - if (kv.isLatestTimestamp() && kv.isDeleteType()) { - List result = new ArrayList(1); - Get g = new Get(kv.getRow()); - NavigableSet qualifiers = - new TreeSet(Bytes.BYTES_COMPARATOR); - byte [] q = kv.getQualifier(); - if(q == null) q = HConstants.EMPTY_BYTE_ARRAY; - qualifiers.add(q); - get(store, g, qualifiers, result); - if (result.isEmpty()) { - // Nothing to delete - continue; - } - if (result.size() > 1) { - throw new RuntimeException("Unexpected size: " + result.size()); - } - KeyValue getkv = result.get(0); - Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), - getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG); - } else { - kv.updateLatestStamp(byteNow); + if (writeToWAL) { + this.log.append(regionInfo.getRegionName(), + regionInfo.getTableDesc().getName(), kvs, now); + } + long size = 0; + Store store = getStore(family); + for (KeyValue kv : kvs) { + // Check if time is LATEST, change to time of most recent addition if so + // This is expensive. 
+ if (kv.isLatestTimestamp() && kv.isDeleteType()) { + byte[] q = kv.getQualifier(); + if (q == null) q = HConstants.EMPTY_BYTE_ARRAY; + Get g = new Get(kv.getRow()); + g.addColumn(family, q); + List result = getImpl(g, rowLock); + if (result.isEmpty()) { + // Nothing to delete + continue; } - - size = this.memstoreSize.addAndGet(store.delete(kv)); + if (result.size() > 1) { + throw new RuntimeException("Unexpected size: " + result.size()); + } + KeyValue getkv = result.get(0); + Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), + getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG); + } else { + kv.updateLatestStamp(byteNow); } - flush = isFlushSize(size); - } finally { - this.updatesLock.readLock().unlock(); + + size = this.memstoreSize.addAndGet(store.delete(kv)); } + flush = isFlushSize(size); if (flush) { // Request a cache flush. Do it outside update lock. requestFlush(); @@ -1296,8 +1299,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // read lock, resources may run out. For now, the thought is that this // will be extremely rare; we'll deal with it when it happens. checkResources(); - newScannerLock.writeLock().lock(); splitsAndClosesLock.readLock().lock(); + updatesLock.readLock().lock(); + int uid = updateTracker.assignUpdateIds(put); try { // We obtain a per-row lock, so other clients will block while one client // performs an update. The read lock is released by the client calling @@ -1322,13 +1326,14 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ if(lockid == null) releaseRowLock(lid); } } finally { + updatesLock.readLock().unlock(); splitsAndClosesLock.readLock().unlock(); - newScannerLock.writeLock().unlock(); + // commit last so that if there's an exception locks are released + updateTracker.commit(uid); } } - - //TODO, Think that gets/puts and deletes should be refactored a bit so that + //TODO, Think that gets/puts and deletes should be refactored a bit so that //the getting of the lock happens before, so that you would just pass it into //the methods. 
So in the case of checkAndPut you could just do lockRow, //get, put, unlockRow or something @@ -1344,62 +1349,61 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * @throws IOException * @return true if the new put was execute, false otherwise */ - public boolean checkAndPut(byte [] row, byte [] family, byte [] qualifier, - byte [] expectedValue, Put put, Integer lockId, boolean writeToWAL) - throws IOException{ - checkReadOnly(); - //TODO, add check for value length or maybe even better move this to the - //client if this becomes a global setting - checkResources(); - splitsAndClosesLock.readLock().lock(); - try { - Get get = new Get(row, put.getRowLock()); - checkFamily(family); - get.addColumn(family, qualifier); + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + byte[] expectedValue, Put put, Integer lockId, boolean writeToWAL) + throws IOException { + checkReadOnly(); + //TODO, add check for value length or maybe even better move this to the + //client if this becomes a global setting + checkResources(); + splitsAndClosesLock.readLock().lock(); + int uid = updateTracker.assignUpdateIds(put); - byte [] now = Bytes.toBytes(System.currentTimeMillis()); + try { + Get get = new Get(row, put.getRowLock()); + checkFamily(family); + get.addColumn(family, qualifier); + + byte[] now = Bytes.toBytes(System.currentTimeMillis()); + + // Lock row + Integer lid = getLock(lockId, get.getRow()); + try { + List result = getImpl(get, lid); + boolean matches = false; + if (result.size() == 0 && expectedValue.length == 0) { + matches = true; + } else if (result.size() == 1) { + //Compare the expected value with the actual value + byte[] actualValue = result.get(0).getValue(); + matches = Bytes.equals(expectedValue, actualValue); + } + //If matches put the new put + if (matches) { + for (Map.Entry> entry : + put.getFamilyMap().entrySet()) { + byte[] fam = entry.getKey(); + checkFamily(fam); + List puts = entry.getValue(); + if (updateKeys(puts, now)) { + put(fam, puts, writeToWAL); + } + } + return true; + } + return false; + } finally { + if (lockId == null) releaseRowLock(lid); + } + } finally { + splitsAndClosesLock.readLock().unlock(); + // commit last so that if there's an exception locks are released + updateTracker.commit(uid); + } + } - // Lock row - Integer lid = getLock(lockId, get.getRow()); - List result = new ArrayList(); - try { - //Getting data - for(Map.Entry> entry: - get.getFamilyMap().entrySet()) { - get(this.stores.get(entry.getKey()), get, entry.getValue(), result); - } - boolean matches = false; - if (result.size() == 0 && expectedValue.length == 0) { - matches = true; - } else if(result.size() == 1) { - //Compare the expected value with the actual value - byte [] actualValue = result.get(0).getValue(); - matches = Bytes.equals(expectedValue, actualValue); - } - //If matches put the new put - if(matches) { - for(Map.Entry> entry : - put.getFamilyMap().entrySet()) { - byte [] fam = entry.getKey(); - checkFamily(fam); - List puts = entry.getValue(); - if(updateKeys(puts, now)) { - put(fam, puts, writeToWAL); - } - } - return true; - } - return false; - } finally { - if(lockId == null) releaseRowLock(lid); - } - } finally { - splitsAndClosesLock.readLock().unlock(); - } - } - - - /** + + /** * Checks if any stamps is Long.MAX_VALUE. If so, sets them to now. *

          * This acts to replace LATEST_TIMESTAMP with now. @@ -1513,6 +1517,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ return; } boolean flush = false; + // The lock may be 're-aquired' by this method, but this is ok + // because it's reentrant this.updatesLock.readLock().lock(); try { if (writeToWAL) { @@ -1768,6 +1774,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ private Filter filter; private RowFilterInterface oldFilter; private List results = new ArrayList(); + private int getScan; RegionScanner(Scan scan, List additionalScanners) { this.filter = scan.getFilter(); @@ -1777,15 +1784,17 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } else { this.stopRow = scan.getStopRow(); } - + this.getScan = scan.isGetScan() ? -1 : 0; + List scanners = new ArrayList(); if (additionalScanners != null) { scanners.addAll(additionalScanners); } - for (Map.Entry> entry : + RegionUpdateTracker.UpdateIdValidator validator = updateTracker.getValidator(); + for (Map.Entry> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - scanners.add(store.getScanner(scan, entry.getValue())); + scanners.add(store.getScanner(scan, entry.getValue(), validator)); } this.storeHeap = new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator); @@ -1886,7 +1895,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ return currentRow == null || (this.stopRow != null && comparator.compareRows(this.stopRow, 0, this.stopRow.length, - currentRow, 0, currentRow.length) <= 0); + currentRow, 0, currentRow.length) <= getScan); } private boolean filterRow() { @@ -2401,46 +2410,31 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ } } - - // - // HBASE-880 - // - /** - * @param get - * @param lockid - * @return result - * @throws IOException - */ - public Result get(final Get get, final Integer lockid) throws IOException { - // Verify families are all valid - if (get.hasFamilies()) { - for (byte [] family: get.familySet()) { - checkFamily(family); - } - } else { // Adding all families to scanner - for (byte[] family: regionInfo.getTableDesc().getFamiliesKeys()) { - get.addFamily(family); - } - } - // Lock row - Integer lid = getLock(lockid, get.getRow()); - List result = new ArrayList(); - try { - for (Map.Entry> entry: - get.getFamilyMap().entrySet()) { - get(this.stores.get(entry.getKey()), get, entry.getValue(), result); - } - } finally { - if(lockid == null) releaseRowLock(lid); - } - return new Result(result); - } - private void get(final Store store, final Get get, - final NavigableSet qualifiers, List result) - throws IOException { - store.get(get, qualifiers, result); - } + public Result get(final Get get, final Integer lockid) throws IOException { + // Verify families are all valid + return new Result(getImpl(get, lockid)); + } + + private List getImpl(Get get, Integer lockid) throws IOException { + Scan scan = new Scan(get); + + // Lock row + List result = new ArrayList(); + Integer lid = getLock(lockid, get.getRow()); + InternalScanner scanner = null; + try { + scanner = getScanner(scan); + boolean hasNext = scanner.next(result); // we're getting only one row + assert !hasNext; // we should be getting only only line + } finally { + if (lockid == null) releaseRowLock(lid); + if (scanner != null) { + scanner.close(); + } + } + return result; + } /** * @@ -2465,10 +2459,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // Get the old 
value: Get get = new Get(row); get.addColumn(family, qualifier); - List results = new ArrayList(); - NavigableSet qualifiers = new TreeSet(Bytes.BYTES_COMPARATOR); - qualifiers.add(qualifier); - store.get(get, qualifiers, results); + List results = getImpl(get, lid); if (!results.isEmpty()) { KeyValue kv = results.get(0); @@ -2494,10 +2485,15 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // Now request the ICV to the store, this will set the timestamp // appropriately depending on if there is a value in memcache or not. // returns the - long size = store.updateColumnValue(row, family, qualifier, result); + int updateId = updateTracker.nextUpdateId(); + try { + long size = store.updateColumnValue(row, family, qualifier, result, updateId); - size = this.memstoreSize.addAndGet(size); - flush = isFlushSize(size); + size = this.memstoreSize.addAndGet(size); + flush = isFlushSize(size); + } finally { + updateTracker.commit(updateId); + } } finally { releaseRowLock(lid); } @@ -2526,7 +2522,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ public static final long FIXED_OVERHEAD = ClassSize.align( (5 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN + - (20 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT); + (21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) + diff --git a/src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java b/src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java index de3df22..dabd294 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java @@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.KeyValue; * has same attributes as ConcurrentSkipListSet: e.g. tolerant of concurrent * get and set and won't throw ConcurrentModificationException when iterating. 
*/ -class KeyValueSkipListSet implements NavigableSet, Cloneable { +class KeyValueSkipListSet implements NavigableSet { private ConcurrentNavigableMap delegatee; KeyValueSkipListSet(final KeyValue.KVComparator c) { @@ -201,17 +201,4 @@ class KeyValueSkipListSet implements NavigableSet, Cloneable { public T[] toArray(T[] a) { throw new UnsupportedOperationException("Not implemented"); } - - @Override - public KeyValueSkipListSet clone() { - assert this.delegatee.getClass() == ConcurrentSkipListMap.class; - KeyValueSkipListSet clonedSet = null; - try { - clonedSet = (KeyValueSkipListSet) super.clone(); - } catch (CloneNotSupportedException e) { - throw new InternalError(e.getMessage()); - } - clonedSet.delegatee = ((ConcurrentSkipListMap) this.delegatee).clone(); - return clonedSet; - } } \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 7afe297..b8ff8cd 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -24,9 +24,7 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.rmi.UnexpectedException; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; import java.util.NavigableSet; import java.util.SortedSet; import java.util.concurrent.atomic.AtomicLong; @@ -37,7 +35,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.HeapSize; -import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -69,12 +66,6 @@ public class MemStore implements HeapSize { final KeyValue.KVComparator comparator; - // Used comparing versions -- same r/c and ts but different type. - final KeyValue.KVComparator comparatorIgnoreType; - - // Used comparing versions -- same r/c and type but different timestamp. - final KeyValue.KVComparator comparatorIgnoreTimestamp; - // Used to track own heapSize final AtomicLong size; @@ -91,23 +82,11 @@ public class MemStore implements HeapSize { */ public MemStore(final KeyValue.KVComparator c) { this.comparator = c; - this.comparatorIgnoreTimestamp = - this.comparator.getComparatorIgnoringTimestamps(); - this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType(); this.kvset = new KeyValueSkipListSet(c); this.snapshot = new KeyValueSkipListSet(c); this.size = new AtomicLong(DEEP_OVERHEAD); } - void dump() { - for (KeyValue kv: this.kvset) { - LOG.info(kv); - } - for (KeyValue kv: this.snapshot) { - LOG.info(kv); - } - } - /** * Creates a snapshot of the current memstore. * Snapshot must be cleared by call to {@link #clearSnapshot(java.util.Map)} @@ -195,69 +174,7 @@ public class MemStore implements HeapSize { long delete(final KeyValue delete) { long s = 0; this.lock.readLock().lock(); - //Have to find out what we want to do here, to find the fastest way of - //removing things that are under a delete. - //Actions that will take place here are: - //1. Insert a delete and remove all the affected entries already in memstore - //2. 
In the case of a Delete and the matching put is found then don't insert - // the delete - //TODO Would be nice with if we had an iterator for this, so we could remove - //things that needs to be removed while iterating and don't have to go - //back and do it afterwards - try { - boolean notpresent = false; - List deletes = new ArrayList(); - SortedSet tail = this.kvset.tailSet(delete); - - //Parse the delete, so that it is only done once - byte [] deleteBuffer = delete.getBuffer(); - int deleteOffset = delete.getOffset(); - - int deleteKeyLen = Bytes.toInt(deleteBuffer, deleteOffset); - deleteOffset += Bytes.SIZEOF_INT + Bytes.SIZEOF_INT; - - short deleteRowLen = Bytes.toShort(deleteBuffer, deleteOffset); - deleteOffset += Bytes.SIZEOF_SHORT; - int deleteRowOffset = deleteOffset; - - deleteOffset += deleteRowLen; - - byte deleteFamLen = deleteBuffer[deleteOffset]; - deleteOffset += Bytes.SIZEOF_BYTE + deleteFamLen; - - int deleteQualifierOffset = deleteOffset; - int deleteQualifierLen = deleteKeyLen - deleteRowLen - deleteFamLen - - Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG - - Bytes.SIZEOF_BYTE; - - deleteOffset += deleteQualifierLen; - - int deleteTimestampOffset = deleteOffset; - deleteOffset += Bytes.SIZEOF_LONG; - byte deleteType = deleteBuffer[deleteOffset]; - - //Comparing with tail from memstore - for (KeyValue kv : tail) { - DeleteCode res = DeleteCompare.deleteCompare(kv, deleteBuffer, - deleteRowOffset, deleteRowLen, deleteQualifierOffset, - deleteQualifierLen, deleteTimestampOffset, deleteType, - comparator.getRawComparator()); - if (res == DeleteCode.DONE) { - break; - } else if (res == DeleteCode.DELETE) { - deletes.add(kv); - } // SKIP - } - - //Delete all the entries effected by the last added delete - for (KeyValue kv : deletes) { - notpresent = this.kvset.remove(kv); - s -= heapSizeChange(kv, notpresent); - } - - // Adding the delete to memstore. Add any value, as long as - // same instance each time. s += heapSizeChange(delete, this.kvset.add(delete)); } finally { this.lock.readLock().unlock(); @@ -266,56 +183,6 @@ public class MemStore implements HeapSize { return s; } - /** - * @param kv Find the row that comes after this one. If null, we return the - * first. - * @return Next row or null if none found. - */ - KeyValue getNextRow(final KeyValue kv) { - this.lock.readLock().lock(); - try { - return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot)); - } finally { - this.lock.readLock().unlock(); - } - } - - /* - * @param a - * @param b - * @return Return lowest of a or b or null if both a and b are null - */ - private KeyValue getLowest(final KeyValue a, final KeyValue b) { - if (a == null) { - return b; - } - if (b == null) { - return a; - } - return comparator.compareRows(a, b) <= 0? a: b; - } - - /* - * @param key Find row that follows this one. If null, return first. - * @param map Set to look in for a row beyond row. - * @return Next row or null if none found. If one found, will be a new - * KeyValue -- can be destroyed by subsequent calls to this method. - */ - private KeyValue getNextRow(final KeyValue key, - final NavigableSet set) { - KeyValue result = null; - SortedSet tail = key == null? set: set.tailSet(key); - // Iterate until we fall into the next row; i.e. move off current row - for (KeyValue kv: tail) { - if (comparator.compareRows(kv, key) <= 0) - continue; - // Note: Not suppressing deletes or expired cells. Needs to be handled - // by higher up functions. 
- result = kv; - break; - } - return result; - } /** * @param state @@ -437,75 +304,20 @@ public class MemStore implements HeapSize { /** * @return scanner on memstore and snapshot in this order. + * @param validator */ - KeyValueScanner [] getScanners() { + KeyValueScanner [] getScanners(RegionUpdateTracker.UpdateIdValidator validator) { this.lock.readLock().lock(); try { KeyValueScanner [] scanners = new KeyValueScanner[1]; - scanners[0] = new MemStoreScanner(this.kvset.clone(), - this.snapshot.clone(), this.comparator); + scanners[0] = new MemStoreScanner(this.kvset, + validator, this.snapshot, this.comparator); return scanners; } finally { this.lock.readLock().unlock(); } } - // - // HBASE-880/1249/1304 - // - - /** - * Perform a single-row Get on the and snapshot, placing results - * into the specified KV list. - *

          - * This will return true if it is determined that the query is complete - * and it is not necessary to check any storefiles after this. - *

          - * Otherwise, it will return false and you should continue on. - * @param matcher Column matcher - * @param result List to add results to - * @return true if done with store (early-out), false if not - * @throws IOException - */ - public boolean get(QueryMatcher matcher, List result) - throws IOException { - this.lock.readLock().lock(); - try { - if(internalGet(this.kvset, matcher, result) || matcher.isDone()) { - return true; - } - matcher.update(); - return internalGet(this.snapshot, matcher, result) || matcher.isDone(); - } finally { - this.lock.readLock().unlock(); - } - } - - /** - * Gets from either the memstore or the snapshop, and returns a code - * to let you know which is which. - * - * @param matcher - * @param result - * @return 1 == memstore, 2 == snapshot, 0 == none - */ - int getWithCode(QueryMatcher matcher, List result) throws IOException { - this.lock.readLock().lock(); - try { - boolean fromMemstore = internalGet(this.kvset, matcher, result); - if (fromMemstore || matcher.isDone()) - return 1; - - matcher.update(); - boolean fromSnapshot = internalGet(this.snapshot, matcher, result); - if (fromSnapshot || matcher.isDone()) - return 2; - - return 0; - } finally { - this.lock.readLock().unlock(); - } - } /** * Small utility functions for use by Store.incrementColumnValue @@ -518,46 +330,12 @@ public class MemStore implements HeapSize { this.lock.readLock().unlock(); } - /** - * - * @param set memstore or snapshot - * @param matcher query matcher - * @param result list to add results to - * @return true if done with store (early-out), false if not - * @throws IOException - */ - boolean internalGet(final NavigableSet set, - final QueryMatcher matcher, final List result) - throws IOException { - if(set.isEmpty()) return false; - // Seek to startKey - SortedSet tail = set.tailSet(matcher.getStartKey()); - for (KeyValue kv : tail) { - QueryMatcher.MatchCode res = matcher.match(kv); - switch(res) { - case INCLUDE: - result.add(kv); - break; - case SKIP: - break; - case NEXT: - return false; - case DONE: - return true; - default: - throw new RuntimeException("Unexpected " + res); - } - } - return false; - } - - public final static long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (7 * ClassSize.REFERENCE)); + ClassSize.OBJECT + (5 * ClassSize.REFERENCE)); public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + - ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG + - ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST + + ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG + ClassSize.ATOMIC_INTEGER + + ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST + (2 * ClassSize.CONCURRENT_SKIPLISTMAP)); /* diff --git a/src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java index d342bab..1c7fcbc 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java @@ -62,6 +62,7 @@ class MemStoreScanner implements KeyValueScanner { private SortedSet kvsetRef; + private RegionUpdateTracker.UpdateIdValidator validator; private SortedSet snapshotRef; private KeyValue.KVComparator comparatorRef; private Iterator kvsetIterator; @@ -74,14 +75,16 @@ class MemStoreScanner implements KeyValueScanner { /** * Create a new memstore scanner. 
* - * @param kvset the main key value set - * @param snapshot the snapshot set - * @param comparator the comparator to use + * @param kvset the main key value set + * @param validator any update newer than this will be ignored + * @param snapshot the snapshot set + * @param comparator the comparator to use */ MemStoreScanner(KeyValueSkipListSet kvset, - KeyValueSkipListSet snapshot, KeyValue.KVComparator comparator) { + RegionUpdateTracker.UpdateIdValidator validator, KeyValueSkipListSet snapshot, KeyValue.KVComparator comparator) { super(); this.kvsetRef = kvset; + this.validator = validator; this.snapshotRef = snapshot != null ? snapshot : EMPTY_SET; this.comparatorRef = comparator; this.kvsetIterator = kvsetRef.iterator(); @@ -93,10 +96,19 @@ class MemStoreScanner implements KeyValueScanner { if (nextKV == null) { if (currentSnapshotKV == null && snapshotIterator.hasNext()) { currentSnapshotKV = snapshotIterator.next(); + // reset the kv's update id to 'mark' it as a snapshot kv. + currentSnapshotKV.setUpdateId(KeyValue.UNSET_UPDATE_ID); } if (currentKvsetKV == null && kvsetIterator.hasNext()) { - currentKvsetKV = kvsetIterator.next(); + boolean isValid; + do { + currentKvsetKV = kvsetIterator.next(); + isValid = validator == null || validator.isValid(currentKvsetKV.getUpdateId()) ; + } while (!isValid && kvsetIterator.hasNext()); + if (!isValid) { + currentKvsetKV = null; + } } if (currentSnapshotKV != null && currentKvsetKV != null) { diff --git a/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java b/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java deleted file mode 100644 index 66bd6f1..0000000 --- a/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java +++ /dev/null @@ -1,390 +0,0 @@ -/* - * Copyright 2009 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver; - -import java.util.NavigableSet; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.KeyComparator; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * This is the primary class used to process KeyValues during a Get or Scan - * operation. - *

          - * It encapsulates the handling of the column and version input parameters to - * the query through a {@link ColumnTracker}. - *

          - * Deletes are handled using the {@link DeleteTracker}. - *

          - * All other query parameters are accessed from the client-specified Get. - *

          - * The primary method used is {@link #match} with the current KeyValue. It will - * return a {@link QueryMatcher.MatchCode} - * - * , deletes, - * versions, - */ -public class QueryMatcher { - /** - * {@link #match} return codes. These instruct the scanner moving through - * memstores and StoreFiles what to do with the current KeyValue. - *

          - * Additionally, this contains "early-out" language to tell the scanner to - * move on to the next File (memstore or Storefile), or to return immediately. - */ - static enum MatchCode { - /** - * Include KeyValue in the returned result - */ - INCLUDE, - - /** - * Do not include KeyValue in the returned result - */ - SKIP, - - /** - * Do not include, jump to next StoreFile or memstore (in time order) - */ - NEXT, - - /** - * Do not include, return current result - */ - DONE, - - /** - * These codes are used by the ScanQueryMatcher - */ - - /** - * Done with the row, seek there. - */ - SEEK_NEXT_ROW, - /** - * Done with column, seek to next. - */ - SEEK_NEXT_COL, - - /** - * Done with scan, thanks to the row filter. - */ - DONE_SCAN, - } - - /** Keeps track of deletes */ - protected DeleteTracker deletes; - - /** Keeps track of columns and versions */ - protected ColumnTracker columns; - - /** Key to seek to in memstore and StoreFiles */ - protected KeyValue startKey; - - /** Row comparator for the region this query is for */ - KeyComparator rowComparator; - - /** Row the query is on */ - protected byte [] row; - - /** TimeRange the query is for */ - protected TimeRange tr; - - /** Oldest allowed version stamp for TTL enforcement */ - protected long oldestStamp; - - protected Filter filter; - - /** - * Constructs a QueryMatcher for a Get. - * @param get - * @param family - * @param columns - * @param ttl - * @param rowComparator - */ - public QueryMatcher(Get get, byte [] family, - NavigableSet columns, long ttl, KeyComparator rowComparator, - int maxVersions) { - this.row = get.getRow(); - this.filter = get.getFilter(); - this.tr = get.getTimeRange(); - this.oldestStamp = System.currentTimeMillis() - ttl; - this.rowComparator = rowComparator; - this.deletes = new GetDeleteTracker(); - this.startKey = KeyValue.createFirstOnRow(row); - // Single branch to deal with two types of Gets (columns vs all in family) - if (columns == null || columns.size() == 0) { - this.columns = new WildcardColumnTracker(maxVersions); - } else { - this.columns = new ExplicitColumnTracker(columns, maxVersions); - } - } - - // For the subclasses. - protected QueryMatcher() { - super(); - } - - /** - * Constructs a copy of an existing QueryMatcher with a new row. - * @param matcher - * @param row - */ - public QueryMatcher(QueryMatcher matcher, byte [] row) { - this.row = row; - this.filter = matcher.filter; - this.tr = matcher.getTimeRange(); - this.oldestStamp = matcher.getOldestStamp(); - this.rowComparator = matcher.getRowComparator(); - this.columns = matcher.getColumnTracker(); - this.deletes = matcher.getDeleteTracker(); - this.startKey = matcher.getStartKey(); - reset(); - } - - /** - * Main method for ColumnMatcher. - *

          - * Determines whether the specified KeyValue should be included in the - * result or not. - *

          - * Contains additional language to early-out of the current file or to - * return immediately. - *

          - * Things to be checked:

            - *
          • Row - *
          • TTL - *
          • Type - *
          • TimeRange - *
          • Deletes - *
          • Column - *
          • Versions - * @param kv KeyValue to check - * @return MatchCode: include, skip, next, done - */ - public MatchCode match(KeyValue kv) { - if (this.columns.done()) { - return MatchCode.DONE; // done_row - } - if (this.filter != null && this.filter.filterAllRemaining()) { - return MatchCode.DONE; - } - // Directly act on KV buffer - byte [] bytes = kv.getBuffer(); - int offset = kv.getOffset(); - - int keyLength = Bytes.toInt(bytes, offset); - offset += KeyValue.ROW_OFFSET; - - short rowLength = Bytes.toShort(bytes, offset); - offset += Bytes.SIZEOF_SHORT; - - // scanners are relying on us to check the row first, and return - // "NEXT" when we are there. - /* Check ROW - * If past query's row, go to next StoreFile - * If not reached query's row, go to next KeyValue - */ - int ret = this.rowComparator.compareRows(row, 0, row.length, - bytes, offset, rowLength); - if (ret <= -1) { - // Have reached the next row - return MatchCode.NEXT; // got_to_next_row (end) - } else if (ret >= 1) { - // At a previous row - return MatchCode.SKIP; // skip_to_cur_row - } - offset += rowLength; - byte familyLength = bytes[offset]; - offset += Bytes.SIZEOF_BYTE + familyLength; - - int columnLength = keyLength + KeyValue.ROW_OFFSET - - (offset - kv.getOffset()) - KeyValue.TIMESTAMP_TYPE_SIZE; - int columnOffset = offset; - offset += columnLength; - - /* Check TTL - * If expired, go to next KeyValue - */ - long timestamp = Bytes.toLong(bytes, offset); - if(isExpired(timestamp)) { - /* KeyValue is expired, skip but don't early out since a non-expired - * kv could come next. - */ - return MatchCode.SKIP; // go to next kv - } - offset += Bytes.SIZEOF_LONG; - - /* Check TYPE - * If a delete within (or after) time range, add to deletes - * Move to next KeyValue - */ - byte type = bytes[offset]; - // if delete type == delete family, return done_row - - if (isDelete(type)) { - if (tr.withinOrAfterTimeRange(timestamp)) { - this.deletes.add(bytes, columnOffset, columnLength, timestamp, type); - } - return MatchCode.SKIP; // skip the delete cell. - } - - /* Check TimeRange - * If outside of range, move to next KeyValue - */ - if (!tr.withinTimeRange(timestamp)) { - return MatchCode.SKIP; // optimization chances here. - } - - /* Check Deletes - * If deleted, move to next KeyValue - */ - if (!deletes.isEmpty() && deletes.isDeleted(bytes, columnOffset, - columnLength, timestamp)) { - // 2 types of deletes: - // affects 1 cell or 1 column, so just skip the keyvalues. - // - delete family, so just skip to the next row. - return MatchCode.SKIP; - } - - /* Check Column and Versions - * Returns a MatchCode directly, identical language - * If matched column without enough versions, include - * If enough versions of this column or does not match, skip - * If have moved past - * If enough versions of everything, - * TODO: No mapping from Filter.ReturnCode to MatchCode. - */ - MatchCode mc = columns.checkColumn(bytes, columnOffset, columnLength); - if (mc == MatchCode.INCLUDE && this.filter != null) { - switch(this.filter.filterKeyValue(kv)) { - case INCLUDE: return MatchCode.INCLUDE; - case SKIP: return MatchCode.SKIP; - default: return MatchCode.DONE; - } - } - return mc; - } - - // should be in KeyValue. - protected boolean isDelete(byte type) { - return (type != KeyValue.Type.Put.getCode()); - } - - protected boolean isExpired(long timestamp) { - return (timestamp < oldestStamp); - } - - /** - * If matcher returns SEEK_NEXT_COL you may be able - * to get a hint of the next column to seek to - call this. 
- * If it returns null, there is no hint. - * - * @return immediately after match returns SEEK_NEXT_COL - null if no hint, - * else the next column we want - */ - public ColumnCount getSeekColumn() { - return this.columns.getColumnHint(); - } - - /** - * Called after reading each section (memstore, snapshot, storefiles). - *

            - * This method will update the internal structures to be accurate for - * the next section. - */ - public void update() { - this.deletes.update(); - this.columns.update(); - } - - /** - * Resets the current columns and deletes - */ - public void reset() { - this.deletes.reset(); - this.columns.reset(); - } - - /** - * Set current row - * @param row - */ - public void setRow(byte [] row) { - this.row = row; - } - - /** - * - * @return the start key - */ - public KeyValue getStartKey() { - return this.startKey; - } - - /** - * @return the TimeRange - */ - public TimeRange getTimeRange() { - return this.tr; - } - - /** - * @return the oldest stamp - */ - public long getOldestStamp() { - return this.oldestStamp; - } - - /** - * @return current KeyComparator - */ - public KeyComparator getRowComparator() { - return this.rowComparator; - } - - /** - * @return ColumnTracker - */ - public ColumnTracker getColumnTracker() { - return this.columns; - } - - /** - * @return DeleteTracker - */ - public DeleteTracker getDeleteTracker() { - return this.deletes; - } - - /** - * - * @return true when done. - */ - public boolean isDone() { - return this.columns.done(); - } -} diff --git a/src/java/org/apache/hadoop/hbase/regionserver/RegionUpdateTracker.java b/src/java/org/apache/hadoop/hbase/regionserver/RegionUpdateTracker.java new file mode 100644 index 0000000..b0f8920 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/regionserver/RegionUpdateTracker.java @@ -0,0 +1,268 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.KeyValue; + +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.Arrays; +import java.util.List; + +/** + * This class tracks updates (Puts and Deletes) to a region + * and allows a scanner to skip incomplete updates. + */ +public class RegionUpdateTracker { + /** + * The default base value. + */ + public static final int DEFAULT_BASE = KeyValue.UNSET_UPDATE_ID + 1; + /** + * The default window size. + */ + public static final int DEFAULT_WINDOW_SIZE = 16384; + + private static final int WINDOW_SLIDE_THRESHOLD = DEFAULT_WINDOW_SIZE / 2; + + /** + * Capable of validating an update id. 
*/
+  public interface UpdateIdValidator {
+
+    /**
+     * Validates the given update id assigned by one of the
+     * {@link org.apache.hadoop.hbase.regionserver.RegionUpdateTracker} methods.
+     *
+     * @param updateId the id to validate
+     * @return true if valid
+     */
+    boolean isValid(int updateId);
+
+  }
+
+  private final ReadWriteLock lock;
+  private final AtomicInteger updateId;
+  private final UpdateWindow updateWindow;
+
+  /**
+   * Create a new region update tracker with the default base and window size.
+   */
+  public RegionUpdateTracker() {
+    // todo make this constructor take its values from conf.
+    // todo support HeapSize
+    this(DEFAULT_BASE, DEFAULT_WINDOW_SIZE);
+  }
+
+
+  private RegionUpdateTracker(int base, int windowSize) {
+    lock = new ReentrantReadWriteLock(true);
+    updateId = new AtomicInteger(base);
+    updateWindow = new UpdateWindow(base, windowSize);
+  }
+
+  /**
+   * Assigns update ids to all KeyValues.
+   *
+   * @param delete key value container
+   * @return the id assigned
+   */
+  public final int assignUpdateIds(Delete delete) {
+    int uid = updateId.getAndIncrement();
+    for (List<KeyValue> list : delete.getFamilyMap().values()) {
+      for (KeyValue keyValue : list) {
+        keyValue.setUpdateId(uid);
+      }
+    }
+    return uid;
+  }
+
+  /**
+   * Assigns update ids to all KeyValues.
+   *
+   * @param put key value container
+   * @return the id assigned
+   */
+  public final int assignUpdateIds(Put put) {
+    int uid = updateId.getAndIncrement();
+    for (List<KeyValue> list : put.getFamilyMap().values()) {
+      for (KeyValue keyValue : list) {
+        keyValue.setUpdateId(uid);
+      }
+    }
+    return uid;
+  }
+
+  /**
+   * Gets the next update id.
+   * @return the next update id
+   */
+  public final int nextUpdateId() {
+    return updateId.getAndIncrement();
+  }
+
+  /**
+   * Commits the given update id.
+   *
+   * @param updateId the update id to commit
+   */
+  public final void commit(int updateId) {
+    lock.writeLock().lock();
+    try {
+      updateWindow.set(updateId);
+      if (updateId > updateWindow.getBase() + WINDOW_SLIDE_THRESHOLD) {
+        updateWindow.slide();
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Gets a current update id validator.
+   *

+   * The validator is produced by cloning the current window; this ensures
+   * consistent record reading and correct read-your-own-writes behavior, but
+   * it ignores all updates committed from this point on.
+   *
+   * @return a new validator.
+   */
+  public final UpdateIdValidator getValidator() {
+    lock.readLock().lock();
+    try {
+      return updateWindow.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException("clone is supported", e);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * The update window is a 'score-board' used to track completed updates
+   * (puts/deletes). It assumes that the number of outstanding updates is
+   * a small fraction of windowSize. Updates can be committed in any order.
+   *
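Before the rest of the score-board description, it helps to pin down the tracker's end-to-end call order: writers assign ids, apply the tagged KeyValues, then commit; readers take a one-shot validator per scan. A sketch against the API introduced here; the Put contents and the memstore step are illustrative only:

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.regionserver.RegionUpdateTracker;
    import org.apache.hadoop.hbase.util.Bytes;

    public class UpdateTrackerFlow {
      public static void main(String[] args) {
        RegionUpdateTracker tracker = new RegionUpdateTracker();

        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));

        // 1. Tag every KeyValue in the Put with one fresh update id.
        int id = tracker.assignUpdateIds(put);
        // 2. ...the region would now insert the tagged KeyValues into the memstore...
        // 3. Publish: validators created after this point accept the id.
        tracker.commit(id);

        // Reader side: clone the score-board once per scan. Later commits are
        // invisible to this validator, which keeps a long-running scan coherent.
        RegionUpdateTracker.UpdateIdValidator v = tracker.getValidator();
        System.out.println(v.isValid(id));   // true: commit() already ran
      }
    }

Because the validator is a clone, a scanner opened before a concurrent put commits will never see only half of that put's cells, which is the coherency goal stated above.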

            + * It is also assumed that update ids are assigned consecutively and hence any + * id less than base is assumed to be valid. + *

            + * The {@link #slide()} operation moves the update window 'forward' by setting + * the base to the first 'non-full' word. When base is moved forward the score + * board is shifted left so that the window remains consistent. + *
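A worked example of that slide arithmetic, assuming base 0 and a two-word (128-bit) window; UpdateWindow is package-private, so a snippet like this would have to sit in org.apache.hadoop.hbase.regionserver, e.g. in a unit test:

    public class UpdateWindowDemo {
      static void demo() {
        RegionUpdateTracker.UpdateWindow w =
            new RegionUpdateTracker.UpdateWindow(0, 128); // base 0, words 0 and 1

        assert w.isValid(-1);   // below base: assumed already committed
        assert !w.isValid(5);   // inside the window, bit not set yet

        for (int id = 0; id < 64; id++) {
          w.set(id);            // commit ids 0..63; any order would do
        }
        w.slide();              // word 0 is now all ones, so base advances to 64

        assert w.getBase() == 64;
        assert w.isValid(5);    // 5 < base, hence treated as committed
      }
    }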

+   * This class is not thread safe.
+   */
+  static class UpdateWindow implements Cloneable, UpdateIdValidator {
+
+    private static final int INDEX_SHIFT = 6;
+    private static final int WORD_BIT_COUNT = 64;
+
+    private long[] bits;
+    private int base;
+    private int maxId; // exclusive
+
+    public UpdateWindow(int base, int windowSize) {
+      this.base = base;
+      this.bits = new long[windowSize / WORD_BIT_COUNT + (windowSize % WORD_BIT_COUNT > 0 ? 1 : 0)];
+      this.maxId = base + getWindowSize();
+    }
+
+    public int getBase() {
+      return base;
+    }
+
+    public int getWindowSize() {
+      return bits.length * WORD_BIT_COUNT;
+    }
+
+    /**
+     * Sets an id - makes it valid.
+     *
+     * @param id the id to set.
+     */
+    public final void set(int id) {
+      if (id >= base && id < maxId) {
+        int nbit = id - base;
+        int index = nbit >> INDEX_SHIFT;
+        bits[index] |= (1L << nbit);
+      } else {
+        throw new IllegalArgumentException("Can't set " + id +
+          ", base=" + base + ", window-size=" + getWindowSize());
+      }
+    }
+
+    /**
+     * Checks whether an id is valid (already set).
+     *
+     * @param id the id to check.
+     * @return true if valid
+     */
+    @Override
+    public final boolean isValid(int id) {
+      if (id >= base && id < maxId) {
+        int nbit = id - base;
+        int index = nbit >> INDEX_SHIFT;
+        return (bits[index] & (1L << nbit)) != 0L;
+      } else if (id >= maxId) {
+        throw new IllegalArgumentException("Can't validate " + id +
+          ", base=" + base + ", window-size=" + getWindowSize());
+      }
+      return true;
+    }
+
+
+    /**
+     * Slides the window: moves base to the first incomplete word and shifts
+     * the bits left by the base delta.
+     */
+    public final void slide() {
+      int index = 0;
+      while (index < bits.length && bits[index] == -1L) {
+        index++;
+      }
+      if (index > 0) {
+        base += index * WORD_BIT_COUNT;
+        System.arraycopy(bits, index, bits, 0, bits.length - index);
+        Arrays.fill(bits, bits.length - index, bits.length, 0L);
+        maxId = base + getWindowSize();
+      }
+    }
+
+    /**
+     * Clone the window (for scans).
+     *
+     * @return a cloned window.
+ * @throws CloneNotSupportedException + */ + @Override + public final UpdateWindow clone() throws CloneNotSupportedException { + UpdateWindow newUpdateWindow = (UpdateWindow) super.clone(); + newUpdateWindow.bits = this.bits.clone(); + return newUpdateWindow; + } + + } +} + + + diff --git a/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java b/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java index f4c7dc9..404c237 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java @@ -131,6 +131,7 @@ public class ScanDeleteTracker implements DeleteTracker { deleteBuffer = null; } else { //Should never happen, throw Exception + throw new AssertionError("Should never happen"); } } diff --git a/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java index a42289d..9676070 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java @@ -23,15 +23,101 @@ package org.apache.hadoop.hbase.regionserver; import java.util.NavigableSet; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter.ReturnCode; import org.apache.hadoop.hbase.util.Bytes; /** * A query matcher that is specifically designed for the scan case. */ -public class ScanQueryMatcher extends QueryMatcher { +public class ScanQueryMatcher { + /** + * {@link #match} return codes. These instruct the scanner moving through + * memstores and StoreFiles what to do with the current KeyValue. + *

            + * Additionally, this contains "early-out" language to tell the scanner to + * move on to the next File (memstore or Storefile), or to return immediately. + */ + static enum MatchCode { + /** + * Include KeyValue in the returned result + */ + INCLUDE, + + /** + * Do not include KeyValue in the returned result + */ + SKIP, + + /** + * Do not include, jump to next StoreFile or memstore (in time order) + */ + NEXT, + + /** + * Do not include, return current result + */ + DONE, + + /** + * These codes are used by the ScanQueryMatcher + */ + + /** + * Done with the row, seek there. + */ + SEEK_NEXT_ROW, + /** + * Done with column, seek to next. + */ + SEEK_NEXT_COL, + + /** + * Done with scan, thanks to the row filter. + */ + DONE_SCAN, + } + + /** + * Keeps track of deletes + */ + protected DeleteTracker deletes; + + /** + * Keeps track of columns and versions + */ + protected ColumnTracker columns; + + /** + * Key to seek to in memstore and StoreFiles + */ + protected KeyValue startKey; + + /** + * Row comparator for the region this query is for + */ + KeyValue.KeyComparator rowComparator; + + /** + * Row the query is on + */ + protected byte[] row; + + /** + * TimeRange the query is for + */ + protected TimeRange tr; + + /** + * Oldest allowed version stamp for TTL enforcement + */ + protected long oldestStamp; + + protected Filter filter; + // have to support old style filter for now. private RowFilterInterface oldFilter; // Optimization so we can skip lots of compares when we decide to skip @@ -55,7 +141,12 @@ public class ScanQueryMatcher extends QueryMatcher { this.rowComparator = rowComparator; this.deletes = new ScanDeleteTracker(); this.startKey = KeyValue.createFirstOnRow(scan.getStartRow()); - this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow()); + if (scan.isGetScan()) { + // support the get-scan use case + this.stopKey = KeyValue.createLastOnRow(scan.getStopRow()); + } else { + this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow()); + } this.filter = scan.getFilter(); this.oldFilter = scan.getOldFilter(); @@ -188,19 +279,104 @@ public class ScanQueryMatcher extends QueryMatcher { return MatchCode.SEEK_NEXT_ROW; } + // should be in KeyValue. + protected boolean isDelete(byte type) { + return (type != KeyValue.Type.Put.getCode()); + } + + protected boolean isExpired(long timestamp) { + return (timestamp < oldestStamp); + } + + /** + * If matcher returns SEEK_NEXT_COL you may be able + * to get a hint of the next column to seek to - call this. + * If it returns null, there is no hint. + * + * @return immediately after match returns SEEK_NEXT_COL - null if no hint, + * else the next column we want + */ + public ColumnCount getSeekColumn() { + return this.columns.getColumnHint(); + } + + /** + * Called after reading each section (memstore, snapshot, storefiles). + *
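Returning to the constructor change above: the get-scan branch exists because a Get is now a scan with startRow == stopRow. With the usual first-on-row stop key the scan would terminate before emitting the row's own cells, so the stop key must sort after every real cell of the row. A sketch of the ordering this relies on (the sample cell is illustrative):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class GetScanBoundary {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("row1");

        // Ordinary scan: stop *before* the stop row's first cell.
        KeyValue scanStop = KeyValue.createFirstOnRow(row);
        // Get-as-scan (startRow == stopRow): stop *after* the row's last cell,
        // otherwise the scan ends before returning anything from 'row1'.
        KeyValue getStop = KeyValue.createLastOnRow(row);

        KeyValue cell = new KeyValue(row, Bytes.toBytes("f"),
            Bytes.toBytes("q"), Bytes.toBytes("v"));
        // firstOnRow <= cell <= lastOnRow under KeyValue.COMPARATOR:
        System.out.println(KeyValue.COMPARATOR.compare(scanStop, cell) <= 0); // true
        System.out.println(KeyValue.COMPARATOR.compare(cell, getStop) < 0);   // true
      }
    }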

            + * This method will update the internal structures to be accurate for + * the next section. + */ + public void update() { + this.deletes.update(); + this.columns.update(); + } + + /** + * Resets the current columns and deletes + */ + public void reset() { + this.deletes.reset(); + this.columns.reset(); + stickyNextRow = false; + } + /** * Set current row + * * @param row */ - @Override - public void setRow(byte [] row) { + public void setRow(byte[] row) { this.row = row; reset(); } - @Override - public void reset() { - super.reset(); - stickyNextRow = false; + /** + * @return the start key + */ + public KeyValue getStartKey() { + return this.startKey; + } + + /** + * @return the TimeRange + */ + public TimeRange getTimeRange() { + return this.tr; + } + + /** + * @return the oldest stamp + */ + public long getOldestStamp() { + return this.oldestStamp; + } + + /** + * @return current KeyComparator + */ + public KeyValue.KeyComparator getRowComparator() { + return this.rowComparator; + } + + /** + * @return ColumnTracker + */ + public ColumnTracker getColumnTracker() { + return this.columns; + } + + /** + * @return DeleteTracker + */ + public DeleteTracker getDeleteTracker() { + return this.deletes; } + + /** + * @return true when done. + */ + public boolean isDone() { + return this.columns.done(); + } + } \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java b/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java index 435d512..388a313 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java @@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode; +import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; import org.apache.hadoop.hbase.util.Bytes; /** diff --git a/src/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/java/org/apache/hadoop/hbase/regionserver/Store.java index 8b432c6..06f2a91 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -47,8 +47,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.RemoteExceptionHandler; -import org.apache.hadoop.hbase.KeyValue.KeyComparator; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.Compression; @@ -1279,10 +1277,10 @@ public class Store implements HConstants, HeapSize { * Return a scanner for both the memstore and the HStore files */ protected KeyValueScanner getScanner(Scan scan, - final NavigableSet targetCols) { + final NavigableSet targetCols, RegionUpdateTracker.UpdateIdValidator validator) { lock.readLock().lock(); try { - return new StoreScanner(this, scan, targetCols); + return new StoreScanner(this, scan, targetCols, validator); } finally { lock.readLock().unlock(); } @@ -1418,55 +1416,6 @@ public class Store implements HConstants, HeapSize { // /** - * Retrieve results from this store given the specified Get parameters. 
- * @param get Get operation - * @param columns List of columns to match, can be empty (not null) - * @param result List to add results to - * @throws IOException - */ - public void get(Get get, NavigableSet columns, List result) - throws IOException { - KeyComparator keyComparator = this.comparator.getRawComparator(); - - // Column matching and version enforcement - QueryMatcher matcher = new QueryMatcher(get, this.family.getName(), columns, - this.ttl, keyComparator, versionsToReturn(get.getMaxVersions())); - this.lock.readLock().lock(); - try { - // Read from memstore - if(this.memstore.get(matcher, result)) { - // Received early-out from memstore - return; - } - - // Check if we even have storefiles - if (this.storefiles.isEmpty()) { - return; - } - - // Get storefiles for this store - List storefileScanners = new ArrayList(); - for (StoreFile sf : this.storefiles.descendingMap().values()) { - HFile.Reader r = sf.getReader(); - if (r == null) { - LOG.warn("StoreFile " + sf + " has a null Reader"); - continue; - } - // Get a scanner that caches the block and uses pread - storefileScanners.add(r.getScanner(true, true)); - } - - // StoreFileGetScan will handle reading this store's storefiles - StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher); - - // Run a GET scan and put results into the specified list - scanner.get(result); - } finally { - this.lock.readLock().unlock(); - } - } - - /** * Increments the value for the given row/family/qualifier * @param row * @param f @@ -1475,48 +1424,50 @@ public class Store implements HConstants, HeapSize { * @return memstore size delta * @throws IOException */ - public long updateColumnValue(byte [] row, byte [] f, - byte [] qualifier, long newValue) - throws IOException { + public long updateColumnValue(byte[] row, byte[] f, + byte[] qualifier, long newValue, int updateId) + throws IOException { List result = new ArrayList(); - KeyComparator keyComparator = this.comparator.getRawComparator(); - KeyValue kv = null; + KeyValue kv; // Setting up the QueryMatcher - Get get = new Get(row); + final Scan scan = new Scan(row, row); NavigableSet qualifiers = new TreeSet(Bytes.BYTES_COMPARATOR); qualifiers.add(qualifier); - QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl, - keyComparator, 1); + scan.addColumn(f, qualifier); // lock memstore snapshot for this critical section: this.lock.readLock().lock(); memstore.readLockLock(); try { - int memstoreCode = this.memstore.getWithCode(matcher, result); + StoreScanner scanner = new StoreScanner(scan, f, + ttl, memstore.comparator, qualifiers, memstore.getScanners(null)); + while (scanner.next(result)) ; - if (memstoreCode != 0) { + if (result.size() > 0) { // was in memstore (or snapshot) - kv = result.get(0).clone(); - byte [] buffer = kv.getBuffer(); + final KeyValue mskv = result.get(0); + boolean fromSnapshot = mskv.getUpdateId() == KeyValue.UNSET_UPDATE_ID; + kv = mskv.clone(); + byte[] buffer = kv.getBuffer(); int valueOffset = kv.getValueOffset(); Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(newValue), 0, - Bytes.SIZEOF_LONG); - if (memstoreCode == 2) { - // from snapshot, assign new TS + Bytes.SIZEOF_LONG); + if (fromSnapshot) { long currTs = System.currentTimeMillis(); if (currTs == kv.getTimestamp()) { currTs++; // unlikely but catastrophic } Bytes.putBytes(buffer, kv.getTimestampOffset(), - Bytes.toBytes(currTs), 0, Bytes.SIZEOF_LONG); + Bytes.toBytes(currTs), 0, Bytes.SIZEOF_LONG); } } else { kv = new KeyValue(row, f, qualifier, - 
System.currentTimeMillis(), - Bytes.toBytes(newValue)); + System.currentTimeMillis(), + Bytes.toBytes(newValue)); } + kv.setUpdateId(updateId); return add(kv); // end lock } finally { diff --git a/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java b/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java deleted file mode 100644 index 6be4a8b..0000000 --- a/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright 2009 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.io.hfile.HFileScanner; - -/** - * Use to execute a get by scanning all the store files in order. - */ -public class StoreFileGetScan { - - private List scanners; - private QueryMatcher matcher; - - private KeyValue startKey; - - /** - * Constructor - * @param scanners - * @param matcher - */ - public StoreFileGetScan(List scanners, QueryMatcher matcher) { - this.scanners = scanners; - this.matcher = matcher; - this.startKey = matcher.getStartKey(); - } - - /** - * Performs a GET operation across multiple StoreFiles. - *
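The deletion beginning here removes the last piece of the dedicated get path: StoreFileGetScan walked each storefile in time order with a QueryMatcher, but a Get can now ride the ordinary scanner stack, as the rewritten updateColumnValue above already does internally. The replacement shape, sketched against the region API (the helper and its name are illustrative):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;

    public class GetAsScan {
      /** Drains a single-row scan; 'region' stands in for a real HRegion. */
      static List<KeyValue> get(HRegion region, byte[] row,
          byte[] family, byte[] qualifier) throws IOException {
        Scan scan = new Scan(row, row);      // startRow == stopRow
        scan.addColumn(family, qualifier);

        List<KeyValue> result = new ArrayList<KeyValue>();
        InternalScanner scanner = region.getScanner(scan);
        while (scanner.next(result)) {
          // keep draining; next() returns false when the row is exhausted
        }
        return result;
      }
    }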

            - * This style of StoreFile scanning goes through each - * StoreFile in its entirety, most recent first, before - * proceeding to the next StoreFile. - *

            - * This strategy allows for optimal, stateless (no persisted Scanners) - * early-out scenarios. - * @param result List to add results to - * @throws IOException - */ - public void get(List result) throws IOException { - for(HFileScanner scanner : this.scanners) { - this.matcher.update(); - if (getStoreFile(scanner, result) || matcher.isDone()) { - return; - } - } - } - - /** - * Performs a GET operation on a single StoreFile. - * @param scanner - * @param result - * @return true if done with this store, false if must continue to next - * @throws IOException - */ - public boolean getStoreFile(HFileScanner scanner, List result) - throws IOException { - if (scanner.seekTo(startKey.getBuffer(), startKey.getKeyOffset(), - startKey.getKeyLength()) == -1) { - // No keys in StoreFile at or after specified startKey - // First row may be = our row, so we have to check anyways. - byte [] firstKey = scanner.getReader().getFirstKey(); - // Key may be null if storefile is empty. - if (firstKey == null) return false; - short rowLen = Bytes.toShort(firstKey, 0, Bytes.SIZEOF_SHORT); - int rowOffset = Bytes.SIZEOF_SHORT; - if (this.matcher.rowComparator.compareRows(firstKey, rowOffset, rowLen, - startKey.getBuffer(), startKey.getRowOffset(), startKey.getRowLength()) - != 0) - return false; - scanner.seekTo(); - } - do { - KeyValue kv = scanner.getKeyValue(); - switch(matcher.match(kv)) { - case INCLUDE: - result.add(kv); - break; - case SKIP: - break; - case NEXT: - return false; - case DONE: - return true; - } - } while(scanner.next()); - return false; - } - -} diff --git a/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 4a6ce1b..7b5eb49 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -47,13 +47,16 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb // Used to indicate that the scanner has closed (see HBASE-1107) private final AtomicBoolean closing = new AtomicBoolean(false); + private RegionUpdateTracker.UpdateIdValidator validator; /** * Opens a scanner across memstore, snapshot, and all StoreFiles. 
*/ - StoreScanner(Store store, Scan scan, final NavigableSet columns) { + StoreScanner(Store store, Scan scan, final NavigableSet columns, + RegionUpdateTracker.UpdateIdValidator validator) { this.store = store; this.cacheBlocks = scan.getCacheBlocks(); + this.validator = validator; matcher = new ScanQueryMatcher(scan, store.getFamily().getName(), columns, store.ttl, store.comparator.getRawComparator(), store.versionsToReturn(scan.getMaxVersions())); @@ -100,7 +103,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb final KeyValueScanner [] scanners) { this.store = null; this.cacheBlocks = scan.getCacheBlocks(); - this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl, + this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl, comparator.getRawComparator(), scan.getMaxVersions()); // Seek all scanners to the initial key @@ -115,7 +118,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb */ private List getScanners() { List scanners = getStoreFileScanners(); - KeyValueScanner [] memstorescanners = this.store.memstore.getScanners(); + KeyValueScanner [] memstorescanners = this.store.memstore.getScanners(this.validator); for (int i = memstorescanners.length - 1; i >= 0; i--) { scanners.add(memstorescanners[i]); } @@ -158,7 +161,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb KeyValue kv; List results = new ArrayList(); while((kv = this.heap.peek()) != null) { - QueryMatcher.MatchCode qcode = matcher.match(kv); + ScanQueryMatcher.MatchCode qcode = matcher.match(kv); switch(qcode) { case INCLUDE: KeyValue next = this.heap.next(); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/WildcardColumnTracker.java b/src/java/org/apache/hadoop/hbase/regionserver/WildcardColumnTracker.java deleted file mode 100644 index 5db8f35..0000000 --- a/src/java/org/apache/hadoop/hbase/regionserver/WildcardColumnTracker.java +++ /dev/null @@ -1,331 +0,0 @@ -/** - * Copyright 2009 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * This class is used for the tracking and enforcement of columns and numbers - * of versions during the course of a Get or Scan operation, when all available - * column qualifiers have been asked for in the query. - *

            - * This class is utilized by {@link QueryMatcher} through two methods: - *

            • {@link #checkColumn} is called when a Put satisfies all other - * conditions of the query. This method returns a {@link MatchCode} to define - * what action should be taken. - *
            • {@link #update} is called at the end of every StoreFile or memstore. - *

              - * This class is NOT thread-safe as queries are never multi-threaded - */ -public class WildcardColumnTracker implements ColumnTracker { - - private int maxVersions; - - protected List columns; - private int index; - private ColumnCount column; - - private List newColumns; - private int newIndex; - private ColumnCount newColumn; - - /** - * Default constructor. - * @param maxVersions maximum versions to return per columns - */ - public WildcardColumnTracker(int maxVersions) { - this.maxVersions = maxVersions; - reset(); - } - - public void reset() { - this.index = 0; - this.column = null; - this.columns = null; - this.newColumns = new ArrayList(); - this.newIndex = 0; - this.newColumn = null; - } - - /** - * Can never early-out from reading more storefiles in Wildcard case. - */ - public boolean done() { - return false; - } - - // wildcard scanners never have column hints. - public ColumnCount getColumnHint() { - return null; - } - - /** - * Checks against the parameters of the query and the columns which have - * already been processed by this query. - * @param bytes KeyValue buffer - * @param offset offset to the start of the qualifier - * @param length length of the qualifier - * @return MatchCode telling QueryMatcher what action to take - */ - public MatchCode checkColumn(byte [] bytes, int offset, int length) { - boolean recursive = false; - do { - - // Nothing to match against, add to new and include - if(this.column == null && this.newColumn == null) { - newColumns.add(new ColumnCount(bytes, offset, length, 1)); - this.newColumn = newColumns.get(newIndex); - return MatchCode.INCLUDE; - } - - // Nothing old, compare against new - if(this.column == null && this.newColumn != null) { - int ret = Bytes.compareTo(newColumn.getBuffer(), newColumn.getOffset(), - newColumn.getLength(), bytes, offset, length); - - // Same column - if(ret == 0) { - if(newColumn.increment() > this.maxVersions) { - return MatchCode.SKIP; - } - return MatchCode.INCLUDE; - } - - // Specified column is bigger than current column - // Move down current column and check again - if(ret <= -1) { - if(++newIndex == newColumns.size()) { - // No more, add to end and include - newColumns.add(new ColumnCount(bytes, offset, length, 1)); - this.newColumn = newColumns.get(newIndex); - return MatchCode.INCLUDE; - } - this.newColumn = newColumns.get(newIndex); - //return checkColumn(bytes, offset, length); - recursive = true; - continue; - } - - // ret >= 1 - // Specified column is smaller than current column - // Nothing to match against, add to new and include - newColumns.add(new ColumnCount(bytes, offset, length, 1)); - this.newColumn = newColumns.get(++newIndex); - return MatchCode.INCLUDE; - } - - // Nothing new, compare against old - if(this.newColumn == null && this.column != null) { - int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(), - column.getLength(), bytes, offset, length); - - // Same column - if(ret == 0) { - if(column.increment() > this.maxVersions) { - return MatchCode.SKIP; - } - return MatchCode.INCLUDE; - } - - // Specified column is bigger than current column - // Move down current column and check again - if(ret <= -1) { - if(++index == columns.size()) { - // No more, add to new and include (new was empty prior to this) - newColumns.add(new ColumnCount(bytes, offset, length, 1)); - this.newColumn = newColumns.get(newIndex); - this.column = null; - return MatchCode.INCLUDE; - } - this.column = columns.get(index); - //return checkColumn(bytes, offset, length); - recursive = 
true; - continue; - } - - // ret >= 1 - // Specified column is smaller than current column - // Nothing to match against, add to new and include - newColumns.add(new ColumnCount(bytes, offset, length, 1)); - this.newColumn = newColumns.get(newIndex); - return MatchCode.INCLUDE; - } - - if (column != null && newColumn != null) { - // There are new and old, figure which to check first - int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(), - column.getLength(), newColumn.getBuffer(), newColumn.getOffset(), - newColumn.getLength()); - - // Old is smaller than new, compare against old - if(ret <= -1) { - ret = Bytes.compareTo(column.getBuffer(), column.getOffset(), - column.getLength(), bytes, offset, length); - - // Same column - if(ret == 0) { - if(column.increment() > this.maxVersions) { - return MatchCode.SKIP; - } - return MatchCode.INCLUDE; - } - - // Specified column is bigger than current column - // Move down current column and check again - if(ret <= -1) { - if(++index == columns.size()) { - this.column = null; - } else { - this.column = columns.get(index); - } - //return checkColumn(bytes, offset, length); - recursive = true; - continue; - } - - // ret >= 1 - // Specified column is smaller than current column - // Nothing to match against, add to new and include - newColumns.add(new ColumnCount(bytes, offset, length, 1)); - return MatchCode.INCLUDE; - } - } - - if (newColumn != null) { - // Cannot be equal, so ret >= 1 - // New is smaller than old, compare against new - int ret = Bytes.compareTo(newColumn.getBuffer(), newColumn.getOffset(), - newColumn.getLength(), bytes, offset, length); - - // Same column - if(ret == 0) { - if(newColumn.increment() > this.maxVersions) { - return MatchCode.SKIP; - } - return MatchCode.INCLUDE; - } - - // Specified column is bigger than current column - // Move down current column and check again - if(ret <= -1) { - if(++newIndex == newColumns.size()) { - this.newColumn = null; - } else { - this.newColumn = newColumns.get(newIndex); - } - //return checkColumn(bytes, offset, length); - recursive = true; - continue; - } - - // ret >= 1 - // Specified column is smaller than current column - // Nothing to match against, add to new and include - newColumns.add(new ColumnCount(bytes, offset, length, 1)); - return MatchCode.INCLUDE; - } - } while(recursive); - - // No match happened, add to new and include - newColumns.add(new ColumnCount(bytes, offset, length, 1)); - return MatchCode.INCLUDE; - } - - /** - * Called at the end of every StoreFile or memstore. 
- */ - public void update() { - // If no previous columns, use new columns and return - if(this.columns == null || this.columns.size() == 0) { - if(this.newColumns.size() > 0){ - finish(newColumns); - } - return; - } - - // If no new columns, retain previous columns and return - if(this.newColumns.size() == 0) { - this.index = 0; - this.column = this.columns.get(index); - return; - } - - // Merge previous columns with new columns - // There will be no overlapping - List mergeColumns = new ArrayList( - columns.size() + newColumns.size()); - index = 0; - newIndex = 0; - column = columns.get(0); - newColumn = newColumns.get(0); - while(true) { - int ret = Bytes.compareTo( - column.getBuffer(), column.getOffset(),column.getLength(), - newColumn.getBuffer(), newColumn.getOffset(), newColumn.getLength()); - - // Existing is smaller than new, add existing and iterate it - if(ret <= -1) { - mergeColumns.add(column); - if(++index == columns.size()) { - // No more existing left, merge down rest of new and return - mergeDown(mergeColumns, newColumns, newIndex); - finish(mergeColumns); - return; - } - column = columns.get(index); - continue; - } - - // New is smaller than existing, add new and iterate it - mergeColumns.add(newColumn); - if(++newIndex == newColumns.size()) { - // No more new left, merge down rest of existing and return - mergeDown(mergeColumns, columns, index); - finish(mergeColumns); - return; - } - newColumn = newColumns.get(newIndex); - continue; - } - } - - private void mergeDown(List mergeColumns, - List srcColumns, int srcIndex) { - int index = srcIndex; - while(index < srcColumns.size()) { - mergeColumns.add(srcColumns.get(index++)); - } - } - - private void finish(List mergeColumns) { - this.columns = mergeColumns; - this.index = 0; - this.column = this.columns.size() > 0? columns.get(index) : null; - - this.newColumns = new ArrayList(); - this.newIndex = 0; - this.newColumn = null; - } - -} diff --git a/src/test/org/apache/hadoop/hbase/TestKeyValue.java b/src/test/org/apache/hadoop/hbase/TestKeyValue.java index 1b74ae1..3ee9037 100644 --- a/src/test/org/apache/hadoop/hbase/TestKeyValue.java +++ b/src/test/org/apache/hadoop/hbase/TestKeyValue.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase; import java.io.IOException; import java.util.Set; import java.util.TreeSet; +import java.util.Comparator; import junit.framework.TestCase; @@ -276,4 +277,49 @@ public class TestKeyValue extends TestCase { // TODO actually write this test! 
} + + public void testLastOnRow() { + byte[] row = Bytes.toBytes("row"); + + final KeyValue lastOnRow = KeyValue.createLastOnRow(row); + final KeyValue rowKV = new KeyValue(row, Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")); + final KeyValue firstOnRow = KeyValue.createFirstOnRow(row); + final KeyValue firstOnLesserRow = KeyValue.createFirstOnRow(Bytes.toBytes("rov")); + final KeyValue lesserRowKV = new KeyValue(Bytes.toBytes("rov"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")); + final KeyValue firstOnGreaterRow = KeyValue.createFirstOnRow(Bytes.toBytes("rox")); + final KeyValue greaterRowKV = new KeyValue(Bytes.toBytes("rox"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")); + assertEquals(lastOnRow.getType(), KeyValue.Type.Minimum.ordinal()); + + + checkComparison(firstOnRow, lastOnRow, false); + checkComparison(rowKV, lastOnRow, false); + checkComparison(lastOnRow, lastOnRow.clone(), null); + checkComparison(firstOnLesserRow, lastOnRow, false); + checkComparison(lesserRowKV, lastOnRow, false); + checkComparison(firstOnGreaterRow, lastOnRow, true); + checkComparison(greaterRowKV, lastOnRow, true); + } + + /** + * {@link #testLastOnRow()} subroutine. + * + * @param left left kv + * @param right right kv + * @param positive use null for equal + */ + private void checkComparison(KeyValue left, KeyValue right, Boolean positive) { + Comparator kvc = KeyValue.COMPARATOR; + final int compareResult = kvc.compare(left, right); + final int complementaryCompareResult = kvc.compare(right, left); + if (positive == null) { + assertEquals(0, compareResult); + assertEquals(0, complementaryCompareResult); + } else if (positive) { + assertTrue(compareResult > 0); + assertTrue(complementaryCompareResult < 0); + } else { + assertTrue(compareResult < 0); + assertTrue(complementaryCompareResult > 0); + } + } } diff --git a/src/test/org/apache/hadoop/hbase/client/TestClient.java b/src/test/org/apache/hadoop/hbase/client/TestClient.java index d8b58bc..e2a0f3e 100644 --- a/src/test/org/apache/hadoop/hbase/client/TestClient.java +++ b/src/test/org/apache/hadoop/hbase/client/TestClient.java @@ -1476,21 +1476,18 @@ public class TestClient extends HBaseClusterTestCase { put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); ht.put(put); - - // The Get returns the latest value but then does not return the - // oldest, which was never deleted, ts[1]. 
- + + // The Get and the scanner return the previous values, the expected-unexpected behavior + get = new Get(ROW); get.addFamily(FAMILIES[0]); get.setMaxVersions(Integer.MAX_VALUE); result = ht.get(get); assertNResult(result, ROW, FAMILIES[0], QUALIFIER, - new long [] {ts[2], ts[3], ts[4]}, - new byte[][] {VALUES[2], VALUES[3], VALUES[4]}, + new long [] {ts[1], ts[2], ts[3]}, + new byte[][] {VALUES[1], VALUES[2], VALUES[3]}, 0, 2); - // The Scanner returns the previous values, the expected-unexpected behavior - scan = new Scan(ROW); scan.addFamily(FAMILIES[0]); scan.setMaxVersions(Integer.MAX_VALUE); @@ -1499,7 +1496,7 @@ public class TestClient extends HBaseClusterTestCase { new long [] {ts[1], ts[2], ts[3]}, new byte[][] {VALUES[1], VALUES[2], VALUES[3]}, 0, 2); - + // Test deleting an entire family from one row but not the other various ways put = new Put(ROWS[0]); @@ -1533,10 +1530,13 @@ public class TestClient extends HBaseClusterTestCase { delete = new Delete(ROWS[2]); delete.deleteColumn(FAMILIES[1], QUALIFIER); - delete.deleteColumn(FAMILIES[1], QUALIFIER); delete.deleteColumn(FAMILIES[2], QUALIFIER); ht.delete(delete); - + // API change: can't delete two 'latest' versions of the same row in one op. + delete = new Delete(ROWS[2]); + delete.deleteColumn(FAMILIES[1], QUALIFIER); + ht.delete(delete); + get = new Get(ROWS[0]); get.addFamily(FAMILIES[1]); get.addFamily(FAMILIES[2]); @@ -1576,7 +1576,7 @@ public class TestClient extends HBaseClusterTestCase { result = getSingleScanResult(ht, scan); assertTrue("Expected 2 keys but received " + result.size(), result.size() == 2); - + get = new Get(ROWS[2]); get.addFamily(FAMILIES[1]); get.addFamily(FAMILIES[2]); diff --git a/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java b/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java index c7a3b0a..f265a5e 100644 --- a/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -262,6 +262,7 @@ public class TestHeapSize extends TestCase { expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false); expected += ClassSize.estimateBase(CopyOnWriteArraySet.class, false); expected += ClassSize.estimateBase(CopyOnWriteArrayList.class, false); + expected += ClassSize.estimateBase(AtomicInteger.class, false); if(expected != actual) { ClassSize.estimateBase(cl, true); ClassSize.estimateBase(ReentrantReadWriteLock.class, true); @@ -269,6 +270,7 @@ public class TestHeapSize extends TestCase { ClassSize.estimateBase(ConcurrentSkipListMap.class, true); ClassSize.estimateBase(CopyOnWriteArraySet.class, true); ClassSize.estimateBase(CopyOnWriteArrayList.class, true); + ClassSize.estimateBase(AtomicInteger.class, true); assertEquals(expected, actual); } diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestDeleteCompare.java b/src/test/org/apache/hadoop/hbase/regionserver/TestDeleteCompare.java deleted file mode 100644 index 21d9194..0000000 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestDeleteCompare.java +++ /dev/null @@ -1,191 +0,0 @@ -package org.apache.hadoop.hbase.regionserver; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueTestUtil; -import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode; -import org.apache.hadoop.hbase.util.Bytes; - -import junit.framework.TestCase; - -public class TestDeleteCompare extends TestCase { - - //Cases to compare: - //1. 
DeleteFamily and whatever of the same row - //2. DeleteColumn and whatever of the same row + qualifier - //3. Delete and the matching put - //4. Big test that include starting on the wrong row and qualifier - public void testDeleteCompare_DeleteFamily() { - //Creating memstore - Set memstore = new TreeSet(KeyValue.COMPARATOR); - memstore.add(KeyValueTestUtil.create("row11", "fam", "col1", 3, "d-c")); - memstore.add(KeyValueTestUtil.create("row11", "fam", "col1", 2, "d-c")); - memstore.add(KeyValueTestUtil.create("row11", "fam", "col1", 1, "d-c")); - memstore.add(KeyValueTestUtil.create("row11", "fam", "col2", 1, "d-c")); - - memstore.add(KeyValueTestUtil.create("row11", "fam", "col3", 3, "d-c")); - memstore.add(KeyValueTestUtil.create("row11", "fam", "col3", 2, "d-c")); - memstore.add(KeyValueTestUtil.create("row11", "fam", "col3", 1, "d-c")); - - memstore.add(KeyValueTestUtil.create("row21", "fam", "col1", 1, "d-c")); - - //Creating expected result - List expected = new ArrayList(); - expected.add(DeleteCode.SKIP); - expected.add(DeleteCode.DELETE); - expected.add(DeleteCode.DELETE); - expected.add(DeleteCode.DELETE); - expected.add(DeleteCode.SKIP); - expected.add(DeleteCode.DELETE); - expected.add(DeleteCode.DELETE); - expected.add(DeleteCode.DONE); - - KeyValue delete = KeyValueTestUtil.create("row11", - "fam", "", 2, KeyValue.Type.DeleteFamily, "dont-care"); - byte [] deleteBuffer = delete.getBuffer(); - int deleteRowOffset = delete.getRowOffset(); - short deleteRowLen = delete.getRowLength(); - int deleteQualifierOffset = delete.getQualifierOffset(); - int deleteQualifierLen = delete.getQualifierLength(); - int deleteTimestampOffset = deleteQualifierOffset + deleteQualifierLen; - byte deleteType = deleteBuffer[deleteTimestampOffset +Bytes.SIZEOF_LONG]; - - List actual = new ArrayList(); - for(KeyValue mem : memstore){ - actual.add(DeleteCompare.deleteCompare(mem, deleteBuffer, deleteRowOffset, - deleteRowLen, deleteQualifierOffset, deleteQualifierLen, - deleteTimestampOffset, deleteType, KeyValue.KEY_COMPARATOR)); - - } - - assertEquals(expected.size(), actual.size()); - for(int i=0; i memstore = new TreeSet(KeyValue.COMPARATOR); - memstore.add(KeyValueTestUtil.create("row11", "fam", "col1", 3, "d-c")); - memstore.add(KeyValueTestUtil.create("row11", "fam", "col1", 2, "d-c")); - memstore.add(KeyValueTestUtil.create("row11", "fam", "col1", 1, "d-c")); - memstore.add(KeyValueTestUtil.create("row21", "fam", "col1", 1, "d-c")); - - - //Creating expected result - List expected = new ArrayList(); - expected.add(DeleteCode.SKIP); - expected.add(DeleteCode.DELETE); - expected.add(DeleteCode.DELETE); - expected.add(DeleteCode.DONE); - - KeyValue delete = KeyValueTestUtil.create("row11", "fam", "col1", 2, - KeyValue.Type.DeleteColumn, "dont-care"); - byte [] deleteBuffer = delete.getBuffer(); - int deleteRowOffset = delete.getRowOffset(); - short deleteRowLen = delete.getRowLength(); - int deleteQualifierOffset = delete.getQualifierOffset(); - int deleteQualifierLen = delete.getQualifierLength(); - int deleteTimestampOffset = deleteQualifierOffset + deleteQualifierLen; - byte deleteType = deleteBuffer[deleteTimestampOffset +Bytes.SIZEOF_LONG]; - - List actual = new ArrayList(); - for(KeyValue mem : memstore){ - actual.add(DeleteCompare.deleteCompare(mem, deleteBuffer, deleteRowOffset, - deleteRowLen, deleteQualifierOffset, deleteQualifierLen, - deleteTimestampOffset, deleteType, KeyValue.KEY_COMPARATOR)); - - } - - assertEquals(expected.size(), actual.size()); - for(int i=0; i memstore = new 
TreeSet(KeyValue.COMPARATOR); - memstore.add(KeyValueTestUtil.create("row11", "fam", "col1", 3, "d-c")); - memstore.add(KeyValueTestUtil.create("row11", "fam", "col1", 2, "d-c")); - memstore.add(KeyValueTestUtil.create("row11", "fam", "col1", 1, "d-c")); - - //Creating expected result - List expected = new ArrayList(); - expected.add(DeleteCode.SKIP); - expected.add(DeleteCode.DELETE); - expected.add(DeleteCode.DONE); - - KeyValue delete = KeyValueTestUtil.create("row11", "fam", "col1", 2, - KeyValue.Type.Delete, "dont-care"); - byte [] deleteBuffer = delete.getBuffer(); - int deleteRowOffset = delete.getRowOffset(); - short deleteRowLen = delete.getRowLength(); - int deleteQualifierOffset = delete.getQualifierOffset(); - int deleteQualifierLen = delete.getQualifierLength(); - int deleteTimestampOffset = deleteQualifierOffset + deleteQualifierLen; - byte deleteType = deleteBuffer[deleteTimestampOffset +Bytes.SIZEOF_LONG]; - - List actual = new ArrayList(); - for(KeyValue mem : memstore){ - actual.add(DeleteCompare.deleteCompare(mem, deleteBuffer, deleteRowOffset, - deleteRowLen, deleteQualifierOffset, deleteQualifierLen, - deleteTimestampOffset, deleteType, KeyValue.KEY_COMPARATOR)); - } - - assertEquals(expected.size(), actual.size()); - for(int i=0; i memstore = new TreeSet(KeyValue.COMPARATOR); - memstore.add(KeyValueTestUtil.create("row11", "fam", "col1", 1, "d-c")); - memstore.add(KeyValueTestUtil.create("row21", "fam", "col1", 4, "d-c")); - memstore.add(KeyValueTestUtil.create("row21", "fam", "col1", 3, "d-c")); - memstore.add(KeyValueTestUtil.create("row21", "fam", "col1", 2, "d-c")); - memstore.add(KeyValueTestUtil.create("row21", "fam", "col1", 1, - KeyValue.Type.Delete, "dont-care")); - memstore.add(KeyValueTestUtil.create("row31", "fam", "col1", 1, "dont-care")); - - //Creating expected result - List expected = new ArrayList(); - expected.add(DeleteCode.SKIP); - expected.add(DeleteCode.DELETE); - expected.add(DeleteCode.DELETE); - expected.add(DeleteCode.DELETE); - expected.add(DeleteCode.DELETE); - expected.add(DeleteCode.DONE); - - KeyValue delete = KeyValueTestUtil.create("row21", "fam", "col1", 5, - KeyValue.Type.DeleteColumn, "dont-care"); - byte [] deleteBuffer = delete.getBuffer(); - int deleteRowOffset = delete.getRowOffset(); - short deleteRowLen = delete.getRowLength(); - int deleteQualifierOffset = delete.getQualifierOffset(); - int deleteQualifierLen = delete.getQualifierLength(); - int deleteTimestampOffset = deleteQualifierOffset + deleteQualifierLen; - byte deleteType = deleteBuffer[deleteTimestampOffset +Bytes.SIZEOF_LONG]; - - List actual = new ArrayList(); - for(KeyValue mem : memstore){ - actual.add(DeleteCompare.deleteCompare(mem, deleteBuffer, deleteRowOffset, - deleteRowLen, deleteQualifierOffset, deleteQualifierLen, - deleteTimestampOffset, deleteType, KeyValue.KEY_COMPARATOR)); - - } - - assertEquals(expected.size(), actual.size()); - for(int i=0; i - res = dt.compareDeletes(del10, del11); - assertEquals(DeleteTracker.DeleteCompare.INCLUDE_OLD_NEXT_OLD, res); - res = dt.compareDeletes(del11, del10); - assertEquals(DeleteTracker.DeleteCompare.INCLUDE_NEW_NEXT_NEW, res); - - //Testing Delete ts1 and Delete ts2 and <==> - res = dt.compareDeletes(del10, del20); - assertEquals(DeleteTracker.DeleteCompare.INCLUDE_NEW_NEXT_NEW, res); - res = dt.compareDeletes(del20, del10); - assertEquals(DeleteTracker.DeleteCompare.INCLUDE_OLD_NEXT_OLD, res); - - - - //Testing DeleteColumn and DeleteColumn - res = dt.compareDeletes(delQf10, delQf10); - 
assertEquals(DeleteTracker.DeleteCompare.INCLUDE_OLD_NEXT_BOTH, res); - - //Testing DeleteColumn qf1 and DeleteColumn qf2 and <==> - res = dt.compareDeletes(delQf10, delQf11); - assertEquals(DeleteTracker.DeleteCompare.INCLUDE_OLD_NEXT_OLD, res); - res = dt.compareDeletes(delQf11, delQf10); - assertEquals(DeleteTracker.DeleteCompare.INCLUDE_NEW_NEXT_NEW, res); - - //Testing DeleteColumn ts1 and DeleteColumn ts2 and <==> - res = dt.compareDeletes(delQf10, delQf20); - assertEquals(DeleteTracker.DeleteCompare.INCLUDE_NEW_NEXT_BOTH, res); - res = dt.compareDeletes(delQf20, delQf10); - assertEquals(DeleteTracker.DeleteCompare.INCLUDE_OLD_NEXT_BOTH, res); - - - - //Testing Delete and DeleteColumn and <==> - res = dt.compareDeletes(del10, delQf10); - assertEquals(DeleteTracker.DeleteCompare.NEXT_OLD, res); - res = dt.compareDeletes(delQf10, del10); - assertEquals(DeleteTracker.DeleteCompare.NEXT_NEW, res); - - //Testing Delete qf1 and DeleteColumn qf2 and <==> - res = dt.compareDeletes(del10, delQf11); - assertEquals(DeleteTracker.DeleteCompare.INCLUDE_OLD_NEXT_OLD, res); - res = dt.compareDeletes(delQf11, del10); - assertEquals(DeleteTracker.DeleteCompare.INCLUDE_NEW_NEXT_NEW, res); - - //Testing Delete qf2 and DeleteColumn qf1 and <==> - res = dt.compareDeletes(del11, delQf10); - assertEquals(DeleteTracker.DeleteCompare.INCLUDE_NEW_NEXT_NEW, res); - res = dt.compareDeletes(delQf10, del11); - assertEquals(DeleteTracker.DeleteCompare.INCLUDE_OLD_NEXT_OLD, res); - - //Testing Delete ts2 and DeleteColumn ts1 and <==> - res = dt.compareDeletes(del20, delQf10); - assertEquals(DeleteTracker.DeleteCompare.INCLUDE_OLD_NEXT_OLD, res); - res = dt.compareDeletes(delQf10, del20); - assertEquals(DeleteTracker.DeleteCompare.INCLUDE_NEW_NEXT_NEW, res); - - //Testing Delete ts1 and DeleteColumn ts2 and <==> - res = dt.compareDeletes(del10, delQf20); - assertEquals(DeleteTracker.DeleteCompare.NEXT_OLD, res); - res = dt.compareDeletes(delQf20, del10); - assertEquals(DeleteTracker.DeleteCompare.NEXT_NEW, res); - - } - - public void testUpdate(){ - //Building lists - List dels1 = new ArrayList(); - dels1.add(delQf10); - dels1.add(del21); - - List dels2 = new ArrayList(); - dels2.add(delFam10); - dels2.add(del30); - dels2.add(delQf20); - - List res = new ArrayList(); - res.add(del30); - res.add(delQf20); - res.add(del21); - - //Adding entries - for(Delete del : dels1){ - dt.add(del.buffer, del.qualifierOffset, del.qualifierLength, - del.timestamp, del.type); - } - - //update() - dt.update(); - - //Check deleteList - List delList = dt.deletes; - assertEquals(dels1.size(), delList.size()); - for(int i=0; i dels = new ArrayList(); - dels.add(delQf10); - dels.add(del21); - - //Adding entries - for(Delete del : dels){ - dt.add(del.buffer, del.qualifierOffset, del.qualifierLength, - del.timestamp, del.type); - } - //update() - dt.update(); - assertEquals(false, dt.isDeleted(col2, 0, col2Len, ts3)); - assertEquals(false, dt.isDeleted(col2, 0, col2Len, ts1)); - } - public void testIsDeleted_Delete(){ - //Building lists - List dels = new ArrayList(); - dels.add(del21); - - //Adding entries - for(Delete del : dels){ - dt.add(del.buffer, del.qualifierOffset, del.qualifierLength, - del.timestamp, del.type); - } - - //update() - dt.update(); - - assertEquals(true, dt.isDeleted(col2, 0, col2Len, ts2)); - } - - public void testIsDeleted_DeleteColumn(){ - //Building lists - List dels = new ArrayList(); - dels.add(delQf21); - - //Adding entries - for(Delete del : dels){ - dt.add(del.buffer, del.qualifierOffset, del.qualifierLength, 
-          del.timestamp, del.type);
-    }
-
-    //update()
-    dt.update();
-
-    assertEquals(true, dt.isDeleted(col2, 0, col2Len, ts1));
-  }
-
-  public void testIsDeleted_DeleteFamily(){
-    //Building lists
-    List<Delete> dels = new ArrayList<Delete>();
-    dels.add(delFam20);
-
-    //Adding entries
-    for(Delete del : dels){
-      dt.add(del.buffer, del.qualifierOffset, del.qualifierLength,
-          del.timestamp, del.type);
-    }
-
-    //update()
-    dt.update();
-
-    assertEquals(true, dt.isDeleted(col2, 0, col2Len, ts1));
-  }
-
-  // HBASE-1951
-  public void testStackOverflow() {
-    List<Delete> dels = new ArrayList<Delete>();
-    Delete adel = new Delete(col1, 0, col1Len, del, 0L);
-    for(long i = 0; i < 9000; i++) {
-      dt.add(adel.buffer, adel.qualifierOffset, adel.qualifierLength,
-          i, adel.type);
-    }
-
-    //update()
-    dt.update();
-    assertEquals(false, dt.isDeleted(col2, 0, col2Len, 7000000));
-  }
-
-}
diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index b0d8f00..d23a41b 100644
--- a/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -257,6 +257,10 @@ public class TestHRegion extends HBaseTestCase {
       true);
     assertTrue(res);
 
+    //Create a new put to avoid memstore id inconsistency
+    put = new Put(row1);
+    put.add(fam1, qf1, val1);
+
     // not empty anymore
     res = region.checkAndPut(row1, fam1, qf1, emptyVal, put, lockId, true);
     assertFalse(res);
@@ -307,7 +311,11 @@ public class TestHRegion extends HBaseTestCase {
     Put put = new Put(row1);
     put.add(fam1, qf1, val1);
     region.put(put);
-
+
+    //Create a new put to avoid memstore id inconsistency
+    put = new Put(row1);
+    put.add(fam1, qf1, val1);
+
     //checkAndPut with correct value
     boolean res = region.checkAndPut(row1, fam1, qf1, val1, put, lockId, true);
     assertEquals(true, res);
@@ -385,7 +393,7 @@ public class TestHRegion extends HBaseTestCase {
     //testing existing family
     byte [] family = fam2;
     try {
-      region.delete(family, kvs, true);
+      region.delete(family, kvs, true, null);
     } catch (Exception e) {
       assertTrue("Family " +new String(family)+ " does not exist", false);
    }
@@ -394,7 +402,7 @@ public class TestHRegion extends HBaseTestCase {
     boolean ok = false;
     family = fam4;
     try {
-      region.delete(family, kvs, true);
+      region.delete(family, kvs, true, null);
     } catch (Exception e) {
       ok = true;
     }
@@ -595,7 +603,7 @@ public class TestHRegion extends HBaseTestCase {
     kvs.add(new KeyValue(row1, fam1, col2, null));
     kvs.add(new KeyValue(row1, fam1, col3, null));
 
-    region.delete(fam1, kvs, true);
+    region.delete(fam1, kvs, true, null);
 
     // extract the key values out the memstore:
     // This is kinda hacky, but better than nothing...
@@ -1909,7 +1917,7 @@ public class TestHRegion extends HBaseTestCase {
     int numRows = 1;
     int numFamilies = 10;
     int numQualifiers = 100;
-    int flushInterval = 7;
+    int flushInterval = 11;
     int compactInterval = 5 * flushInterval;
     byte[][] families = new byte[numFamilies][];
     for (int i = 0; i < numFamilies; i++) {
@@ -1934,6 +1942,11 @@ public class TestHRegion extends HBaseTestCase {
     int expectedCount = numFamilies * numQualifiers;
     List<KeyValue> res = new ArrayList<KeyValue>();
 
+    // wait for some records to go in the store already
+    while(putThread.getCurrentIteration() == 0){
+      Thread.sleep(10);
+    }
+
     long prevTimestamp = 0L;
     for (int i = 0; i < testCount; i++) {
 
@@ -1946,16 +1959,13 @@ public class TestHRegion extends HBaseTestCase {
         flushThread.flush();
       }
 
-      boolean previousEmpty = res.isEmpty();
       res.clear();
       InternalScanner scanner = region.getScanner(scan);
       while (scanner.next(res)) ;
-      if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
-        Assert.assertEquals("i=" + i, expectedCount, res.size());
-        long timestamp = res.get(0).getTimestamp();
-        Assert.assertTrue(timestamp >= prevTimestamp);
-        prevTimestamp = timestamp;
-      }
+      Assert.assertEquals("i=" + i, expectedCount, res.size());
+      long timestamp = res.get(0).getTimestamp();
+      Assert.assertTrue(timestamp >= prevTimestamp);
+      prevTimestamp = timestamp;
     }
 
     putThread.done();
@@ -1973,12 +1983,15 @@ public class TestHRegion extends HBaseTestCase {
     private int numRows;
     private byte[][] families;
     private byte[][] qualifiers;
+    private volatile int currentIteration;
 
     private PutThread(int numRows, byte[][] families, byte[][] qualifiers) {
       this.numRows = numRows;
      this.families = families;
       this.qualifiers = qualifiers;
+      this.currentIteration = 0;
+
     }
 
     public void done() {
@@ -1994,10 +2007,13 @@ public class TestHRegion extends HBaseTestCase {
       }
     }
 
+    public int getCurrentIteration() {
+      return currentIteration;
+    }
+
     @Override
     public void run() {
       done = false;
-      int val = 0;
       while (!done) {
         try {
           for (int r = 0; r < numRows; r++) {
@@ -2005,18 +2021,161 @@ public class TestHRegion extends HBaseTestCase {
             Put put = new Put(row);
             for (int f = 0; f < families.length; f++) {
               for (int q = 0; q < qualifiers.length; q++) {
-                put.add(families[f], qualifiers[q], (long) val,
-                    Bytes.toBytes(val));
+                put.add(families[f], qualifiers[q], (long) currentIteration,
+                    Bytes.toBytes(currentIteration));
               }
             }
             region.put(put);
 
-            if (val > 0 && val % 47 == 0){
-              //System.out.println("put iteration = " + val);
-              Delete delete = new Delete(row, (long)val-30, null);
+            if (currentIteration > 0 && currentIteration % 47 == 0) {
+              //System.out.println("put iteration = " + currentIteration);
+              Delete delete = new Delete(row, (long) currentIteration - 30, null);
               region.delete(delete, null, true);
             }
-            val++;
+            currentIteration++;
+          }
+        } catch (IOException e) {
+          LOG.error("error while putting records", e);
+          error = e;
+          break;
+        }
+      }
+
+    }
+
+  }
+
+  public void testDeletesWhileScanning()
+      throws IOException, InterruptedException {
+    String method = "testDeletesWhileScanning";
+    byte[] tableName = Bytes.toBytes("testDeletesWhileScanning");
+    int numRows = 100;
+    int testCount = numRows / 10;
+    int numFamilies = 10;
+    int numQualifiers = 100;
+    int flushInterval = 13;
+    int compactInterval = 5 * flushInterval;
+    byte[][] families = new byte[numFamilies][];
+    for (int i = 0; i < numFamilies; i++) {
+      families[i] = Bytes.toBytes("family" + i);
+    }
+    byte[][] qualifiers = new byte[numQualifiers][];
+    for (int i = 0; i < numQualifiers; i++) {
+      qualifiers[i] = Bytes.toBytes("qual" + i);
+    }
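+
+    // Every cell below is written with value (val % 10) and the scan filter
+    // matches the value 5 exactly, so exactly one row in every ten passes
+    // the filter. Each scan iteration asks the DeleteThread to remove one of
+    // the matching rows; a scanner opened before that delete should still
+    // see the full set of matching rows, so expectedRows only drops by one
+    // on the following iteration.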
+
+    initHRegion(tableName, method, families);
+    DeleteThread deleteThread = new DeleteThread();
+    deleteThread.start();
+    FlushThread flushThread = new FlushThread();
+    flushThread.start();
+
+    Scan scan = new Scan();
+    List<Filter> list = new ArrayList<Filter>();
+    for (byte[] family : families) {
+      for (byte[] qual : qualifiers) {
+        list.add(new SingleColumnValueFilter(family, qual,
+          CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(5))));
+      }
+    }
+    FilterList filterList = new FilterList(list);
+    scan.setFilter(filterList);
+
+    int expectedCount = numFamilies * numQualifiers;
+    List<KeyValue> res = new ArrayList<KeyValue>();
+
+    for (int val = 0; val < numRows; val++) {
+      byte[] row = Bytes.toBytes("row" + val);
+      Put put = new Put(row);
+      for (int f = 0; f < families.length; f++) {
+        for (int q = 0; q < qualifiers.length; q++) {
+          put.add(families[f], qualifiers[q], (long) val,
+            Bytes.toBytes(val % 10));
+        }
+      }
+      region.put(put);
+
+      if (val != 0 && val % compactInterval == 0) {
+        region.compactStores(true);
+      }
+
+      if (val != 0 && val % flushInterval == 0) {
+        //System.out.println("put iteration = " + val);
+        flushThread.flush();
+      }
+
+    }
+
+    int expectedRows = numRows / 10;
+    for (int i = 0; i < testCount; i++) {
+
+      if (i != 0 && i % compactInterval == 0) {
+        region.compactStores(true);
+      }
+
+      if (i != 0 && i % flushInterval == 0) {
+        //System.out.println("scan iteration = " + i);
+        flushThread.flush();
+      }
+
+      res.clear();
+      InternalScanner scanner = region.getScanner(scan);
+      deleteThread.delete();
+      int rowCount = 0;
+      while (scanner.next(res)) {
+        Assert.assertEquals(expectedCount, res.size());
+        res.clear();
+        rowCount++;
+      }
+      Assert.assertEquals(expectedRows, rowCount);
+      expectedRows--;
+    }
+
+    deleteThread.done();
+    deleteThread.join();
+    deleteThread.checkNoError();
+
+    flushThread.done();
+    flushThread.join();
+    flushThread.checkNoError();
+  }
+
+  protected class DeleteThread extends Thread {
+    private volatile boolean done;
+    private Throwable error = null;
+
+    public void done() {
+      done = true;
+      synchronized (this) {
+        interrupt();
+      }
+    }
+
+    public void checkNoError() {
+      if (error != null) {
+        Assert.assertNull(error);
+      }
+    }
+
+    @Override
+    public void run() {
+      done = false;
+      int row = 5;
+      while (!done) {
+        synchronized (this) {
+          try {
+            wait();
+          } catch (InterruptedException ignored) {
+            if (done) {
+              break;
+            }
+          }
+        }
+
+        try {
+          Delete delete = new Delete(Bytes.toBytes("row"+row));
+          region.delete(delete, null, false);
+          row += 10;
        } catch (IOException e) {
          LOG.error("error while putting records", e);
          error = e;
@@ -2026,6 +2185,12 @@ public class TestHRegion extends HBaseTestCase {
 
     }
 
+    public void delete() {
+      synchronized (this) {
+        notify();
+      }
+    }
+
   }
 
@@ -2100,12 +2265,12 @@ public class TestHRegion extends HBaseTestCase {
   }
 
-  public void testIndexesScanWithOneDeletedRow() throws IOException {
-    byte[] tableName = Bytes.toBytes("testIndexesScanWithOneDeletedRow");
+  public void testScanWithOneDeletedRow() throws IOException {
+    String method = "testScanWithOneDeletedRow";
+    byte[] tableName = Bytes.toBytes(method);
     byte[] family = Bytes.toBytes("family");
 
     //Setting up region
-    String method = "testIndexesScanWithOneDeletedRow";
     initHRegion(tableName, method, new HBaseConfiguration(), family);
 
     Put put = new Put(Bytes.toBytes(1L));
@@ -2122,9 +2287,9 @@ public class TestHRegion extends HBaseTestCase {
     put.add(family, qual1, 2L, Bytes.toBytes(2L));
     region.put(put);
 
-    Scan idxScan = new Scan();
-    idxScan.addFamily(family);
-    idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
+    Scan scan = new Scan();
+    scan.addFamily(family);
+    scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
       Arrays.asList(new SingleColumnValueFilter(family, qual1,
         CompareFilter.CompareOp.GREATER_OR_EQUAL,
         new BinaryComparator(Bytes.toBytes(0L))),
@@ -2132,7 +2297,7 @@ public class TestHRegion extends HBaseTestCase {
         CompareFilter.CompareOp.LESS_OR_EQUAL,
         new BinaryComparator(Bytes.toBytes(3L)))
       )));
-    InternalScanner scanner = region.getScanner(idxScan);
+    InternalScanner scanner = region.getScanner(scan);
 
     List<KeyValue> res = new ArrayList<KeyValue>();
 
     //long start = System.nanoTime();
diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
index 1e602e7..57f2f12 100644
--- a/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
+++ b/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
@@ -46,6 +46,14 @@ public class TestMemStore extends TestCase {
   private static final byte [] FAMILY = Bytes.toBytes("column");
   private static final byte [] CONTENTS_BASIC = Bytes.toBytes("contents:basic");
   private static final String CONTENTSTR = "contentstr";
+  private static final RegionUpdateTracker.UpdateIdValidator DUMMY_VALIDATOR =
+    new RegionUpdateTracker.UpdateIdValidator(){
+
+      @Override
+      public boolean isValid(int updateId) {
+        return true;
+      }
+    };
 
   @Override
   public void setUp() throws Exception {
@@ -72,7 +80,7 @@ public class TestMemStore extends TestCase {
    */
   public void testScanAcrossSnapshot() throws IOException {
     int rowCount = addRows(this.memstore);
-    KeyValueScanner [] memstorescanners = this.memstore.getScanners();
+    KeyValueScanner [] memstorescanners = this.memstore.getScanners(DUMMY_VALIDATOR);
     Scan scan = new Scan();
     List<KeyValue> result = new ArrayList<KeyValue>();
     StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
@@ -93,7 +101,7 @@ public class TestMemStore extends TestCase {
     for (int i = 0; i < memstorescanners.length; i++) {
       memstorescanners[0].close();
     }
-    memstorescanners = this.memstore.getScanners();
+    memstorescanners = this.memstore.getScanners(DUMMY_VALIDATOR);
     // Now assert can count same number even if a snapshot mid-scan.
     s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
       this.memstore.comparator, null, memstorescanners);
@@ -119,7 +127,7 @@ public class TestMemStore extends TestCase {
     for (int i = 0; i < memstorescanners.length; i++) {
       memstorescanners[0].close();
     }
-    memstorescanners = this.memstore.getScanners();
+    memstorescanners = this.memstore.getScanners(DUMMY_VALIDATOR);
     // Assert that new values are seen in kvset as we scan.
     long ts = System.currentTimeMillis();
     s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
@@ -149,6 +157,48 @@ public class TestMemStore extends TestCase {
     assertEquals(rowCount, count);
   }
 
+  /**
+   * A simple test which verifies the 3 possible states when scanning across snapshot.
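+   * The three states exercised below are: both key values in the kvset,
+   * both in the snapshot, and the first in the snapshot with the second
+   * in the kvset.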
+   */
+  public void testScanAcrossSnapshot2() {
+    // we are going to check scanning across snapshot with two kvs;
+    // kv1 should always be returned before kv2
+    final byte[] one = Bytes.toBytes(1);
+    final byte[] two = Bytes.toBytes(2);
+    final byte[] f = Bytes.toBytes("f");
+    final byte[] q = Bytes.toBytes("q");
+    final byte[] v = Bytes.toBytes(3);
+
+    final KeyValue kv1 = new KeyValue(one, f, q, v);
+    final KeyValue kv2 = new KeyValue(two, f, q, v);
+
+    // use case 1: both kvs in kvset
+    this.memstore.add(kv1.clone());
+    this.memstore.add(kv2.clone());
+    verifyScanAcrossSnapshot2(kv1, kv2);
+
+    // use case 2: both kvs in snapshot
+    this.memstore.snapshot();
+    verifyScanAcrossSnapshot2(kv1, kv2);
+
+    // use case 3: first in snapshot second in kvset
+    this.memstore = new MemStore();
+    this.memstore.add(kv1.clone());
+    this.memstore.snapshot();
+    this.memstore.add(kv2.clone());
+    verifyScanAcrossSnapshot2(kv1, kv2);
+  }
+
+  private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) {
+    KeyValueScanner[] memstorescanners = this.memstore.getScanners(DUMMY_VALIDATOR);
+    assertEquals(1, memstorescanners.length);
+    final KeyValueScanner scanner = memstorescanners[0];
+    scanner.seek(KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW));
+    assertEquals(kv1, scanner.next());
+    assertEquals(kv2, scanner.next());
+    assertNull(scanner.next());
+  }
+
   /**
    * Test memstore snapshots
    * @throws IOException
@@ -220,81 +270,26 @@ public class TestMemStore extends TestCase {
 //////////////////////////////////////////////////////////////////////////////
 // Get tests
 //////////////////////////////////////////////////////////////////////////////
 
-  /** Test getNextRow from memstore
-   * @throws InterruptedException
-   */
-  public void testGetNextRow() throws Exception {
-    addRows(this.memstore);
-    // Add more versions to make it a little more interesting.
-    Thread.sleep(1);
-    addRows(this.memstore);
-    KeyValue closestToEmpty = this.memstore.getNextRow(KeyValue.LOWESTKEY);
-    assertTrue(KeyValue.COMPARATOR.compareRows(closestToEmpty,
-      new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0);
-    for (int i = 0; i < ROW_COUNT; i++) {
-      KeyValue nr = this.memstore.getNextRow(new KeyValue(Bytes.toBytes(i),
-        System.currentTimeMillis()));
-      if (i + 1 == ROW_COUNT) {
-        assertEquals(nr, null);
-      } else {
-        assertTrue(KeyValue.COMPARATOR.compareRows(nr,
-          new KeyValue(Bytes.toBytes(i + 1), System.currentTimeMillis())) == 0);
-      }
-    }
-    //starting from each row, validate results should contain the starting row
-    for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
-      InternalScanner scanner =
-        new StoreScanner(new Scan(Bytes.toBytes(startRowId)), FAMILY,
-          Integer.MAX_VALUE, this.memstore.comparator, null,
-          new KeyValueScanner[]{memstore.getScanners()[0]});
-      List<KeyValue> results = new ArrayList<KeyValue>();
-      for (int i = 0; scanner.next(results); i++) {
-        int rowId = startRowId + i;
-        assertTrue("Row name",
-          KeyValue.COMPARATOR.compareRows(results.get(0),
-            Bytes.toBytes(rowId)) == 0);
-        assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
-        List<KeyValue> row = new ArrayList<KeyValue>();
-        for (KeyValue kv : results) {
-          row.add(kv);
-        }
-        isExpectedRowWithoutTimestamps(rowId, row);
-        // Clear out set.  Otherwise row results accumulate.
-        results.clear();
-      }
-    }
-  }
-
   public void testGet_Basic_Found() throws IOException {
-    byte [] row = Bytes.toBytes("testrow");
-    byte [] fam = Bytes.toBytes("testfamily");
-    byte [] qf1 = Bytes.toBytes("testqualifier1");
-    byte [] qf2 = Bytes.toBytes("testqualifier2");
-    byte [] qf3 = Bytes.toBytes("testqualifier3");
-    byte [] val = Bytes.toBytes("testval");
-
+    byte[] row = Bytes.toBytes("testrow");
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf1 = Bytes.toBytes("testqualifier1");
+    byte[] qf2 = Bytes.toBytes("testqualifier2");
+    byte[] qf3 = Bytes.toBytes("testqualifier3");
+    byte[] val = Bytes.toBytes("testval");
+
     //Setting up memstore
-    KeyValue add1 = new KeyValue(row, fam ,qf1, val);
-    KeyValue add2 = new KeyValue(row, fam ,qf2, val);
-    KeyValue add3 = new KeyValue(row, fam ,qf3, val);
+    KeyValue add1 = new KeyValue(row, fam, qf1, val);
+    KeyValue add2 = new KeyValue(row, fam, qf2, val);
+    KeyValue add3 = new KeyValue(row, fam, qf3, val);
     memstore.add(add1);
     memstore.add(add2);
     memstore.add(add3);
-
-    //test
-    Get get = new Get(row);
-    NavigableSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    columns.add(qf2);
-    long ttl = Long.MAX_VALUE;
-    QueryMatcher matcher =
-      new QueryMatcher(get, fam, columns, ttl, KeyValue.KEY_COMPARATOR, 1);
-
-    List<KeyValue> result = new ArrayList<KeyValue>();
-    boolean res = memstore.get(matcher, result);
-    assertEquals(true, res);
+    //test
+    assertEquals(1, get(row, fam, -1L, qf2).size());
   }
-
+
   public void testGet_Basic_NotFound() throws IOException {
     byte [] row = Bytes.toBytes("testrow");
     byte [] fam = Bytes.toBytes("testfamily");
@@ -310,17 +305,7 @@ public class TestMemStore extends TestCase {
     memstore.add(add3);
 
     //test
-    Get get = new Get(row);
-    NavigableSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    columns.add(qf2);
-    long ttl = Long.MAX_VALUE;
-
-    QueryMatcher matcher =
-      new QueryMatcher(get, fam, columns, ttl, KeyValue.KEY_COMPARATOR, 1);
-
-    List<KeyValue> result = new ArrayList<KeyValue>();
-    boolean res = memstore.get(matcher, result);
-    assertEquals(false, res);
+    assertEquals(0, get(row, fam, -1L, qf2).size());
   }
 
   public void testGet_memstoreAndSnapShot() throws IOException {
@@ -332,17 +317,7 @@ public class TestMemStore extends TestCase {
     byte [] qf4 = Bytes.toBytes("testqualifier4");
     byte [] qf5 = Bytes.toBytes("testqualifier5");
     byte [] val = Bytes.toBytes("testval");
-
-    //Creating get
-    Get get = new Get(row);
-    NavigableSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    columns.add(qf2);
-    columns.add(qf4);
-    long ttl = Long.MAX_VALUE;
-    QueryMatcher matcher =
-      new QueryMatcher(get, fam, columns, ttl, KeyValue.KEY_COMPARATOR, 1);
-
+
     //Setting up memstore
     memstore.add(new KeyValue(row, fam ,qf1, val));
     memstore.add(new KeyValue(row, fam ,qf2, val));
@@ -356,9 +331,7 @@ public class TestMemStore extends TestCase {
     memstore.add(new KeyValue(row, fam ,qf5, val));
     assertEquals(2, memstore.kvset.size());
 
-    List<KeyValue> result = new ArrayList<KeyValue>();
-    boolean res = memstore.get(matcher, result);
-    assertEquals(true, res);
+    assertEquals(2, get(row, fam, -1L, qf2, qf4).size());
   }
 
   public void testGet_SpecificTimeStamp() throws IOException {
@@ -372,19 +345,7 @@ public class TestMemStore extends TestCase {
     long ts1 = System.currentTimeMillis();
     long ts2 = ts1++;
     long ts3 = ts2++;
-
-    //Creating get
-    Get get = new Get(row);
-    get.setTimeStamp(ts2);
-    NavigableSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    columns.add(qf1);
-    columns.add(qf2);
-    columns.add(qf3);
-    long ttl = Long.MAX_VALUE;
-    QueryMatcher matcher = new QueryMatcher(get, fam, columns, ttl,
-      KeyValue.KEY_COMPARATOR, 1);
-
+
     //Setting up expected
     List<KeyValue> expected = new ArrayList<KeyValue>();
     KeyValue kv1 = new KeyValue(row, fam ,qf1, ts2, val);
@@ -406,9 +367,8 @@ public class TestMemStore extends TestCase {
     memstore.add(new KeyValue(row, fam ,qf3, ts3, val));
 
     //Get
-    List<KeyValue> result = new ArrayList<KeyValue>();
-    memstore.get(matcher, result);
-
+    List<KeyValue> result = get(row, fam, ts2, qf1, qf2, qf3);
+
     assertEquals(expected.size(), result.size());
     for(int i=0; i
-    List<KeyValue> expected = new ArrayList<KeyValue>();
-    expected.add(put3);
-    expected.add(del2);
-    expected.add(put1);
-
-    assertEquals(3, memstore.kvset.size());
-    int i = 0;
-    for(KeyValue kv : memstore.kvset) {
-      assertEquals(expected.get(i++), kv);
-    }
+    verifyKvset(put3, del2, put2, put1);
+    verifyScan(fam, put3, put1);
   }
-
+
   public void testGetWithDeleteColumn() throws IOException {
     byte [] row = Bytes.toBytes("testrow");
     byte [] fam = Bytes.toBytes("testfamily");
@@ -473,15 +425,8 @@ public class TestMemStore extends TestCase {
       new KeyValue(row, fam, qf1, ts2, KeyValue.Type.DeleteColumn, val);
     memstore.delete(del2);
 
-    List<KeyValue> expected = new ArrayList<KeyValue>();
-    expected.add(put3);
-    expected.add(del2);
-
-    assertEquals(2, memstore.kvset.size());
-    int i = 0;
-    for (KeyValue kv: memstore.kvset) {
-      assertEquals(expected.get(i++), kv);
-    }
+    verifyKvset(put3, del2, put2, put1);
+    verifyScan(fam, put3);
   }
 
@@ -508,15 +453,8 @@ public class TestMemStore extends TestCase {
       new KeyValue(row, fam, null, ts, KeyValue.Type.DeleteFamily, val);
     memstore.delete(del);
 
-    List<KeyValue> expected = new ArrayList<KeyValue>();
-    expected.add(del);
-    expected.add(put4);
-
-    assertEquals(2, memstore.kvset.size());
-    int i = 0;
-    for (KeyValue kv: memstore.kvset) {
-      assertEquals(expected.get(i++), kv);
-    }
+    verifyKvset(del, put1, put2, put4, put3);
+    verifyScan(fam, put4);
   }
 
   public void testKeepDeleteInmemstore() {
@@ -528,7 +466,7 @@ public class TestMemStore extends TestCase {
     memstore.add(new KeyValue(row, fam, qf, ts, val));
     KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
     memstore.delete(delete);
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
    assertEquals(delete, memstore.kvset.first());
   }
 
@@ -541,7 +479,7 @@ public class TestMemStore extends TestCase {
       "row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
   public void testRetainsDeleteColumn() throws IOException {
@@ -553,7 +491,7 @@ public class TestMemStore extends TestCase {
       KeyValue.Type.DeleteColumn, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
   public void testRetainsDeleteFamily() throws IOException {
@@ -565,7 +503,7 @@ public class TestMemStore extends TestCase {
       KeyValue.Type.DeleteFamily, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
 
@@ -643,4 +581,43 @@ public class TestMemStore extends TestCase {
     return new KeyValue(row, Bytes.toBytes("test_col:"), HConstants.LATEST_TIMESTAMP,
       value);
   }
+
+  private void verifyKvset(KeyValue... expected) {
+    assertEquals(expected.length, memstore.kvset.size());
+    int i = 0;
+    for (KeyValue kv : memstore.kvset) {
+      assertEquals(expected[i++], kv);
+    }
+  }
+
+  private void verifyScan(byte[] fam, KeyValue... expected) throws IOException {
+    StoreScanner scanner = new StoreScanner(new Scan().setMaxVersions(), fam,
+      -1, memstore.comparator, null, memstore.getScanners(null));
+    List<KeyValue> result = new ArrayList<KeyValue>();
+    while (scanner.next(result)) ;
+    assertEquals(expected.length, result.size());
+    int i = 0;
+    for (KeyValue kv : result) {
+      assertEquals(expected[i++], kv);
+    }
+  }
+
+  private List<KeyValue> get(byte[] row, byte[] fam, long ts, byte[]... quals)
+      throws IOException {
+    Scan scan = new Scan(row, row);
+    NavigableSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    for (byte[] qual : quals) {
+      columns.add(qual);
+    }
+    if (ts > 0) {
+      scan.setTimeStamp(ts);
+    }
+
+    List<KeyValue> result = new ArrayList<KeyValue>();
+    StoreScanner scanner = new StoreScanner(scan, fam,
+      Long.MAX_VALUE, memstore.comparator, columns, memstore.getScanners(null));
+    while (scanner.next(result)) ;
+    return result;
+  }
+
 }
diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/src/test/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
deleted file mode 100644
index 010d135..0000000
--- a/src/test/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Copyright 2009 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HBaseTestCase;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueTestUtil;
-import org.apache.hadoop.hbase.KeyValue.KeyComparator;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode;
-import org.apache.hadoop.hbase.util.Bytes;
-
-
-public class TestQueryMatcher extends HBaseTestCase
-implements HConstants {
-  private final boolean PRINT = false;
-
-  private byte [] row1;
-  private byte [] row2;
-  private byte [] fam1;
-  private byte [] fam2;
-  private byte [] col1;
-  private byte [] col2;
-  private byte [] col3;
-  private byte [] col4;
-  private byte [] col5;
-  private byte [] col6;
-
-  private byte [] data;
-
-  private Get get;
-
-  long ttl = Long.MAX_VALUE;
-  KeyComparator rowComparator;
-
-  public void setUp(){
-    row1 = Bytes.toBytes("row1");
-    row2 = Bytes.toBytes("row2");
-    fam1 = Bytes.toBytes("fam1");
-    fam2 = Bytes.toBytes("fam2");
-    col1 = Bytes.toBytes("col1");
-    col2 = Bytes.toBytes("col2");
-    col3 = Bytes.toBytes("col3");
-    col4 = Bytes.toBytes("col4");
-    col5 = Bytes.toBytes("col5");
-    col6 = Bytes.toBytes("col6");
-
-    data = Bytes.toBytes("data");
-
-    //Create Get
-    get = new Get(row1);
-    get.addFamily(fam1);
-    get.addColumn(fam2, col2);
-    get.addColumn(fam2, col4);
-    get.addColumn(fam2, col5);
-
-    rowComparator = KeyValue.KEY_COMPARATOR;
-
-  }
-
-  public void testMatch_ExplicitColumns()
-  throws IOException {
-    //Moving up from the Tracker by using Gets and List instead
-    //of just byte []
-
-    //Expected result
-    List<MatchCode> expected = new ArrayList<MatchCode>();
-    expected.add(MatchCode.SKIP);
-    expected.add(MatchCode.INCLUDE);
-    expected.add(MatchCode.SKIP);
-    expected.add(MatchCode.INCLUDE);
-    expected.add(MatchCode.INCLUDE);
-    expected.add(MatchCode.DONE);
-
-    QueryMatcher qm = new QueryMatcher(get, fam2,
-        get.getFamilyMap().get(fam2), ttl, rowComparator, 1);
-
-    List<KeyValue> memstore = new ArrayList<KeyValue>();
-    memstore.add(new KeyValue(row1, fam2, col1, data));
-    memstore.add(new KeyValue(row1, fam2, col2, data));
-    memstore.add(new KeyValue(row1, fam2, col3, data));
-    memstore.add(new KeyValue(row1, fam2, col4, data));
-    memstore.add(new KeyValue(row1, fam2, col5, data));
-
-    memstore.add(new KeyValue(row2, fam1, col1, data));
-
-    List<MatchCode> actual = new ArrayList<MatchCode>();
-
-    for(KeyValue kv : memstore){
-      actual.add(qm.match(kv));
-    }
-
-    assertEquals(expected.size(), actual.size());
-    for(int i=0; i< expected.size(); i++){
-      assertEquals(expected.get(i), actual.get(i));
-      if(PRINT){
-        System.out.println("expected "+expected.get(i)+
-            ", actual " +actual.get(i));
-      }
-    }
-  }
-
-
-  public void testMatch_Wildcard()
-  throws IOException {
-    //Moving up from the Tracker by using Gets and List instead
-    //of just byte []
-
-    //Expected result
-    List<MatchCode> expected = new ArrayList<MatchCode>();
-    expected.add(MatchCode.INCLUDE);
-    expected.add(MatchCode.INCLUDE);
-    expected.add(MatchCode.INCLUDE);
-    expected.add(MatchCode.INCLUDE);
-    expected.add(MatchCode.INCLUDE);
-    expected.add(MatchCode.NEXT);
-
-    QueryMatcher qm = new QueryMatcher(get, fam2, null, ttl, rowComparator, 1);
-
-    List<KeyValue> memstore = new ArrayList<KeyValue>();
-    memstore.add(new KeyValue(row1, fam2, col1, data));
-    memstore.add(new KeyValue(row1, fam2, col2, data));
-    memstore.add(new KeyValue(row1, fam2, col3, data));
-    memstore.add(new KeyValue(row1, fam2, col4, data));
-    memstore.add(new KeyValue(row1, fam2, col5, data));
-    memstore.add(new KeyValue(row2, fam1, col1, data));
-
-    List<MatchCode> actual = new ArrayList<MatchCode>();
-
-    for(KeyValue kv : memstore){
-      actual.add(qm.match(kv));
-    }
-
-    assertEquals(expected.size(), actual.size());
-    for(int i=0; i< expected.size(); i++){
-      assertEquals(expected.get(i), actual.get(i));
-      if(PRINT){
-        System.out.println("expected "+expected.get(i)+
-            ", actual " +actual.get(i));
-      }
-    }
-  }
-
-
-  /**
-   * Verify that {@link QueryMatcher} only skips expired KeyValue
-   * instances and does not exit early from the row (skipping
-   * later non-expired KeyValues).  This version mimics a Get with
-   * explicitly specified column qualifiers.
-   *
-   * @throws IOException
-   */
-  public void testMatch_ExpiredExplicit()
-  throws IOException {
-
-    long testTTL = 1000;
-    MatchCode [] expected = new MatchCode[] {
-        MatchCode.SKIP,
-        MatchCode.INCLUDE,
-        MatchCode.SKIP,
-        MatchCode.INCLUDE,
-        MatchCode.SKIP,
-        MatchCode.NEXT
-    };
-
-    QueryMatcher qm = new QueryMatcher(get, fam2,
-        get.getFamilyMap().get(fam2), testTTL, rowComparator, 1);
-
-    long now = System.currentTimeMillis();
-    KeyValue [] kvs = new KeyValue[] {
-        new KeyValue(row1, fam2, col1, now-100, data),
-        new KeyValue(row1, fam2, col2, now-50, data),
-        new KeyValue(row1, fam2, col3, now-5000, data),
-        new KeyValue(row1, fam2, col4, now-500, data),
-        new KeyValue(row1, fam2, col5, now-10000, data),
-        new KeyValue(row2, fam1, col1, now-10, data)
-    };
-
-    List<MatchCode> actual = new ArrayList<MatchCode>(kvs.length);
-    for (KeyValue kv : kvs) {
-      actual.add( qm.match(kv) );
-    }
-
-    assertEquals(expected.length, actual.size());
-    for (int i=0; i
-    List<MatchCode> actual = new ArrayList<MatchCode>(kvs.length);
-    for (KeyValue kv : kvs) {
-      actual.add( qm.match(kv) );
-    }
-
-    assertEquals(expected.length, actual.size());
-    for (int i=0; i
+    List<Future<Integer>> results = new ArrayList<Future<Integer>>(threadCount);
+
+    for (int i = 0; i < threadCount; i++) {
+      results.add(service.submit(new Callable<Integer>() {
+
+        @Override
+        public Integer call() throws Exception {
+          int runCount = 0;
+          while (System.currentTimeMillis() < endTime) {
+            int id = tracker.nextUpdateId();
+            tracker.commit(id);
+            runCount++;
+          }
+          return runCount;
+        }
+      }));
+    }
+
+    service.shutdown();
+    int sum = 0;
+    for (Future<Integer> result : results) {
+      int count = result.get();
+      assertTrue(count > 0);
+      sum += count;
+    }
+    //System.out.println("sum=" + sum);
+    assertEquals(RegionUpdateTracker.DEFAULT_BASE + sum, tracker.nextUpdateId());
+  }
+
+
+  public void testUpdateIdValidator() {
+    RegionUpdateTracker tracker = new RegionUpdateTracker();
+
+    int id1 = tracker.nextUpdateId();
+    int id2 = tracker.nextUpdateId();
+    int id3 = tracker.nextUpdateId();
+
+    RegionUpdateTracker.UpdateIdValidator validator1 = tracker.getValidator();
+    assertFalse(validator1.isValid(id1));
+    assertFalse(validator1.isValid(id2));
+    assertFalse(validator1.isValid(id3));
+
+    tracker.commit(id3);
+    RegionUpdateTracker.UpdateIdValidator validator2 = tracker.getValidator();
+    assertFalse(validator1.isValid(id1));
+    assertFalse(validator1.isValid(id2));
+    assertFalse(validator1.isValid(id3));
+    assertFalse(validator2.isValid(id1));
+    assertFalse(validator2.isValid(id2));
+    assertTrue(validator2.isValid(id3));
+
+    tracker.commit(id2);
+    RegionUpdateTracker.UpdateIdValidator validator3 = tracker.getValidator();
+    assertFalse(validator1.isValid(id1));
+    assertFalse(validator1.isValid(id2));
+    assertFalse(validator1.isValid(id3));
+    assertFalse(validator2.isValid(id1));
+    assertFalse(validator2.isValid(id2));
+    assertTrue(validator2.isValid(id3));
+
+    assertFalse(validator3.isValid(id1));
+    assertTrue(validator3.isValid(id2));
+    assertTrue(validator3.isValid(id3));
+
+
+  }
+}
diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java b/src/test/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
index ed23d64..4656c08 100644
--- a/src/test/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
+++ b/src/test/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
@@ -5,7 +5,7 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
 
 public class TestScanWildcardColumnTracker extends HBaseTestCase {
 
diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java b/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java
index 9d1ae00..db9f58b 100644
--- a/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -9,6 +9,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.Progressable;
@@ -89,7 +90,14 @@ public class TestStore extends TestCase {
         reporter);
   }
 
-
+  void get(byte[] row, NavigableSet<byte[]> qualifiers, List<KeyValue> result)
+      throws IOException {
+    Scan scan = new Scan(row, row);
+    InternalScanner scanner = (InternalScanner) store.getScanner(scan, qualifiers, null);
+    assertTrue(scanner.next(result));
+    assertFalse(scanner.next(new ArrayList<KeyValue>()));
+  }
+
 //////////////////////////////////////////////////////////////////////////////
 // Get tests
 //////////////////////////////////////////////////////////////////////////////
@@ -121,7 +129,7 @@ public class TestStore extends TestCase {
         this.store.getFamily(), fs, null, c, null);
     System.out.println(this.store.getHRegionInfo().getEncodedName());
     assertEquals(2, this.store.getStorefilesCount());
-    this.store.get(get, qualifiers, result);
+    get(row, qualifiers, result);
     assertEquals(1, result.size());
   }
 
@@ -141,7 +149,7 @@ public class TestStore extends TestCase {
     this.store.add(new KeyValue(row, family, qf6, null));
 
     //Get
-    this.store.get(get, qualifiers, result);
+    get(row, qualifiers, result);
 
     //Compare
     assertCheck();
@@ -173,7 +181,7 @@ public class TestStore extends TestCase {
     flush(3);
 
     //Get
-    this.store.get(get, qualifiers, result);
+    get(row, qualifiers, result);
 
     //Need to sort the result since multiple files
     Collections.sort(result, KeyValue.COMPARATOR);
@@ -206,7 +214,7 @@ public class TestStore extends TestCase {
     this.store.add(new KeyValue(row, family, qf6, null));
 
     //Get
-    this.store.get(get, qualifiers, result);
+    get(row, qualifiers, result);
 
     //Need to sort the result since multiple files
     Collections.sort(result, KeyValue.COMPARATOR);
@@ -254,7 +262,7 @@ public class TestStore extends TestCase {
         Bytes.toBytes(oldValue)));
 
     // update during the snapshot.
-    long ret = this.store.updateColumnValue(row, family, qf1, newValue);
+    long ret = this.store.updateColumnValue(row, family, qf1, newValue, 0);
 
     // memstore should have grown by some amount.
     assertTrue(ret > 0);
 
@@ -266,15 +274,17 @@ public class TestStore extends TestCase {
     assertEquals(2, this.store.memstore.kvset.size());
 
     // how many key/values for this row are there?
-    Get get = new Get(row);
-    get.addColumn(family, qf1);
-    get.setMaxVersions(); // all versions.
+    Scan scan = new Scan(row, row);
+    scan.addColumn(family, qf1);
+    scan.setMaxVersions(); // all versions.
     List<KeyValue> results = new ArrayList<KeyValue>();
 
     NavigableSet<byte[]> cols = new TreeSet<byte[]>();
     cols.add(qf1);
 
-    this.store.get(get, cols, results);
+    InternalScanner internalScanner = (InternalScanner) store.getScanner(scan, cols, null);
+    assertTrue(internalScanner.next(results));
+    assertFalse(internalScanner.next(results));
+
     assertEquals(2, results.size());
 
     long ts1 = results.get(0).getTimestamp();
diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestWildcardColumnTracker.java b/src/test/org/apache/hadoop/hbase/regionserver/TestWildcardColumnTracker.java
deleted file mode 100644
index 2b04eeb..0000000
--- a/src/test/org/apache/hadoop/hbase/regionserver/TestWildcardColumnTracker.java
+++ /dev/null
@@ -1,353 +0,0 @@
-package org.apache.hadoop.hbase.regionserver;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.TreeSet;
-
-import org.apache.hadoop.hbase.HBaseTestCase;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode;
-import org.apache.hadoop.hbase.util.Bytes;
-
-public class TestWildcardColumnTracker extends HBaseTestCase
-implements HConstants {
-  private boolean PRINT = false;
-
-  public void testGet_SingleVersion() {
-    if(PRINT) {
-      System.out.println("SingleVersion");
-    }
-    byte [] col1 = Bytes.toBytes("col1");
-    byte [] col2 = Bytes.toBytes("col2");
-    byte [] col3 = Bytes.toBytes("col3");
-    byte [] col4 = Bytes.toBytes("col4");
-    byte [] col5 = Bytes.toBytes("col5");
-
-    //Create tracker
-    List<MatchCode> expected = new ArrayList<MatchCode>();
-    expected.add(MatchCode.INCLUDE);
-    expected.add(MatchCode.INCLUDE);
-    expected.add(MatchCode.INCLUDE);
-    expected.add(MatchCode.INCLUDE);
-    expected.add(MatchCode.INCLUDE);
-    int maxVersions = 1;
-
-    ColumnTracker exp = new WildcardColumnTracker(maxVersions);
-
-    //Create "Scanner"
-    List<byte[]> scanner = new ArrayList<byte[]>();
-    scanner.add(col1);
-    scanner.add(col2);
-    scanner.add(col3);
-    scanner.add(col4);
-    scanner.add(col5);
-
-    //Initialize result
-    List<MatchCode> result = new ArrayList<MatchCode>();
-
-    //"Match"
-    for(byte [] col : scanner){
-      result.add(exp.checkColumn(col, 0, col.length));
-    }
-
-    assertEquals(expected.size(), result.size());
-    for(int i=0; i< expected.size(); i++){
-      assertEquals(expected.get(i), result.get(i));
-      if(PRINT){
-        System.out.println("Expected " +expected.get(i) + ", actual " +
-            result.get(i));
-      }
-    }
-  }
-
-
-  public void testGet_MultiVersion() {
-    if(PRINT) {
-      System.out.println("\nMultiVersion");
-    }
-    byte [] col1 = Bytes.toBytes("col1");
-    byte [] col2 = Bytes.toBytes("col2");
-    byte [] col3 = Bytes.toBytes("col3");
-    byte [] col4 = Bytes.toBytes("col4");
-    byte [] col5 = Bytes.toBytes("col5");
-
-    //Create tracker
-    List<MatchCode> expected = new ArrayList<MatchCode>();
-    int size = 5;
-    for(int i=0; i
-    List<byte[]> scanner = new ArrayList<byte[]>();
-    scanner.add(col1);
-    scanner.add(col1);
-    scanner.add(col1);
-    scanner.add(col2);
-    scanner.add(col2);
-    scanner.add(col2);
-    scanner.add(col3);
-    scanner.add(col3);
-    scanner.add(col3);
-    scanner.add(col4);
-    scanner.add(col4);
-    scanner.add(col4);
-    scanner.add(col5);
-    scanner.add(col5);
-    scanner.add(col5);
-
-    //Initialize result
-    List<MatchCode> result = new ArrayList<MatchCode>();
-
-    //"Match"
-    for(byte [] col : scanner){
-      result.add(exp.checkColumn(col, 0, col.length));
-    }
-
-    assertEquals(expected.size(), result.size());
-    for(int i=0; i< expected.size(); i++){
-      assertEquals(expected.get(i), result.get(i));
-      if(PRINT){
-        System.out.println("Expected " +expected.get(i) + ", actual " +
-            result.get(i));
-      }
-    }
-  }
-
-  public void testUpdate_SameColumns(){
-    if(PRINT) {
-      System.out.println("\nUpdate_SameColumns");
-    }
-    byte [] col1 = Bytes.toBytes("col1");
-    byte [] col2 = Bytes.toBytes("col2");
-    byte [] col3 = Bytes.toBytes("col3");
-    byte [] col4 = Bytes.toBytes("col4");
-    byte [] col5 = Bytes.toBytes("col5");
-
-    //Create tracker
-    List<MatchCode> expected = new ArrayList<MatchCode>();
-    int size = 10;
-    for(int i=0; i
-    List<byte[]> scanner = new ArrayList<byte[]>();
-    scanner.add(col1);
-    scanner.add(col2);
-    scanner.add(col3);
-    scanner.add(col4);
-    scanner.add(col5);
-
-    //Initialize result
-    List<MatchCode> result = new ArrayList<MatchCode>();
-
-    //"Match"
-    for(int i=0; i<3; i++){
-      for(byte [] col : scanner){
-        result.add(wild.checkColumn(col, 0, col.length));
-      }
-      wild.update();
-    }
-
-    assertEquals(expected.size(), result.size());
-    for(int i=0; i
-    List<MatchCode> expected = new ArrayList<MatchCode>();
-    int size = 10;
-    for(int i=0; i
-    List<byte[]> scanner = new ArrayList<byte[]>();
-    scanner.add(col0);
-    scanner.add(col1);
-    scanner.add(col2);
-    scanner.add(col3);
-    scanner.add(col4);
-
-    //Initialize result
-    List<MatchCode> result = new ArrayList<MatchCode>();
-
-    for(byte [] col : scanner){
-      result.add(wild.checkColumn(col, 0, col.length));
-    }
-    wild.update();
-
-    //Create "Scanner1"
-    List<byte[]> scanner1 = new ArrayList<byte[]>();
-    scanner1.add(col5);
-    scanner1.add(col6);
-    scanner1.add(col7);
-    scanner1.add(col8);
-    scanner1.add(col9);
-    for(byte [] col : scanner1){
-      result.add(wild.checkColumn(col, 0, col.length));
-    }
-    wild.update();
-
-    //Scanner again
-    for(byte [] col : scanner){
-      result.add(wild.checkColumn(col, 0, col.length));
-    }
-
-    //"Match"
-    assertEquals(expected.size(), result.size());
-    for(int i=0; i
-    List<MatchCode> expected = new ArrayList<MatchCode>();
-    int size = 5;
-    for(int i=0; i
-    List<byte[]> scanner = new ArrayList<byte[]>();
-    scanner.add(col0);
-    scanner.add(col2);
-    scanner.add(col4);
-    scanner.add(col6);
-    scanner.add(col8);
-
-    //Initialize result
-    List<MatchCode> result = new ArrayList<MatchCode>();
-
-    for(int i=0; i<2; i++){
-      for(byte [] col : scanner){
-        result.add(wild.checkColumn(col, 0, col.length));
-      }
-      wild.update();
-    }
-
-    //Create "Scanner1"
-    List<byte[]> scanner1 = new ArrayList<byte[]>();
-    scanner1.add(col1);
-    scanner1.add(col3);
-    scanner1.add(col5);
-    scanner1.add(col7);
-    scanner1.add(col9);
-    for(byte [] col : scanner1){
-      result.add(wild.checkColumn(col, 0, col.length));
-    }
-    wild.update();
-
-    //Scanner again
-    for(byte [] col : scanner){
-      result.add(wild.checkColumn(col, 0, col.length));
-    }
-
-    //"Match"
-    assertEquals(expected.size(), result.size());
-
-    for(int i=0; i