diff --git hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/ColumnSpec.java hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/ColumnSpec.java new file mode 100644 index 0000000..c018b11 --- /dev/null +++ hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/ColumnSpec.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark; + +import org.apache.hadoop.hbase.util.Bytes; + +public class ColumnSpec { + public int start; + public int length; + public String name; + + public ColumnSpec(int s, int l, String n) { + start = s; + length = l; + name = n; + } + + public byte[] toBytes() { + byte[] nb = Bytes.toBytes(name); + byte[] b = new byte[Bytes.SIZEOF_INT * 2 + nb.length]; + System.arraycopy(Bytes.toBytes(start), 0, b, 0, Bytes.SIZEOF_INT); + System.arraycopy(Bytes.toBytes(length), 0, b, Bytes.SIZEOF_INT, Bytes.SIZEOF_INT); + System.arraycopy(nb, 0, b, Bytes.SIZEOF_INT * 2, nb.length); + return b; + } + + public static ColumnSpec fromBytes(byte[] b) { + int s = Bytes.toInt(b, 0); + int l = Bytes.toInt(b, Bytes.SIZEOF_INT); + String n = Bytes.toString(b, Bytes.SIZEOF_INT * 2); + ColumnSpec cs = new ColumnSpec(s, l, n); + return cs; + + } +} diff --git hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java index 057853f..e6cfd8b 100644 --- hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java +++ hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java @@ -35,6 +35,8 @@ import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** * This filter will push down all qualifier logic given to us @@ -47,19 +49,19 @@ public class SparkSQLPushDownFilter extends FilterBase{ //The following values are populated with protobuffer DynamicLogicExpression dynamicLogicExpression; byte[][] valueFromQueryArray; - HashMap> + // We need to change the last field String to (String, start, offset) to support composite key + HashMap> currentCellToColumnIndexMap; //The following values are transient HashMap columnToCurrentRowValueMap = null; static final byte[] rowKeyFamily = new byte[0]; - static final byte[] rowKeyQualifier = Bytes.toBytes("key"); public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression, byte[][] valueFromQueryArray, HashMap> + HashMap> currentCellToColumnIndexMap) { this.dynamicLogicExpression = dynamicLogicExpression; this.valueFromQueryArray = valueFromQueryArray; @@ -83,7 +85,7 @@ public class SparkSQLPushDownFilter extends FilterBase{ ByteArrayComparable familyByteComparable = new 
ByteArrayComparable(cfBytes, 0, cfBytes.length); - HashMap qualifierIndexMap = + HashMap qualifierIndexMap = currentCellToColumnIndexMap.get(familyByteComparable); if (qualifierIndexMap == null) { @@ -94,7 +96,7 @@ public class SparkSQLPushDownFilter extends FilterBase{ ByteArrayComparable qualifierByteComparable = new ByteArrayComparable(qBytes, 0, qBytes.length); - qualifierIndexMap.put(qualifierByteComparable, field.colName()); + qualifierIndexMap.put(qualifierByteComparable, field.columnSpec().toBytes()); } } @@ -105,20 +107,25 @@ public class SparkSQLPushDownFilter extends FilterBase{ // the row key if (columnToCurrentRowValueMap == null) { columnToCurrentRowValueMap = new HashMap<>(); - HashMap qualifierColumnMap = + HashMap qualifierColumnMap = currentCellToColumnIndexMap.get( new ByteArrayComparable(rowKeyFamily, 0, rowKeyFamily.length)); if (qualifierColumnMap != null) { - String rowKeyColumnName = - qualifierColumnMap.get( - new ByteArrayComparable(rowKeyQualifier, 0, - rowKeyQualifier.length)); - //Make sure that the rowKey is part of the where clause - if (rowKeyColumnName != null) { - columnToCurrentRowValueMap.put(rowKeyColumnName, - new ByteArrayComparable(c.getRowArray(), - c.getRowOffset(), c.getRowLength())); + Set> entries = qualifierColumnMap.entrySet(); + // We should get a sequence of rowKeyColumnName (name, start, length) + // and fill all the columns belong to row key. + for (Entry entry: entries) { + ColumnSpec cs = ColumnSpec.fromBytes(entry.getValue()); + System.out.println("rowkey: " + cs.name + " start " + cs.start + " length " + cs.length); + int length = cs.length; + // If we do not know the length, assume to extend to the end. + if (length == -1) { + length = c.getRowLength() - cs.start; + } + columnToCurrentRowValueMap.put(cs.name, + new ByteArrayComparable(c.getRowArray(), + c.getRowOffset() + cs.start, length)); } } } @@ -129,17 +136,18 @@ public class SparkSQLPushDownFilter extends FilterBase{ c.getFamilyOffset(), c.getFamilyLength()); - HashMap qualifierColumnMap = + HashMap qualifierColumnMap = currentCellToColumnIndexMap.get( currentFamilyByteComparable); if (qualifierColumnMap != null) { - String columnName = + byte[] columnSpec = qualifierColumnMap.get( new ByteArrayComparable(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength())); + String columnName = ColumnSpec.fromBytes(columnSpec).name; if (columnName != null) { columnToCurrentRowValueMap.put(columnName, @@ -196,7 +204,7 @@ public class SparkSQLPushDownFilter extends FilterBase{ } //Load mapping from HBase family/qualifier to Spark SQL columnName - HashMap> + HashMap> currentCellToColumnIndexMap = new HashMap<>(); for (FilterProtos.SQLPredicatePushDownCellToColumnMapping @@ -207,7 +215,7 @@ public class SparkSQLPushDownFilter extends FilterBase{ sqlPredicatePushDownCellToColumnMapping.getColumnFamily().toByteArray(); ByteArrayComparable familyByteComparable = new ByteArrayComparable(familyArray, 0, familyArray.length); - HashMap qualifierMap = + HashMap qualifierMap = currentCellToColumnIndexMap.get(familyByteComparable); if (qualifierMap == null) { @@ -221,7 +229,7 @@ public class SparkSQLPushDownFilter extends FilterBase{ new ByteArrayComparable(qualifierArray, 0 ,qualifierArray.length); qualifierMap.put(qualifierByteComparable, - sqlPredicatePushDownCellToColumnMapping.getColumnName()); + sqlPredicatePushDownCellToColumnMapping.getColumnSpec().toByteArray()); } return new SparkSQLPushDownFilter(dynamicLogicExpression, @@ -244,15 +252,15 @@ public class SparkSQLPushDownFilter 
extends FilterBase{ builder.addValueFromQueryArray(ByteStringer.wrap(valueFromQuery)); } - for (Map.Entry> + for (Entry> familyEntry : currentCellToColumnIndexMap.entrySet()) { - for (Map.Entry qualifierEntry : + for (Entry qualifierEntry : familyEntry.getValue().entrySet()) { columnMappingBuilder.setColumnFamily( ByteStringer.wrap(familyEntry.getKey().bytes())); columnMappingBuilder.setQualifier( ByteStringer.wrap(qualifierEntry.getKey().bytes())); - columnMappingBuilder.setColumnName(qualifierEntry.getValue()); + columnMappingBuilder.setColumnSpec(ByteStringer.wrap(qualifierEntry.getValue())); builder.addCellToColumnMapping(columnMappingBuilder.build()); } } diff --git hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/protobuf/generated/FilterProtos.java hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/protobuf/generated/FilterProtos.java index 1968d32..76a63c2 100644 --- hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/protobuf/generated/FilterProtos.java +++ hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/protobuf/generated/FilterProtos.java @@ -31,20 +31,15 @@ public final class FilterProtos { */ com.google.protobuf.ByteString getQualifier(); - // required string column_name = 3; + // required bytes column_spec = 3; /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - boolean hasColumnName(); + boolean hasColumnSpec(); /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - java.lang.String getColumnName(); - /** - * required string column_name = 3; - */ - com.google.protobuf.ByteString - getColumnNameBytes(); + com.google.protobuf.ByteString getColumnSpec(); } /** * Protobuf type {@code hbase.pb.SQLPredicatePushDownCellToColumnMapping} @@ -109,7 +104,7 @@ public final class FilterProtos { } case 26: { bitField0_ |= 0x00000004; - columnName_ = input.readBytes(); + columnSpec_ = input.readBytes(); break; } } @@ -184,53 +179,26 @@ public final class FilterProtos { return qualifier_; } - // required string column_name = 3; - public static final int COLUMN_NAME_FIELD_NUMBER = 3; - private java.lang.Object columnName_; + // required bytes column_spec = 3; + public static final int COLUMN_SPEC_FIELD_NUMBER = 3; + private com.google.protobuf.ByteString columnSpec_; /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - public boolean hasColumnName() { + public boolean hasColumnSpec() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - public java.lang.String getColumnName() { - java.lang.Object ref = columnName_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - if (bs.isValidUtf8()) { - columnName_ = s; - } - return s; - } - } - /** - * required string column_name = 3; - */ - public com.google.protobuf.ByteString - getColumnNameBytes() { - java.lang.Object ref = columnName_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - columnName_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } + public com.google.protobuf.ByteString getColumnSpec() { + return columnSpec_; } private void initFields() { columnFamily_ = com.google.protobuf.ByteString.EMPTY; qualifier_ = com.google.protobuf.ByteString.EMPTY; - columnName_ = ""; 
+ columnSpec_ = com.google.protobuf.ByteString.EMPTY; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -245,7 +213,7 @@ public final class FilterProtos { memoizedIsInitialized = 0; return false; } - if (!hasColumnName()) { + if (!hasColumnSpec()) { memoizedIsInitialized = 0; return false; } @@ -263,7 +231,7 @@ public final class FilterProtos { output.writeBytes(2, qualifier_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { - output.writeBytes(3, getColumnNameBytes()); + output.writeBytes(3, columnSpec_); } getUnknownFields().writeTo(output); } @@ -284,7 +252,7 @@ public final class FilterProtos { } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(3, getColumnNameBytes()); + .computeBytesSize(3, columnSpec_); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -319,10 +287,10 @@ public final class FilterProtos { result = result && getQualifier() .equals(other.getQualifier()); } - result = result && (hasColumnName() == other.hasColumnName()); - if (hasColumnName()) { - result = result && getColumnName() - .equals(other.getColumnName()); + result = result && (hasColumnSpec() == other.hasColumnSpec()); + if (hasColumnSpec()) { + result = result && getColumnSpec() + .equals(other.getColumnSpec()); } result = result && getUnknownFields().equals(other.getUnknownFields()); @@ -345,9 +313,9 @@ public final class FilterProtos { hash = (37 * hash) + QUALIFIER_FIELD_NUMBER; hash = (53 * hash) + getQualifier().hashCode(); } - if (hasColumnName()) { - hash = (37 * hash) + COLUMN_NAME_FIELD_NUMBER; - hash = (53 * hash) + getColumnName().hashCode(); + if (hasColumnSpec()) { + hash = (37 * hash) + COLUMN_SPEC_FIELD_NUMBER; + hash = (53 * hash) + getColumnSpec().hashCode(); } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; @@ -462,7 +430,7 @@ public final class FilterProtos { bitField0_ = (bitField0_ & ~0x00000001); qualifier_ = com.google.protobuf.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00000002); - columnName_ = ""; + columnSpec_ = com.google.protobuf.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -503,7 +471,7 @@ public final class FilterProtos { if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } - result.columnName_ = columnName_; + result.columnSpec_ = columnSpec_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -526,10 +494,8 @@ public final class FilterProtos { if (other.hasQualifier()) { setQualifier(other.getQualifier()); } - if (other.hasColumnName()) { - bitField0_ |= 0x00000004; - columnName_ = other.columnName_; - onChanged(); + if (other.hasColumnSpec()) { + setColumnSpec(other.getColumnSpec()); } this.mergeUnknownFields(other.getUnknownFields()); return this; @@ -544,7 +510,7 @@ public final class FilterProtos { return false; } - if (!hasColumnName()) { + if (!hasColumnSpec()) { return false; } @@ -642,76 +608,38 @@ public final class FilterProtos { return this; } - // required string column_name = 3; - private java.lang.Object columnName_ = ""; + // required bytes column_spec = 3; + private com.google.protobuf.ByteString columnSpec_ = com.google.protobuf.ByteString.EMPTY; /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - public boolean hasColumnName() { + public boolean hasColumnSpec() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * required string column_name = 3; + * required bytes column_spec 
= 3; */ - public java.lang.String getColumnName() { - java.lang.Object ref = columnName_; - if (!(ref instanceof java.lang.String)) { - java.lang.String s = ((com.google.protobuf.ByteString) ref) - .toStringUtf8(); - columnName_ = s; - return s; - } else { - return (java.lang.String) ref; - } - } - /** - * required string column_name = 3; - */ - public com.google.protobuf.ByteString - getColumnNameBytes() { - java.lang.Object ref = columnName_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - columnName_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } + public com.google.protobuf.ByteString getColumnSpec() { + return columnSpec_; } /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - public Builder setColumnName( - java.lang.String value) { + public Builder setColumnSpec(com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000004; - columnName_ = value; + columnSpec_ = value; onChanged(); return this; } /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - public Builder clearColumnName() { + public Builder clearColumnSpec() { bitField0_ = (bitField0_ & ~0x00000004); - columnName_ = getDefaultInstance().getColumnName(); - onChanged(); - return this; - } - /** - * required string column_name = 3; - */ - public Builder setColumnNameBytes( - com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000004; - columnName_ = value; + columnSpec_ = getDefaultInstance().getColumnSpec(); onChanged(); return this; } @@ -1802,7 +1730,7 @@ public final class FilterProtos { "\n\014Filter.proto\022\010hbase.pb\"h\n\'SQLPredicate" + "PushDownCellToColumnMapping\022\025\n\rcolumn_fa" + "mily\030\001 \002(\014\022\021\n\tqualifier\030\002 \002(\014\022\023\n\013column_" + - "name\030\003 \002(\t\"\261\001\n\032SQLPredicatePushDownFilte" + + "spec\030\003 \002(\014\"\261\001\n\032SQLPredicatePushDownFilte" + "r\022 \n\030dynamic_logic_expression\030\001 \002(\t\022\036\n\026v" + "alue_from_query_array\030\002 \003(\014\022Q\n\026cell_to_c" + "olumn_mapping\030\003 \003(\01321.hbase.pb.SQLPredic" + @@ -1820,7 +1748,7 @@ public final class FilterProtos { internal_static_hbase_pb_SQLPredicatePushDownCellToColumnMapping_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_SQLPredicatePushDownCellToColumnMapping_descriptor, - new java.lang.String[] { "ColumnFamily", "Qualifier", "ColumnName", }); + new java.lang.String[] { "ColumnFamily", "Qualifier", "ColumnSpec", }); internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_hbase_pb_SQLPredicatePushDownFilter_fieldAccessorTable = new diff --git hbase-spark/src/main/protobuf/Filter.proto hbase-spark/src/main/protobuf/Filter.proto index e076ce8..67cfd61 100644 --- hbase-spark/src/main/protobuf/Filter.proto +++ hbase-spark/src/main/protobuf/Filter.proto @@ -28,11 +28,11 @@ option optimize_for = SPEED; message SQLPredicatePushDownCellToColumnMapping { required bytes column_family = 1; required bytes qualifier = 2; - required string column_name = 3; + required bytes column_spec = 3; } message SQLPredicatePushDownFilter { required string dynamic_logic_expression = 1; repeated bytes value_from_query_array = 2; repeated 
SQLPredicatePushDownCellToColumnMapping cell_to_column_mapping = 3; -} \ No newline at end of file +} diff --git hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala index fce92fb..7887fac 100644 --- hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala +++ hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala @@ -25,7 +25,7 @@ class ByteArrayComparable(val bytes:Array[Byte], val offset:Int = 0, var length: extends Comparable[ByteArrayComparable] { if (length == -1) { - length = bytes.length + length = bytes.length - offset } override def compareTo(o: ByteArrayComparable): Int = { diff --git hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala index 788a704..a9f9765 100644 --- hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala +++ hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala @@ -23,10 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapred.TableOutputFormat -import org.apache.hadoop.hbase.spark.datasources.BoundRange -import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf -import org.apache.hadoop.hbase.spark.datasources.HBaseTableScanRDD -import org.apache.hadoop.hbase.spark.datasources.SerializableConfiguration +import org.apache.hadoop.hbase.spark.datasources._ import org.apache.hadoop.hbase.types._ import org.apache.hadoop.hbase.util.{Bytes, PositionedByteRange, SimplePositionedMutableByteRange} import org.apache.hadoop.hbase._ @@ -134,6 +131,26 @@ case class HBaseRelation ( override val schema: StructType = userSpecifiedSchema.getOrElse(catalog.toDataType) + def rows = catalog.row + + def getField(name: String): Field = { + catalog.getField(name) + } + + // check whether the column is the first key in the rowkey + def isPrimaryKey(c: String): Boolean = { + val f1 = catalog.getRowKey(0) + val f2 = getField(c) + f1 == f2 + } + + def isComposite(): Boolean = { + catalog.getRowKey.size > 1 + } + def isColumn(c: String): Boolean = { + !catalog.getRowKey.map(_.colName).contains(c) + } + def createTable() { val numReg = parameters.get(HBaseTableCatalog.newTable).map(x => x.toInt).getOrElse(0) @@ -239,20 +256,9 @@ case class HBaseRelation ( // Return the new index and appended value (idx + field.length, parsed ++ Seq((field, value))) } else { - field.dt match { - case StringType => - val pos = row.indexOf(HBaseTableCatalog.delimiter, idx) - if (pos == -1 || pos > row.length) { - // this is at the last dimension - val value = Utils.hbaseFieldToScalaType(field, row, idx, row.length) - (row.length + 1, parsed ++ Seq((field, value))) - } else { - val value = Utils.hbaseFieldToScalaType(field, row, idx, pos - idx) - (pos, parsed ++ Seq((field, value))) - } - // We don't know the length, assume it extends to the end of the rowkey. - case _ => (row.length + 1, parsed ++ Seq((field, Utils.hbaseFieldToScalaType(field, row, idx, row.length)))) - } + // This is the last dimension. 
+ val value = Utils.hbaseFieldToScalaType(field, row, idx, row.length - idx) + (row.length + 1, parsed ++ Seq((field, value))) } })._2.toMap } @@ -390,7 +396,7 @@ case class HBaseRelation ( }) val queryValueArray = queryValueList.toArray - println(s"""superRowKeyFilter: $superRowKeyFilter""") + if (superRowKeyFilter == null) { superRowKeyFilter = new RowKeyFilter } @@ -398,19 +404,44 @@ case class HBaseRelation ( (superRowKeyFilter, superDynamicLogicExpression, queryValueArray) } + // convert the point to the range if it is composite key + private def padding(value: Array[Byte], length: Int): Array[Byte] = { + if (!isComposite || value == null || value.length == length ) { + value + } else { + println(s"""padding: ${value.length}, $length""") + val end = Array.fill(length)(-1: Byte) + System.arraycopy(value, 0, end, 0, value.length) + end + } + } + + // convert the upper bound to extend to the whole rowkey length + private def convertUpperBound(value: Array[Byte], length: Int): Array[Byte] = { + val upper = Array.fill(length)(-1: Byte) + System.arraycopy(value, 0, upper, 0, value.length) + upper + } + def transverseFilterTree(parentRowKeyFilter:RowKeyFilter, valueArray:mutable.MutableList[Array[Byte]], filter:Filter): DynamicLogicExpression = { - println(s"""filter = $filter""") filter match { case EqualTo(attr, value) => val field = catalog.getField(attr) if (field != null) { - if (field.isRowKey) { - parentRowKeyFilter.mergeIntersect(new RowKeyFilter( - DefaultSourceStaticUtils.getByteValue(field, - value.toString), null)) + if (isPrimaryKey(attr)) { + val bytes = DefaultSourceStaticUtils.getByteValue(field, + value.toString) + if (!isComposite()) { + parentRowKeyFilter.mergeIntersect(new RowKeyFilter( + bytes, null)) + } else { + // we have to convert the point to range in case of composite key + parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null, + new ScanRange(padding(bytes, rows.length), true, bytes, true))) + } } val byteValue = DefaultSourceStaticUtils.getByteValue(field, value.toString) @@ -420,12 +451,16 @@ case class HBaseRelation ( case LessThan(attr, value) => val field = catalog.getField(attr) if (field != null) { - if (field.isRowKey) { + if (isPrimaryKey(attr)) { val b = BoundRange(value) var inc = false b.map(_.less.map { x => val r = new RowKeyFilter(null, - new ScanRange(x.upper, inc, x.low, true) + new ScanRange(if (inc) { + padding(x.upper, rows.length) + } else { + x.upper + }, inc, x.low, true) ) inc = true r @@ -437,18 +472,17 @@ case class HBaseRelation ( } val byteValue = FilterOps.encode(field.dt, value) valueArray += byteValue - } new LessThanLogicExpression(attr, valueArray.length - 1) case GreaterThan(attr, value) => val field = catalog.getField(attr) if (field != null) { - if (field.isRowKey) { + if (isPrimaryKey(attr)) { val b = BoundRange(value) var inc = false - b.map(_.greater.map{x => + b.map(_.greater.map { x => val r = new RowKeyFilter(null, - new ScanRange(x.upper, true, x.low, inc)) + new ScanRange(padding(x.upper, rows.length), true, x.low, inc)) inc = true r }).map { x => @@ -464,16 +498,17 @@ case class HBaseRelation ( case LessThanOrEqual(attr, value) => val field = catalog.getField(attr) if (field != null) { - if (field.isRowKey) { + if (isPrimaryKey(attr)) { val b = BoundRange(value) b.map(_.less.map(x => new RowKeyFilter(null, - new ScanRange(x.upper, true, x.low, true)))) + new ScanRange(padding(x.upper, rows.length), true, x.low, true)))) .map { x => - x.reduce{ (i, j) => + x.reduce { (i, j) => i.mergeUnion(j) } 
}.map(parentRowKeyFilter.mergeIntersect(_)) + } val byteValue = FilterOps.encode(field.dt, value) valueArray += byteValue @@ -482,11 +517,11 @@ case class HBaseRelation ( case GreaterThanOrEqual(attr, value) => val field = catalog.getField(attr) if (field != null) { - if (field.isRowKey) { + if (isPrimaryKey(attr)) { val b = BoundRange(value) b.map(_.greater.map(x => new RowKeyFilter(null, - new ScanRange(x.upper, true, x.low, true)))) + new ScanRange(padding(x.upper, rows.length), true, x.low, true)))) .map { x => x.reduce { (i, j) => i.mergeUnion(j) @@ -563,7 +598,7 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean, * * @param other Other scan object */ - def mergeUnion(other:ScanRange): Unit = { + def mergeUnion(other:ScanRange): ScanRange = { val upperBoundCompare = compareRange(upperBound, other.upperBound) val lowerBoundCompare = compareRange(lowerBound, other.lowerBound) @@ -578,6 +613,7 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean, isUpperBoundEqualTo = if (upperBoundCompare == 0) isUpperBoundEqualTo || other.isUpperBoundEqualTo else if (upperBoundCompare < 0) other.isUpperBoundEqualTo else isUpperBoundEqualTo + this } /** @@ -660,13 +696,10 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean, (!isUpperBoundEqualTo && upperCompare < 0)) } - /* override def toString:String = { + override def toString:String = { "ScanRange:(upperBound:" + Bytes.toString(upperBound) + ",isUpperBoundEqualTo:" + isUpperBoundEqualTo + ",lowerBound:" + Bytes.toString(lowerBound) + ",isLowerBoundEqualTo:" + isLowerBoundEqualTo + ")" - }*/ - override def toString: String = { - s"""ScanRange(${ if (upperBound != null && upperBound.size == 4) Bytes.toInt(upperBound) else null },$isUpperBoundEqualTo, ${if (lowerBound != null && lowerBound.size == 4) Bytes.toInt(lowerBound) else null }, $isLowerBoundEqualTo)""".stripMargin } } @@ -1073,18 +1106,28 @@ class RowKeyFilter (currentPoint:Array[Byte] = null, * @param other Filter to merge */ def mergeUnion(other:RowKeyFilter): RowKeyFilter = { - println(s"""mergeUnion $this $other""") + println(s"""mergeUnion: $this $other""") other.points.foreach( p => points += p) - other.ranges.foreach( otherR => { + // we may change it in the mergeUnion below. + var newR = otherR var doesOverLap = false + // This is the ranges after otherR union with existing ranges + // It is used to keep the changed ranges after each iteration. + var newRanges = mutable.MutableList[ScanRange]() ranges.foreach{ r => - if (r.getOverLapScanRange(otherR) != null) { - r.mergeUnion(otherR) - doesOverLap = true + // If there is overlap, we update newR to be compared + // and do not add them to new ranges. We will not lose it + // since newR has it covered. 
Otherwise, we may have duplicate range + if (r.getOverLapScanRange(newR) != null) { + newR = r.mergeUnion(newR) + } else { + newRanges :+= r }} - if (!doesOverLap) ranges.+=(otherR) + newRanges :+= newR + ranges = newRanges }) + println(s"""mergeUnion: done $this """) this } @@ -1095,7 +1138,6 @@ class RowKeyFilter (currentPoint:Array[Byte] = null, * @param other Filter to merge */ def mergeIntersect(other:RowKeyFilter): RowKeyFilter = { - println(s"""mergeIntersect $this $other""") val survivingPoints = new mutable.MutableList[Array[Byte]]() val didntSurviveFirstPassPoints = new mutable.MutableList[Array[Byte]]() if (points == null || points.length == 0) { @@ -1158,7 +1200,7 @@ class RowKeyFilter (currentPoint:Array[Byte] = null, ranges.foreach( r => { if (isFirst) isFirst = false else strBuilder.append(",") - strBuilder.append(r.toString) + strBuilder.append(r) }) strBuilder.append("))") strBuilder.toString() diff --git hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala index 00e88e5..f988ae2 100644 --- hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala +++ hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala @@ -45,7 +45,6 @@ object FilterOps extends Enumeration { val UnknownEnc = nextCode def compare(c: Int, ops: FilterOps): Boolean = { - println(s"""compare $c, $ops""") ops match { case Greater => c > 0 case GreaterEqual => c >= 0 @@ -226,7 +225,9 @@ class EqualLogicExpression (val columnName:String, valueFromQueryValueArray:Array[Array[Byte]]): Boolean = { val currentRowValue = columnToCurrentRowValueMap.get(columnName) val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex) - + System.out.println("query: " + Bytes.toString(valueFromQuery) + " input: offset " + currentRowValue.offset + + " length: " + currentRowValue.length + " value " + + Bytes.toString(currentRowValue.bytes, currentRowValue.offset, currentRowValue.length)) currentRowValue != null && Bytes.equals(valueFromQuery, 0, valueFromQuery.length, currentRowValue.bytes, @@ -308,7 +309,6 @@ object DynamicLogicExpressionBuilder { private def build(expressionArray:Array[String], offSet:Int): (DynamicLogicExpression, Int) = { - println(s"""expression array ${expressionArray.mkString(":")}""") if (expressionArray(offSet).equals("(")) { val left = build(expressionArray, offSet + 1) val right = build(expressionArray, left._2 + 1) diff --git hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala index ce7b55a..0a672f0 100644 --- hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala +++ hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala @@ -27,6 +27,7 @@ package object hbase { def bytesMax = null val ByteMax = -1.asInstanceOf[Byte] val ByteMin = 0.asInstanceOf[Byte] + val MaxLength = 256 val ord: Ordering[HBaseType] = new Ordering[HBaseType] { def compare(x: Array[Byte], y: Array[Byte]): Int = { return Bytes.compareTo(x, y) diff --git hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala index c2d611f..098471e 100644 --- hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala +++ 
hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.datasources.hbase import org.apache.avro.Schema -import org.apache.hadoop.hbase.spark.SchemaConverters +import org.apache.hadoop.hbase.spark.{ColumnSpec, SchemaConverters} import org.apache.hadoop.hbase.spark.datasources._ import org.apache.hadoop.hbase.spark.hbase._ import org.apache.hadoop.hbase.util.Bytes @@ -39,7 +39,7 @@ case class Field( sType: Option[String] = None, avroSchema: Option[String] = None, serdes: Option[SerDes]= None, - len: Int = -1) extends Logging { + private val len: Int = -1) extends Logging { override def toString = s"$colName $cf $col" val isRowKey = cf == HBaseTableCatalog.rowKey var start: Int = _ @@ -69,11 +69,7 @@ case class Field( } } def colBytes: Array[Byte] = { - if (isRowKey) { - Bytes.toBytes("key") - } else { - Bytes.toBytes(col) - } + Bytes.toBytes(col) } val dt = { @@ -108,6 +104,8 @@ case class Field( colName == that.colName && cf == that.cf && col == that.col case _ => false } + + def columnSpec = new ColumnSpec(start, length, colName) } // The row key definition, with each key refer to the col defined in Field, e.g., @@ -117,12 +115,13 @@ case class RowKey(k: String) { var fields: Seq[Field] = _ var varLength = false def length = { - if (varLength) { - -1 - } else { - fields.foldLeft(0){case (x, y) => - x + y.length + fields.foldLeft(0) { case (x, y) => + val yLen = if (y.length == -1) { + MaxLength + } else { + y.length } + x + yLen } } } @@ -155,49 +154,20 @@ case class HBaseTableCatalog( def get(key: String) = params.get(key) - // Setup the start and length for each dimension of row key at runtime. - def dynSetupRowKey(rowKey: Array[Byte]) { - logDebug(s"length: ${rowKey.length}") - if(row.varLength) { - var start = 0 - row.fields.foreach { f => - logDebug(s"start: $start") - f.start = start - f.length = { - // If the length is not defined - if (f.length == -1) { - f.dt match { - case StringType => - var pos = rowKey.indexOf(HBaseTableCatalog.delimiter, start) - if (pos == -1 || pos > rowKey.length) { - // this is at the last dimension - pos = rowKey.length - } - pos - start - // We don't know the length, assume it extend to the end of the rowkey. - case _ => rowKey.length - start - } - } else { - f.length - } - } - start += f.length - } - } - } - + // Setup the start and length for each dimension of row key def initRowKey = { val fields = sMap.fields.filter(_.cf == HBaseTableCatalog.rowKey) row.fields = row.keys.flatMap(n => fields.find(_.col == n)) - // The length is determined at run time if it is string or binary and the length is undefined. - if (row.fields.filter(_.length == -1).isEmpty) { + // We only allowed there is one key at the end that is determined at runtime. 
+ if (row.fields.reverse.tail.filter(_.length == -1).isEmpty) { var start = 0 row.fields.foreach { f => f.start = start start += f.length } } else { - row.varLength = true + throw new Exception("Only the last dimension of " + + "RowKey is allowed to have varied length") } } initRowKey diff --git hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/CompositeKeySuite.scala hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/CompositeKeySuite.scala new file mode 100644 index 0000000..8f40728 --- /dev/null +++ hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/CompositeKeySuite.scala @@ -0,0 +1,215 @@ +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf +import org.apache.hadoop.hbase.{TableName, HBaseTestingUtility} +import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} + +case class HBaseCompositeRecord( + col00: String, + col01: Int, + col1: Boolean, + col2: Double, + col3: Float, + col4: Int, + col5: Long, + col6: Short, + col7: String, + col8: Byte) + +object HBaseCompositeRecord { + def apply(i: Int): HBaseCompositeRecord = { + HBaseCompositeRecord(s"row${"%03d".format(i)}", + if (i % 2 == 0) { + i + } else { + -i + }, + i % 2 == 0, + i.toDouble, + i.toFloat, + i, + i.toLong, + i.toShort, + s"String$i extra", + i.toByte) + } +} +class CompositeKeySuite extends FunSuite with + BeforeAndAfterEach with BeforeAndAfterAll with Logging { + @transient var sc: SparkContext = null + var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility + + var sqlContext: SQLContext = null + var df: DataFrame = null + + def withCatalog(cat: String): DataFrame = { + sqlContext + .read + .options(Map(HBaseTableCatalog.tableCatalog -> cat)) + .format("org.apache.hadoop.hbase.spark") + .load() + } + + override def beforeAll() { + + TEST_UTIL.startMiniCluster + val sparkConf = new SparkConf + sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true") + sparkConf.set(HBaseSparkConf.BATCH_NUM, "100") + sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100") + + sc = new SparkContext("local", "test", sparkConf) + new HBaseContext(sc, TEST_UTIL.getConfiguration) + sqlContext = new SQLContext(sc) + } + + override def afterAll() { + logInfo("shuting down minicluster") + TEST_UTIL.shutdownMiniCluster() + + sc.stop() + } + + override def beforeEach(): Unit = { + DefaultSourceStaticUtils.lastFiveExecutionRules.clear() + } + + def catalog = s"""{ + |"table":{"namespace":"default", "name":"table1"}, + |"rowkey":"key1:key2", + |"columns":{ + |"col00":{"cf":"rowkey", "col":"key1", "type":"string", "length":"6"}, + |"col01":{"cf":"rowkey", "col":"key2", "type":"int"}, + |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}, + |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, + |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, + |"col4":{"cf":"cf4", "col":"col4", "type":"int"}, + |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, + |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"}, + |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, + |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} + |} + |}""".stripMargin + + + test("populate table with composite key") { + val sql = sqlContext + import sql.implicits._ + + val data = (0 to 255).map { i => + HBaseCompositeRecord(i) + } + sc.parallelize(data).toDF.write.options( + Map(HBaseTableCatalog.tableCatalog -> catalog, 
HBaseTableCatalog.newTable -> "5")) + .format("org.apache.hadoop.hbase.spark") + .save() + } + + test("full query") { + val df = withCatalog(catalog) + df.show + assert(df.count() == 256) + } + + test("filtered query1") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"col00" <= "row050" && $"col01" > 40) + .select("col00", "col01","col1") + s.show + assert(s.count() == 5) + } + + test("filtered query2") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"col00" <= "row050" && $"col01" >= 40) + .select("col00", "col01","col1") + s.show + assert(s.count() == 6) + } + + test("filtered query3") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"col00" >= "row250" && $"col01" < 50) + .select("col00", "col01","col1") + s.show + assert(s.count() == 3) + } + + test("filtered query4") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"col00" <= "row010") // row005 not included + .select("col00", "col01","col1") + s.show + assert(s.count() == 11) + } + + test("filtered query5") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"col00" === "row010") // row005 not included + .select("col00", "col01","col1") + s.show + assert(s.count() == 1) + } + test("filtered query51") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"col00" === "row011") // row005 not included + .select("col00", "col01","col1") + s.show + assert(s.count() == 1) + } + + test("filtered query52") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"col00" === "row005") // row005 not included + .select("col00", "col01","col1") + s.show + assert(s.count() == 1) + } + + test("filtered query6") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter(($"col00" <= "row050" && $"col00" > "row040") || + $"col00" === "row010" || // no included, since it is composite key + $"col00" === "row020" || // not inlcuded + $"col00" === "r20" || // not included + $"col00" <= "row010") // row005 not included + .select("col00", "col01","col1") + s.show(40) + assert(s.count() == 22) + } + + + test("filtered query7") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter(($"col00" <= "row050" && $"col00" > "row040") || + $"col00" === "row005" || // no included, since it is composite key + $"col00" === "row020" || // not inlcuded + $"col00" === "r20" || // not included + $"col00" <= "row005") // row005 not included + .select("col00", "col01","col1") + s.show(40) + assert(s.count() == 17) + } + +} diff --git hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala index dd23336..73adfb9 100644 --- hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala +++ hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala @@ -275,7 +275,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { * A example of a OR merge between to ranges the result is one range * Also an example of less then and greater then */ - ignore("Test two range rowKey query") { + test("Test two range rowKey query") { val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD 
FROM hbaseTable1 " + "WHERE " + "( KEY_FIELD < 'get2' or KEY_FIELD > 'get3')").take(10) @@ -294,12 +294,12 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes(""))) assert(Bytes.equals(scanRange1.upperBound,Bytes.toBytes("get2"))) assert(scanRange1.isLowerBoundEqualTo) - // assert(!scanRange1.isUpperBoundEqualTo) + assert(!scanRange1.isUpperBoundEqualTo) val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get assert(Bytes.equals(scanRange2.lowerBound,Bytes.toBytes("get3"))) assert(scanRange2.upperBound == null) - //assert(!scanRange2.isLowerBoundEqualTo) + assert(!scanRange2.isLowerBoundEqualTo) assert(scanRange2.isUpperBoundEqualTo) } @@ -323,14 +323,6 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { assert(executionRules.rowKeyFilter.points.size == 0) assert(executionRules.rowKeyFilter.ranges.size == 2) -/* - val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get - println(s"scanRange1 $scanRange1") - assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes(""))) - assert(scanRange1.upperBound == null) - assert(scanRange1.isLowerBoundEqualTo) - assert(scanRange1.isUpperBoundEqualTo) -*/ assert(results.length == 5) } @@ -355,15 +347,11 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { assert(executionRules.rowKeyFilter.ranges.size == 3) val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get - //assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes(""))) assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes(2))) assert(scanRange1.isLowerBoundEqualTo) assert(!scanRange1.isUpperBoundEqualTo) val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get - // assert(Bytes.equals(scanRange2.lowerBound, Bytes.toBytes(4))) - // assert(scanRange2.upperBound == null) - // assert(!scanRange2.isLowerBoundEqualTo) assert(scanRange2.isUpperBoundEqualTo) assert(results.length == 2) @@ -414,7 +402,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { * A complex query with one point and one range for both the * rowKey and the a column */ - ignore("Test SQL point and range combo") { + test("Test SQL point and range combo") { val results = sqlContext.sql("SELECT KEY_FIELD FROM hbaseTable1 " + "WHERE " + "(KEY_FIELD = 'get1' and B_FIELD < '3') or " + @@ -468,7 +456,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get assert(Bytes.equals(scanRange2.lowerBound,Bytes.toBytes(3))) assert(Bytes.equals(scanRange2.upperBound, Bytes.toBytes(5))) - // assert(!scanRange2.isLowerBoundEqualTo) + assert(!scanRange2.isLowerBoundEqualTo) assert(scanRange2.isUpperBoundEqualTo) } @@ -476,7 +464,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { /** * A complex query with two complex ranges that does merge into one */ - ignore("Test two complete range merge rowKey query") { + test("Test two complete range merge rowKey query") { val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " + "WHERE " + "( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get2') or" + @@ -502,7 +490,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging { val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get assert(Bytes.equals(scanRange2.lowerBound, Bytes.toBytes("get3"))) assert(Bytes.equals(scanRange2.upperBound, Bytes.toBytes("get5"))) - // assert(!scanRange2.isLowerBoundEqualTo) + assert(!scanRange2.isLowerBoundEqualTo) assert(scanRange2.isUpperBoundEqualTo) } diff --git 
hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala index 49e2f6c..5a4b230 100644 --- hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala +++ hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala @@ -33,7 +33,7 @@ class HBaseCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndA |"table":{"namespace":"default", "name":"htable"}, |"rowkey":"key1:key2", |"columns":{ - |"col1":{"cf":"rowkey", "col":"key1", "type":"string"}, + |"col1":{"cf":"rowkey", "col":"key1", "type":"string", "length":"5"}, |"col2":{"cf":"rowkey", "col":"key2", "type":"double"}, |"col3":{"cf":"cf1", "col":"col2", "type":"binary"}, |"col4":{"cf":"cf1", "col":"col3", "type":"timestamp"},
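
With this patch the filter no longer ships a bare Spark SQL column name per cell mapping; it ships a ColumnSpec (start, length, name) triple serialized into the new protobuf column_spec bytes field. A minimal round-trip sketch of that layout (the values 6, -1 and "col01" are illustrative, not from the patch):

import org.apache.hadoop.hbase.spark.ColumnSpec

// Wire layout per ColumnSpec.toBytes: 4-byte start, 4-byte length, then the
// UTF-8 column name. length == -1 means "extends to the end of the rowkey".
val spec = new ColumnSpec(6, -1, "col01")
val decoded = ColumnSpec.fromBytes(spec.toBytes)
assert(decoded.start == 6)
assert(decoded.length == -1)
assert(decoded.name == "col01")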
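
filterCell now slices each rowkey dimension using the decoded start/length, treating an undefined length (-1) as "extends to the end of the rowkey", the same rule the ByteArrayComparable fix applies (bytes.length - offset). A self-contained sketch of that slicing rule; sliceRowKey is a hypothetical helper, not part of the patch:

import org.apache.hadoop.hbase.util.Bytes

// Slice one dimension out of a composite rowkey; -1 length runs to the end.
def sliceRowKey(rowKey: Array[Byte], start: Int, length: Int): Array[Byte] = {
  val len = if (length == -1) rowKey.length - start else length
  java.util.Arrays.copyOfRange(rowKey, start, start + len)
}

// Example rowkey: a fixed 6-byte string followed by a 4-byte int.
val rowKey = Bytes.toBytes("row010") ++ Bytes.toBytes(10)
assert(Bytes.toString(sliceRowKey(rowKey, 0, 6)) == "row010")  // first dimension, fixed length
assert(Bytes.toInt(sliceRowKey(rowKey, 6, -1)) == 10)          // last dimension, length unknown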
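
For a composite rowkey, an equality predicate on only the leading key can no longer be treated as a point; HBaseRelation.padding widens it into a range whose upper bound is the prefix padded with 0xFF bytes out to the full rowkey length. A sketch of the same idea; padToUpperBound is a hypothetical stand-in for that private helper:

import org.apache.hadoop.hbase.util.Bytes

// Pad a leading-key prefix with 0xFF bytes so the resulting upper bound
// covers every possible suffix of the composite rowkey.
def padToUpperBound(prefix: Array[Byte], rowKeyLength: Int): Array[Byte] = {
  val upper = Array.fill(rowKeyLength)(-1: Byte)  // 0xFF bytes
  System.arraycopy(prefix, 0, upper, 0, prefix.length)
  upper
}

// col00 === "row010" on a (6-byte string, 4-byte int) rowkey becomes the scan
// range [ row010 , row010 followed by four 0xFF bytes ] instead of one point.
val lower = Bytes.toBytes("row010")
val upper = padToUpperBound(lower, 10)
assert(Bytes.compareTo(lower, upper) < 0)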
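
The catalog side now requires a fixed "length" on every rowkey dimension except the last; initRowKey throws "Only the last dimension of RowKey is allowed to have varied length" otherwise, which is why the HBaseCatalogSuite catalog gains "length":"5" on key1. A trimmed catalog sketch based on the one in CompositeKeySuite (table and column names are illustrative):

// key1 declares a fixed 6-byte length; key2, the last dimension, may be
// variable (an int here, which has a known 4-byte length anyway).
val compositeCatalog = s"""{
  |"table":{"namespace":"default", "name":"table1"},
  |"rowkey":"key1:key2",
  |"columns":{
  |"col00":{"cf":"rowkey", "col":"key1", "type":"string", "length":"6"},
  |"col01":{"cf":"rowkey", "col":"key2", "type":"int"},
  |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}
  |}
  |}""".stripMargin

// Loading follows the suite's withCatalog helper:
//   sqlContext.read
//     .options(Map(HBaseTableCatalog.tableCatalog -> compositeCatalog))
//     .format("org.apache.hadoop.hbase.spark")
//     .load()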
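
ScanRange.mergeUnion now returns the widened range instead of Unit, which is what lets the rewritten RowKeyFilter.mergeUnion fold an incoming range across every overlapping existing range (newR = r.mergeUnion(newR)) and rebuild the range list without duplicates. A small sketch, assuming the (upperBound, isUpperBoundEqualTo, lowerBound, isLowerBoundEqualTo) constructor order used elsewhere in DefaultSource; the bounds are illustrative:

import org.apache.hadoop.hbase.spark.ScanRange
import org.apache.hadoop.hbase.util.Bytes

val a = new ScanRange(Bytes.toBytes("get2"), false, Bytes.toBytes(""), true)    // [ "", "get2" )
val b = new ScanRange(Bytes.toBytes("get5"), true, Bytes.toBytes("get1"), true) // [ "get1", "get5" ]
val merged = a.mergeUnion(b)
assert(merged eq a)                                             // mutates and returns the receiver
assert(Bytes.equals(merged.upperBound, Bytes.toBytes("get5")))  // upper bound widened to cover b
assert(merged.isUpperBoundEqualTo)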