.../hadoop/hbase/regionserver/ColumnCount.java | 18 ++-- .../hadoop/hbase/regionserver/ColumnTracker.java | 46 +++++++++ .../hbase/regionserver/ExplicitColumnTracker.java | 109 ++++++++++++++++++++- .../regionserver/ScanWildcardColumnTracker.java | 78 +++++++++++++-- 4 files changed, 231 insertions(+), 20 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java index 71ea1bd..42f584b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.nio.ByteBuffer; + import org.apache.hadoop.hbase.classification.InterfaceAudience; /** @@ -27,7 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private public class ColumnCount { - private final byte [] bytes; + private final ByteBuffer buffer; private final int offset; private final int length; private int count; @@ -36,7 +38,7 @@ public class ColumnCount { * Constructor * @param column the qualifier to count the versions for */ - public ColumnCount(byte [] column) { + public ColumnCount(ByteBuffer column) { this(column, 0); } @@ -45,8 +47,8 @@ public class ColumnCount { * @param column the qualifier to count the versions for * @param count initial count */ - public ColumnCount(byte [] column, int count) { - this(column, 0, column.length, count); + public ColumnCount(ByteBuffer column, int count) { + this(column, 0, column.capacity(), count); } /** @@ -56,8 +58,8 @@ public class ColumnCount { * @param length of the qualifier * @param count initial count */ - public ColumnCount(byte [] column, int offset, int length, int count) { - this.bytes = column; + public ColumnCount(ByteBuffer column, int offset, int length, int count) { + this.buffer = column; 
this.offset = offset; this.length = length; this.count = count; @@ -66,8 +68,8 @@ public class ColumnCount { /** * @return the buffer */ - public byte [] getBuffer(){ - return this.bytes; + public byte[] getBuffer(){ + return this.buffer.array(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java index 8568cfc..a36b27a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; @@ -66,6 +67,24 @@ public interface ColumnTracker { */ ScanQueryMatcher.MatchCode checkColumn(byte[] bytes, int offset, int length, byte type) throws IOException; + + /** + * Checks if the column is present in the list of requested columns by returning the match code + * instance. It does not check against the number of versions for the columns asked for. To do the + * version check, one has to call {@link #checkVersions(ByteBuffer, int, int, long, byte, boolean)} + * method based on the return type (INCLUDE) of this method. The values that can be returned by + * this method are {@link MatchCode#INCLUDE}, {@link MatchCode#SEEK_NEXT_COL} and + * {@link MatchCode#SEEK_NEXT_ROW}. + * @param buffer the buffer holding the column qualifier to check + * @param offset offset of the qualifier within the buffer + * @param length length of the qualifier + * @param type The type of the KeyValue + * @return The match code instance. + * @throws IOException in case there is an internal consistency problem caused by a data + * corruption. 
+ */ + ScanQueryMatcher.MatchCode checkColumn(ByteBuffer buffer, int offset, int length, byte type) + throws IOException; /** * Keeps track of the number of versions for the columns asked for. It assumes that the user has @@ -86,6 +105,26 @@ public interface ColumnTracker { */ ScanQueryMatcher.MatchCode checkVersions(byte[] bytes, int offset, int length, long ttl, byte type, boolean ignoreCount) throws IOException; + + /** + * Keeps track of the number of versions for the columns asked for. It assumes that the user has + * already checked if the keyvalue needs to be included by calling the + * {@link #checkColumn(ByteBuffer, int, int, byte)} method. The enum values returned by this method + * are {@link MatchCode#SKIP}, {@link MatchCode#INCLUDE}, + * {@link MatchCode#INCLUDE_AND_SEEK_NEXT_COL} and {@link MatchCode#INCLUDE_AND_SEEK_NEXT_ROW}. + * Implementations which include all the columns could just return {@link MatchCode#INCLUDE} in + * the {@link #checkColumn(ByteBuffer, int, int, byte)} method and perform all the operations in this + * checkVersions method. + * @param type the type of the key value (Put/Delete) + * @param ttl The timeToLive to enforce. + * @param ignoreCount indicates if the KV needs to be excluded while counting (used during + * compactions. We only count KV's that are older than all the scanners' read points.) + * @return the scan query matcher match code instance + * @throws IOException in case there is an internal consistency problem caused by a data + * corruption. 
+ */ + ScanQueryMatcher.MatchCode checkVersions(ByteBuffer buffer, int offset, int length, long ttl, + byte type, boolean ignoreCount) throws IOException; /** * Resets the Matcher */ @@ -116,6 +155,13 @@ public interface ColumnTracker { MatchCode getNextRowOrNextColumn( byte[] bytes, int offset, int qualLength ); + + /** + * Retrieve the MatchCode for the next row or column + */ + MatchCode getNextRowOrNextColumn( + ByteBuffer buffer, int offset, int qualLength + ); /** * Give the tracker a chance to declare it's done based on only the timestamp diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java index 470d36a..28ff461 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java @@ -19,12 +19,14 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.NavigableSet; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; +import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; /** @@ -93,7 +95,7 @@ public class ExplicitColumnTracker implements ColumnTracker { this.columns = new ColumnCount[columns.size()]; int i=0; for(byte [] column : columns) { - this.columns[i++] = new ColumnCount(column); + this.columns[i++] = new ColumnCount(ByteBuffer.wrap(column)); } reset(); } @@ -165,6 +167,62 @@ public class ExplicitColumnTracker implements ColumnTracker { } while(true); } + /** + * {@inheritDoc} + */ + @Override + public ScanQueryMatcher.MatchCode 
checkColumn(ByteBuffer buffer, int offset, + int length, byte type) { + // delete markers should never be passed to an + // *Explicit*ColumnTracker + assert !CellUtil.isDelete(type); + do { + // No more columns left, we are done with this query + if(done()) { + return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row + } + + // No more columns to match against, done with storefile + if(this.column == null) { + return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row + } + + // Compare specific column to current column + int ret = ByteBufferUtils.compareTo(ByteBuffer.wrap(column.getBuffer(), offset, length), + column.getOffset(), column.getLength(), buffer, offset, length); + + // Column Matches. Return include code. The caller would call checkVersions + // to limit the number of versions. + if(ret == 0) { + return ScanQueryMatcher.MatchCode.INCLUDE; + } + + resetTS(); + + if (ret > 0) { + // The current KV is smaller than the column the ExplicitColumnTracker + // is interested in, so seek to that column of interest. + return this.skipCount++ < this.lookAhead ? ScanQueryMatcher.MatchCode.SKIP + : ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; + } + + // The current KV is bigger than the column the ExplicitColumnTracker + // is interested in. That means there is no more data for the column + // of interest. Advance the ExplicitColumnTracker state to next + // column of interest, and check again. + if (ret <= -1) { + ++this.index; + this.skipCount = 0; + if (done()) { + // No more to match, do not include, done with this row. + return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row + } + // This is the recursive case. 
+ this.column = this.columns[this.index]; + } + } while(true); + } + @Override public ScanQueryMatcher.MatchCode checkVersions(byte[] bytes, int offset, int length, long timestamp, byte type, boolean ignoreCount) throws IOException { @@ -195,6 +253,36 @@ public class ExplicitColumnTracker implements ColumnTracker { return ScanQueryMatcher.MatchCode.INCLUDE; } + @Override + public ScanQueryMatcher.MatchCode checkVersions(ByteBuffer buffer, int offset, int length, + long timestamp, byte type, boolean ignoreCount) throws IOException { + assert !CellUtil.isDelete(type); + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; + // Check if it is a duplicate timestamp + if (sameAsPreviousTS(timestamp)) { + // If duplicate, skip this Key + return ScanQueryMatcher.MatchCode.SKIP; + } + int count = this.column.increment(); + if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) { + // Done with versions for this column + ++this.index; + this.skipCount = 0; + resetTS(); + if (done()) { + // We have served all the requested columns. + this.column = null; + return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; + } + // We are done with current column; advance to next column + // of interest. + this.column = this.columns[this.index]; + return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL; + } + setTS(timestamp); + return ScanQueryMatcher.MatchCode.INCLUDE; + } + // Called between every row. 
public void reset() { this.index = 0; @@ -231,7 +319,7 @@ public class ExplicitColumnTracker implements ColumnTracker { * @param offset * @param length */ - public void doneWithColumn(byte [] bytes, int offset, int length) { + public void doneWithColumn(byte[] bytes, int offset, int length) { while (this.column != null) { int compare = Bytes.compareTo(column.getBuffer(), column.getOffset(), column.getLength(), bytes, offset, length); @@ -252,9 +340,22 @@ public class ExplicitColumnTracker implements ColumnTracker { } } + @Override public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset, int qualLength) { - doneWithColumn(bytes, offset,qualLength); + doneWithColumn(bytes, offset, qualLength); + + if (getColumnHint() == null) { + return MatchCode.SEEK_NEXT_ROW; + } else { + return MatchCode.SEEK_NEXT_COL; + } + } + + @Override + public MatchCode getNextRowOrNextColumn(ByteBuffer buffer, int offset, + int qualLength) { + doneWithColumn(buffer.array(), offset,qualLength); if (getColumnHint() == null) { return MatchCode.SEEK_NEXT_ROW; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java index 85b36fb..975885f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java @@ -20,11 +20,13 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.nio.ByteBuffer; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; +import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; 
/** @@ -32,7 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes; */ @InterfaceAudience.Private public class ScanWildcardColumnTracker implements ColumnTracker { - private byte [] columnBuffer = null; + private ByteBuffer columnBuffer = null; private int columnOffset = 0; private int columnLength = 0; private int currentCount = 0; @@ -70,6 +72,15 @@ public class ScanWildcardColumnTracker implements ColumnTracker { } /** + * {@inheritDoc} This receives puts *and* deletes. + */ + @Override + public MatchCode checkColumn(ByteBuffer buffer, int offset, int length, byte type) + throws IOException { + return MatchCode.INCLUDE; + } + + /** * {@inheritDoc} * This receives puts *and* deletes. Deletes do not count as a version, but rather * take the version of the previous put (so eventually all but the last can be reclaimed). @@ -80,13 +91,13 @@ public class ScanWildcardColumnTracker implements ColumnTracker { if (columnBuffer == null) { // first iteration. - resetBuffer(bytes, offset, length); + resetBuffer(ByteBuffer.wrap(bytes, offset, length), offset, length); if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; // do not count a delete marker as another version return checkVersion(type, timestamp); } - int cmp = Bytes.compareTo(bytes, offset, length, - columnBuffer, columnOffset, columnLength); + int cmp = Bytes.compareTo(bytes, offset, length, columnBuffer.array(), columnOffset + , columnLength); if (cmp == 0) { if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; @@ -102,7 +113,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker { // new col > old col if (cmp > 0) { // switched columns, lets do something.x - resetBuffer(bytes, offset, length); + resetBuffer(ByteBuffer.wrap(bytes, offset, length), offset, length); if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; return checkVersion(type, timestamp); } @@ -117,8 +128,51 @@ public class ScanWildcardColumnTracker implements ColumnTracker { Bytes.toStringBinary(bytes, offset, 
length)); } - private void resetBuffer(byte[] bytes, int offset, int length) { - columnBuffer = bytes; + @Override + public ScanQueryMatcher.MatchCode checkVersions(ByteBuffer buffer, int offset, int length, + long timestamp, byte type, boolean ignoreCount) throws IOException { + + if (columnBuffer == null) { + // first iteration. + resetBuffer(buffer, offset, length); + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; + // do not count a delete marker as another version + return checkVersion(type, timestamp); + } + int cmp = ByteBufferUtils.compareTo(buffer, offset, length, + columnBuffer, columnOffset, columnLength); + if (cmp == 0) { + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; + + //If column matches, check if it is a duplicate timestamp + if (sameAsPreviousTSAndType(timestamp, type)) { + return ScanQueryMatcher.MatchCode.SKIP; + } + return checkVersion(type, timestamp); + } + + resetTSAndType(); + + // new col > old col + if (cmp > 0) { + // switched columns, lets do something.x + resetBuffer(buffer, offset, length); + if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE; + return checkVersion(type, timestamp); + } + + // new col < oldcol + // WARNING: This means that very likely an edit for some other family + // was incorrectly stored into the store for this one. Throw an exception, + // because this might lead to data corruption. + throw new IOException( + "ScanWildcardColumnTracker.checkColumn ran into a column actually " + + "smaller than the previous column: " + + Bytes.toStringBinary(buffer.array(), offset, length)); + } + + private void resetBuffer(ByteBuffer buffer, int offset, int length) { + columnBuffer = buffer; columnOffset = offset; columnLength = length; currentCount = 0; @@ -181,6 +235,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker { * * @return The column count. 
*/ + @Override public ColumnCount getColumnHint() { return null; } @@ -194,11 +249,18 @@ public class ScanWildcardColumnTracker implements ColumnTracker { return false; } + @Override public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset, int qualLength) { return MatchCode.SEEK_NEXT_COL; } + @Override + public MatchCode getNextRowOrNextColumn(ByteBuffer buffer, int offset, + int qualLength) { + return MatchCode.SEEK_NEXT_COL; + } + public boolean isDone(long timestamp) { return minVersions <= 0 && isExpired(timestamp); }