diff --git hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/ColumnSpec.java hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/ColumnSpec.java new file mode 100644 index 0000000..cd7291e --- /dev/null +++ hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/ColumnSpec.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.spark; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Used by SparkSQLPushDownFilter.currentCellToColumnIndexMap to keep track of + * the start offset and length of a column within an HBase cell. This is needed to support + * composite keys, where a single cell (the row key) may consist of multiple columns + * of the Spark table. + */ +public class ColumnSpec { + public int start; + public int length; + // The column name of the Spark table. + public String name; + + public ColumnSpec(int s, int l, String n) { + start = s; + length = l; + name = n; + } + + /** + * Used in the Spark driver to serialize this ColumnSpec to a byte array to be shipped to the region server. + * @return the serialized byte array + */ + public byte[] toBytes() { + byte[] nb = Bytes.toBytes(name); + byte[] b = new byte[Bytes.SIZEOF_INT * 2 + nb.length]; + System.arraycopy(Bytes.toBytes(start), 0, b, 0, Bytes.SIZEOF_INT); + System.arraycopy(Bytes.toBytes(length), 0, b, Bytes.SIZEOF_INT, Bytes.SIZEOF_INT); + System.arraycopy(nb, 0, b, Bytes.SIZEOF_INT * 2, nb.length); + return b; + } + + /** + * Used inside the region server to deserialize a ColumnSpec from a byte array. + * @param b serialized byte array.
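+ *          (the layout matches toBytes above: a 4-byte big-endian start, a 4-byte length,
+ *          then the UTF-8 bytes of the column name; for example, ColumnSpec(0, 6, "col1")
+ *          serializes to 0x00000000, 0x00000006, followed by the bytes of "col1")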
+ * @return deserialized ColumnSpec + */ + public static ColumnSpec fromBytes(byte[] b) { + int s = Bytes.toInt(b, 0); + int l = Bytes.toInt(b, Bytes.SIZEOF_INT); + String n = Bytes.toString(b, Bytes.SIZEOF_INT * 2); + ColumnSpec cs = new ColumnSpec(s, l, n); + return cs; + } +} diff --git hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java index 071c1ca..ab85129 100644 --- hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java +++ hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java @@ -37,6 +37,8 @@ import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** * This filter will push down all qualifier logic given to us @@ -49,21 +51,22 @@ public class SparkSQLPushDownFilter extends FilterBase{ //The following values are populated with protobuffer DynamicLogicExpression dynamicLogicExpression; byte[][] valueFromQueryArray; - HashMap> - currentCellToColumnIndexMap; + // This is the map for Map[Columnfamilty, Map[ColumnQualifier, ColumnSpec]] + // ColumnSpec is the format of (String, start, offset) to support composite key + HashMap> + currentCellToColumnIndexMap; //The following values are transient HashMap columnToCurrentRowValueMap = null; static final byte[] rowKeyFamily = new byte[0]; - static final byte[] rowKeyQualifier = Bytes.toBytes("key"); String encoderClassName; public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression, byte[][] valueFromQueryArray, HashMap> + HashMap> currentCellToColumnIndexMap, String encoderClassName) { this.dynamicLogicExpression = dynamicLogicExpression; this.valueFromQueryArray = valueFromQueryArray; @@ -89,7 +92,7 @@ public class SparkSQLPushDownFilter extends FilterBase{ ByteArrayComparable familyByteComparable = new ByteArrayComparable(cfBytes, 0, cfBytes.length); - HashMap qualifierIndexMap = + HashMap qualifierIndexMap = currentCellToColumnIndexMap.get(familyByteComparable); if (qualifierIndexMap == null) { @@ -100,7 +103,7 @@ public class SparkSQLPushDownFilter extends FilterBase{ ByteArrayComparable qualifierByteComparable = new ByteArrayComparable(qBytes, 0, qBytes.length); - qualifierIndexMap.put(qualifierByteComparable, field.colName()); + qualifierIndexMap.put(qualifierByteComparable, field.columnSpec().toBytes()); } } @@ -111,20 +114,24 @@ public class SparkSQLPushDownFilter extends FilterBase{ // the row key if (columnToCurrentRowValueMap == null) { columnToCurrentRowValueMap = new HashMap<>(); - HashMap qualifierColumnMap = + HashMap qualifierColumnMap = currentCellToColumnIndexMap.get( new ByteArrayComparable(rowKeyFamily, 0, rowKeyFamily.length)); if (qualifierColumnMap != null) { - String rowKeyColumnName = - qualifierColumnMap.get( - new ByteArrayComparable(rowKeyQualifier, 0, - rowKeyQualifier.length)); - //Make sure that the rowKey is part of the where clause - if (rowKeyColumnName != null) { - columnToCurrentRowValueMap.put(rowKeyColumnName, - new ByteArrayComparable(c.getRowArray(), - c.getRowOffset(), c.getRowLength())); + Set> entries = qualifierColumnMap.entrySet(); + // We should get a sequence of rowKeyColumnName (name, start, length) + // and fill all the columns belong to row key. 
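+          // e.g. for a row key made of a 6-byte string followed by an int, the map holds
+          // ColumnSpec(0, 6, ...) and ColumnSpec(6, 4, ...); a length of -1 (only possible for
+          // the last dimension) means the column extends to the end of the row key and is
+          // resolved against the actual row length below.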
+ for (Entry entry: entries) { + ColumnSpec cs = ColumnSpec.fromBytes(entry.getValue()); + int length = cs.length; + // If we do not know the length, assume to extend to the end. + if (length == -1) { + length = c.getRowLength() - cs.start; + } + columnToCurrentRowValueMap.put(cs.name, + new ByteArrayComparable(c.getRowArray(), + c.getRowOffset() + cs.start, length)); } } } @@ -135,17 +142,18 @@ public class SparkSQLPushDownFilter extends FilterBase{ c.getFamilyOffset(), c.getFamilyLength()); - HashMap qualifierColumnMap = + HashMap qualifierColumnMap = currentCellToColumnIndexMap.get( currentFamilyByteComparable); if (qualifierColumnMap != null) { - String columnName = + byte[] columnSpec = qualifierColumnMap.get( new ByteArrayComparable(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength())); + String columnName = ColumnSpec.fromBytes(columnSpec).name; if (columnName != null) { columnToCurrentRowValueMap.put(columnName, @@ -205,7 +213,7 @@ public class SparkSQLPushDownFilter extends FilterBase{ } //Load mapping from HBase family/qualifier to Spark SQL columnName - HashMap> + HashMap> currentCellToColumnIndexMap = new HashMap<>(); for (FilterProtos.SQLPredicatePushDownCellToColumnMapping @@ -216,7 +224,7 @@ public class SparkSQLPushDownFilter extends FilterBase{ sqlPredicatePushDownCellToColumnMapping.getColumnFamily().toByteArray(); ByteArrayComparable familyByteComparable = new ByteArrayComparable(familyArray, 0, familyArray.length); - HashMap qualifierMap = + HashMap qualifierMap = currentCellToColumnIndexMap.get(familyByteComparable); if (qualifierMap == null) { @@ -230,7 +238,7 @@ public class SparkSQLPushDownFilter extends FilterBase{ new ByteArrayComparable(qualifierArray, 0 ,qualifierArray.length); qualifierMap.put(qualifierByteComparable, - sqlPredicatePushDownCellToColumnMapping.getColumnName()); + sqlPredicatePushDownCellToColumnMapping.getColumnSpec().toByteArray()); } return new SparkSQLPushDownFilter(dynamicLogicExpression, @@ -253,15 +261,15 @@ public class SparkSQLPushDownFilter extends FilterBase{ builder.addValueFromQueryArray(ByteStringer.wrap(valueFromQuery)); } - for (Map.Entry> + for (Entry> familyEntry : currentCellToColumnIndexMap.entrySet()) { - for (Map.Entry qualifierEntry : + for (Entry qualifierEntry : familyEntry.getValue().entrySet()) { columnMappingBuilder.setColumnFamily( ByteStringer.wrap(familyEntry.getKey().bytes())); columnMappingBuilder.setQualifier( ByteStringer.wrap(qualifierEntry.getKey().bytes())); - columnMappingBuilder.setColumnName(qualifierEntry.getValue()); + columnMappingBuilder.setColumnSpec(ByteStringer.wrap(qualifierEntry.getValue())); builder.addCellToColumnMapping(columnMappingBuilder.build()); } } diff --git hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/protobuf/generated/FilterProtos.java hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/protobuf/generated/FilterProtos.java index cbef134..6947c62 100644 --- hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/protobuf/generated/FilterProtos.java +++ hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/protobuf/generated/FilterProtos.java @@ -31,20 +31,15 @@ public final class FilterProtos { */ com.google.protobuf.ByteString getQualifier(); - // required string column_name = 3; + // required bytes column_spec = 3; /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - boolean hasColumnName(); + boolean hasColumnSpec(); /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - java.lang.String 
getColumnName(); - /** - * required string column_name = 3; - */ - com.google.protobuf.ByteString - getColumnNameBytes(); + com.google.protobuf.ByteString getColumnSpec(); } /** * Protobuf type {@code hbase.pb.SQLPredicatePushDownCellToColumnMapping} @@ -109,7 +104,7 @@ public final class FilterProtos { } case 26: { bitField0_ |= 0x00000004; - columnName_ = input.readBytes(); + columnSpec_ = input.readBytes(); break; } } @@ -184,53 +179,26 @@ public final class FilterProtos { return qualifier_; } - // required string column_name = 3; - public static final int COLUMN_NAME_FIELD_NUMBER = 3; - private java.lang.Object columnName_; + // required bytes column_spec = 3; + public static final int COLUMN_SPEC_FIELD_NUMBER = 3; + private com.google.protobuf.ByteString columnSpec_; /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - public boolean hasColumnName() { + public boolean hasColumnSpec() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - public java.lang.String getColumnName() { - java.lang.Object ref = columnName_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - if (bs.isValidUtf8()) { - columnName_ = s; - } - return s; - } - } - /** - * required string column_name = 3; - */ - public com.google.protobuf.ByteString - getColumnNameBytes() { - java.lang.Object ref = columnName_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - columnName_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } + public com.google.protobuf.ByteString getColumnSpec() { + return columnSpec_; } private void initFields() { columnFamily_ = com.google.protobuf.ByteString.EMPTY; qualifier_ = com.google.protobuf.ByteString.EMPTY; - columnName_ = ""; + columnSpec_ = com.google.protobuf.ByteString.EMPTY; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -245,7 +213,7 @@ public final class FilterProtos { memoizedIsInitialized = 0; return false; } - if (!hasColumnName()) { + if (!hasColumnSpec()) { memoizedIsInitialized = 0; return false; } @@ -263,7 +231,7 @@ public final class FilterProtos { output.writeBytes(2, qualifier_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { - output.writeBytes(3, getColumnNameBytes()); + output.writeBytes(3, columnSpec_); } getUnknownFields().writeTo(output); } @@ -284,7 +252,7 @@ public final class FilterProtos { } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(3, getColumnNameBytes()); + .computeBytesSize(3, columnSpec_); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -319,10 +287,10 @@ public final class FilterProtos { result = result && getQualifier() .equals(other.getQualifier()); } - result = result && (hasColumnName() == other.hasColumnName()); - if (hasColumnName()) { - result = result && getColumnName() - .equals(other.getColumnName()); + result = result && (hasColumnSpec() == other.hasColumnSpec()); + if (hasColumnSpec()) { + result = result && getColumnSpec() + .equals(other.getColumnSpec()); } result = result && getUnknownFields().equals(other.getUnknownFields()); @@ -345,9 +313,9 @@ public final class FilterProtos { hash = (37 * hash) + 
QUALIFIER_FIELD_NUMBER; hash = (53 * hash) + getQualifier().hashCode(); } - if (hasColumnName()) { - hash = (37 * hash) + COLUMN_NAME_FIELD_NUMBER; - hash = (53 * hash) + getColumnName().hashCode(); + if (hasColumnSpec()) { + hash = (37 * hash) + COLUMN_SPEC_FIELD_NUMBER; + hash = (53 * hash) + getColumnSpec().hashCode(); } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; @@ -462,7 +430,7 @@ public final class FilterProtos { bitField0_ = (bitField0_ & ~0x00000001); qualifier_ = com.google.protobuf.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00000002); - columnName_ = ""; + columnSpec_ = com.google.protobuf.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -503,7 +471,7 @@ public final class FilterProtos { if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } - result.columnName_ = columnName_; + result.columnSpec_ = columnSpec_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -526,10 +494,8 @@ public final class FilterProtos { if (other.hasQualifier()) { setQualifier(other.getQualifier()); } - if (other.hasColumnName()) { - bitField0_ |= 0x00000004; - columnName_ = other.columnName_; - onChanged(); + if (other.hasColumnSpec()) { + setColumnSpec(other.getColumnSpec()); } this.mergeUnknownFields(other.getUnknownFields()); return this; @@ -544,7 +510,7 @@ public final class FilterProtos { return false; } - if (!hasColumnName()) { + if (!hasColumnSpec()) { return false; } @@ -642,76 +608,38 @@ public final class FilterProtos { return this; } - // required string column_name = 3; - private java.lang.Object columnName_ = ""; + // required bytes column_spec = 3; + private com.google.protobuf.ByteString columnSpec_ = com.google.protobuf.ByteString.EMPTY; /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - public boolean hasColumnName() { + public boolean hasColumnSpec() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - public java.lang.String getColumnName() { - java.lang.Object ref = columnName_; - if (!(ref instanceof java.lang.String)) { - java.lang.String s = ((com.google.protobuf.ByteString) ref) - .toStringUtf8(); - columnName_ = s; - return s; - } else { - return (java.lang.String) ref; - } - } - /** - * required string column_name = 3; - */ - public com.google.protobuf.ByteString - getColumnNameBytes() { - java.lang.Object ref = columnName_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - columnName_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } + public com.google.protobuf.ByteString getColumnSpec() { + return columnSpec_; } /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - public Builder setColumnName( - java.lang.String value) { + public Builder setColumnSpec(com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000004; - columnName_ = value; + columnSpec_ = value; onChanged(); return this; } /** - * required string column_name = 3; + * required bytes column_spec = 3; */ - public Builder clearColumnName() { + public Builder clearColumnSpec() { bitField0_ = (bitField0_ & ~0x00000004); - columnName_ = getDefaultInstance().getColumnName(); - onChanged(); - return this; - } - /** - * required string column_name = 3; - */ - public Builder 
setColumnNameBytes( - com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000004; - columnName_ = value; + columnSpec_ = getDefaultInstance().getColumnSpec(); onChanged(); return this; } @@ -1967,7 +1895,7 @@ public final class FilterProtos { "\n\014Filter.proto\022\010hbase.pb\"h\n\'SQLPredicate" + "PushDownCellToColumnMapping\022\025\n\rcolumn_fa" + "mily\030\001 \002(\014\022\021\n\tqualifier\030\002 \002(\014\022\023\n\013column_" + - "name\030\003 \002(\t\"\313\001\n\032SQLPredicatePushDownFilte" + + "spec\030\003 \002(\014\"\313\001\n\032SQLPredicatePushDownFilte" + "r\022 \n\030dynamic_logic_expression\030\001 \002(\t\022\036\n\026v" + "alue_from_query_array\030\002 \003(\014\022Q\n\026cell_to_c" + "olumn_mapping\030\003 \003(\01321.hbase.pb.SQLPredic" + @@ -1986,7 +1914,7 @@ public final class FilterProtos { internal_static_hbase_pb_SQLPredicatePushDownCellToColumnMapping_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_SQLPredicatePushDownCellToColumnMapping_descriptor, - new java.lang.String[] { "ColumnFamily", "Qualifier", "ColumnName", }); + new java.lang.String[] { "ColumnFamily", "Qualifier", "ColumnSpec", }); internal_static_hbase_pb_SQLPredicatePushDownFilter_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_hbase_pb_SQLPredicatePushDownFilter_fieldAccessorTable = new diff --git hbase-spark/src/main/protobuf/Filter.proto hbase-spark/src/main/protobuf/Filter.proto index d17b48c..d9d96b6 100644 --- hbase-spark/src/main/protobuf/Filter.proto +++ hbase-spark/src/main/protobuf/Filter.proto @@ -28,7 +28,7 @@ option optimize_for = SPEED; message SQLPredicatePushDownCellToColumnMapping { required bytes column_family = 1; required bytes qualifier = 2; - required string column_name = 3; + required bytes column_spec = 3; } message SQLPredicatePushDownFilter { diff --git hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala index fce92fb..7887fac 100644 --- hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala +++ hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/ByteArrayComparable.scala @@ -25,7 +25,7 @@ class ByteArrayComparable(val bytes:Array[Byte], val offset:Int = 0, var length: extends Comparable[ByteArrayComparable] { if (length == -1) { - length = bytes.length + length = bytes.length - offset } override def compareTo(o: ByteArrayComparable): Int = { diff --git hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala index 0c29f50..e3cbe23 100644 --- hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala +++ hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala @@ -138,7 +138,23 @@ case class HBaseRelation ( */ override val schema: StructType = userSpecifiedSchema.getOrElse(catalog.toDataType) + def rows = catalog.row + def getField(name: String): Field = { + catalog.getField(name) + } + // check whether the column is the first key in the rowkey + def isPrimaryKey(c: String): Boolean = { + val f1 = catalog.getRowKey(0) + val f2 = getField(c) + f1 == f2 + } + def isComposite(): Boolean = { + catalog.getRowKey.size > 1 + } + def isColumn(c: String): Boolean = { + !catalog.getRowKey.map(_.colName).contains(c) + } def createTable() { val numReg = 
parameters.get(HBaseTableCatalog.newTable).map(x => x.toInt).getOrElse(0) @@ -244,20 +260,9 @@ case class HBaseRelation ( // Return the new index and appended value (idx + field.length, parsed ++ Seq((field, value))) } else { - field.dt match { - case StringType => - val pos = row.indexOf(HBaseTableCatalog.delimiter, idx) - if (pos == -1 || pos > row.length) { - // this is at the last dimension - val value = Utils.hbaseFieldToScalaType(field, row, idx, row.length) - (row.length + 1, parsed ++ Seq((field, value))) - } else { - val value = Utils.hbaseFieldToScalaType(field, row, idx, pos - idx) - (pos, parsed ++ Seq((field, value))) - } - // We don't know the length, assume it extends to the end of the rowkey. - case _ => (row.length + 1, parsed ++ Seq((field, Utils.hbaseFieldToScalaType(field, row, idx, row.length)))) - } + // This is the last dimension. + val value = Utils.hbaseFieldToScalaType(field, row, idx, row.length - idx) + (row.length + 1, parsed ++ Seq((field, value))) } })._2.toMap } @@ -395,7 +400,6 @@ case class HBaseRelation ( }) val queryValueArray = queryValueList.toArray - if (superRowKeyFilter == null) { superRowKeyFilter = new RowKeyFilter } @@ -403,6 +407,23 @@ case class HBaseRelation ( (superRowKeyFilter, superDynamicLogicExpression, queryValueArray) } + // convert the point to the range if it is composite key + private def padding(value: Array[Byte], length: Int): Array[Byte] = { + if (!isComposite || value == null || value.length == length ) { + value + } else { + val end = Array.fill(length)(-1: Byte) + System.arraycopy(value, 0, end, 0, value.length) + end + } + } + // convert the upper bound to extend to the whole rowkey length + private def convertUpperBound(value: Array[Byte], length: Int): Array[Byte] = { + val upper = Array.fill(length)(-1: Byte) + System.arraycopy(value, 0, upper, 0, value.length) + upper + } + /** * For some codec, the order may be inconsistent between java primitive * type and its byte array. 
We may have to split the predicates on some @@ -421,15 +442,19 @@ case class HBaseRelation ( filter match { case EqualTo(attr, value) => val field = catalog.getField(attr) + val bytes = Utils.toBytes(value, field) if (field != null) { - if (field.isRowKey) { - parentRowKeyFilter.mergeIntersect(new RowKeyFilter( - DefaultSourceStaticUtils.getByteValue(field, - value.toString), null)) + if (isPrimaryKey(attr)) { + if (!isComposite()) { + parentRowKeyFilter.mergeIntersect(new RowKeyFilter( + bytes, null)) + } else { + // we have to convert the point to range in case of composite key + parentRowKeyFilter.mergeIntersect(new RowKeyFilter(null, + new ScanRange(padding(bytes, rows.length), true, bytes, true))) + } } - val byteValue = - DefaultSourceStaticUtils.getByteValue(field, value.toString) - valueArray += byteValue + valueArray += bytes } new EqualLogicExpression(attr, valueArray.length - 1, false) @@ -448,12 +473,16 @@ case class HBaseRelation ( case LessThan(attr, value) => val field = catalog.getField(attr) if (field != null) { - if (field.isRowKey) { + if (isPrimaryKey(attr)) { val b = encoder.ranges(value) var inc = false b.map(_.less.map { x => val r = new RowKeyFilter(null, - new ScanRange(x.upper, inc, x.low, true) + new ScanRange(if (inc) { + padding(x.upper, rows.length) + } else { + x.upper + }, inc, x.low, true) ) inc = true r @@ -470,12 +499,12 @@ case class HBaseRelation ( case GreaterThan(attr, value) => val field = catalog.getField(attr) if (field != null) { - if (field.isRowKey) { + if (isPrimaryKey(attr)) { val b = encoder.ranges(value) var inc = false - b.map(_.greater.map{x => + b.map(_.greater.map { x => val r = new RowKeyFilter(null, - new ScanRange(x.upper, true, x.low, inc)) + new ScanRange(padding(x.upper, rows.length), true, x.low, inc)) inc = true r }).map { x => @@ -491,11 +520,11 @@ case class HBaseRelation ( case LessThanOrEqual(attr, value) => val field = catalog.getField(attr) if (field != null) { - if (field.isRowKey) { + if (isPrimaryKey(attr)) { val b = encoder.ranges(value) b.map(_.less.map(x => new RowKeyFilter(null, - new ScanRange(x.upper, true, x.low, true)))) + new ScanRange(padding(x.upper, rows.length), true, x.low, true)))) .map { x => x.reduce{ (i, j) => i.mergeUnion(j) @@ -509,11 +538,11 @@ case class HBaseRelation ( case GreaterThanOrEqual(attr, value) => val field = catalog.getField(attr) if (field != null) { - if (field.isRowKey) { + if (isPrimaryKey(attr)) { val b = encoder.ranges(value) b.map(_.greater.map(x => new RowKeyFilter(null, - new ScanRange(x.upper, true, x.low, true)))) + new ScanRange(padding(x.upper, rows.length), true, x.low, true)))) .map { x => x.reduce { (i, j) => i.mergeUnion(j) @@ -590,7 +619,7 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean, * * @param other Other scan object */ - def mergeUnion(other:ScanRange): Unit = { + def mergeUnion(other:ScanRange): ScanRange = { val upperBoundCompare = compareRange(upperBound, other.upperBound) val lowerBoundCompare = compareRange(lowerBound, other.lowerBound) @@ -605,6 +634,7 @@ class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean, isUpperBoundEqualTo = if (upperBoundCompare == 0) isUpperBoundEqualTo || other.isUpperBoundEqualTo else if (upperBoundCompare < 0) other.isUpperBoundEqualTo else isUpperBoundEqualTo + this } /** @@ -1112,13 +1142,24 @@ class RowKeyFilter (currentPoint:Array[Byte] = null, other.points.foreach( p => points += p) other.ranges.foreach( otherR => { + // we may change it in the mergeUnion below. 
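+      // e.g. if ranges holds [a,b] and [c,d] and otherR overlaps both, newR absorbs each
+      // overlapping range in turn and only the single merged range ends up in newRanges.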
+ var newR = otherR var doesOverLap = false + // This is the ranges after otherR union with existing ranges + // It is used to keep the changed ranges after each iteration. + var newRanges = mutable.MutableList[ScanRange]() ranges.foreach{ r => - if (r.getOverLapScanRange(otherR) != null) { - r.mergeUnion(otherR) - doesOverLap = true - }} - if (!doesOverLap) ranges.+=(otherR) + // If there is overlap, we update newR to be compared + // and do not add them to new ranges. We will not lose it + // since newR has it covered. Otherwise, we may have duplicate range + if (r.getOverLapScanRange(newR) != null) { + newR = r.mergeUnion(newR) + } else { + newRanges :+= r + } + } + newRanges :+= newR + ranges = newRanges }) this } @@ -1192,7 +1233,7 @@ class RowKeyFilter (currentPoint:Array[Byte] = null, ranges.foreach( r => { if (isFirst) isFirst = false else strBuilder.append(",") - strBuilder.append(r) + strBuilder.append(r.toString) }) strBuilder.append("))") strBuilder.toString() diff --git hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala index 1a1d478..0a95a05 100644 --- hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala +++ hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpression.scala @@ -113,7 +113,6 @@ class EqualLogicExpression (val columnName:String, valueFromQueryValueArray:Array[Array[Byte]]): Boolean = { val currentRowValue = columnToCurrentRowValueMap.get(columnName) val valueFromQuery = valueFromQueryValueArray(valueFromQueryIndex) - currentRowValue != null && Bytes.equals(valueFromQuery, 0, valueFromQuery.length, currentRowValue.bytes, diff --git hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala index ce7b55a..0a672f0 100644 --- hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala +++ hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/package.scala @@ -27,6 +27,7 @@ package object hbase { def bytesMax = null val ByteMax = -1.asInstanceOf[Byte] val ByteMin = 0.asInstanceOf[Byte] + val MaxLength = 256 val ord: Ordering[HBaseType] = new Ordering[HBaseType] { def compare(x: Array[Byte], y: Array[Byte]): Int = { return Bytes.compareTo(x, y) diff --git hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala index c2d611f..959ba5a 100644 --- hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala +++ hbase-spark/src/main/scala/org/apache/spark/sql/datasources/hbase/HBaseTableCatalog.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.datasources.hbase import org.apache.avro.Schema -import org.apache.hadoop.hbase.spark.SchemaConverters +import org.apache.hadoop.hbase.spark.{ColumnSpec, SchemaConverters} import org.apache.hadoop.hbase.spark.datasources._ import org.apache.hadoop.hbase.spark.hbase._ import org.apache.hadoop.hbase.util.Bytes @@ -39,7 +39,7 @@ case class Field( sType: Option[String] = None, avroSchema: Option[String] = None, serdes: Option[SerDes]= None, - len: Int = -1) extends Logging { + private val len: Int = -1) extends Logging { override def toString = s"$colName $cf $col" val isRowKey = cf == HBaseTableCatalog.rowKey var start: Int = _ @@ -69,11 +69,7 @@ case class Field( } } 
def colBytes: Array[Byte] = { - if (isRowKey) { - Bytes.toBytes("key") - } else { - Bytes.toBytes(col) - } + Bytes.toBytes(col) } val dt = { @@ -108,6 +104,8 @@ case class Field( colName == that.colName && cf == that.cf && col == that.col case _ => false } + + def columnSpec = new ColumnSpec(start, length, colName) } // The row key definition, with each key refer to the col defined in Field, e.g., @@ -117,12 +115,13 @@ case class RowKey(k: String) { var fields: Seq[Field] = _ var varLength = false def length = { - if (varLength) { - -1 - } else { - fields.foldLeft(0){case (x, y) => - x + y.length + fields.foldLeft(0) { case (x, y) => + val yLen = if (y.length == -1) { + MaxLength + } else { + y.length } + x + yLen } } } @@ -155,49 +154,20 @@ case class HBaseTableCatalog( def get(key: String) = params.get(key) - // Setup the start and length for each dimension of row key at runtime. - def dynSetupRowKey(rowKey: Array[Byte]) { - logDebug(s"length: ${rowKey.length}") - if(row.varLength) { - var start = 0 - row.fields.foreach { f => - logDebug(s"start: $start") - f.start = start - f.length = { - // If the length is not defined - if (f.length == -1) { - f.dt match { - case StringType => - var pos = rowKey.indexOf(HBaseTableCatalog.delimiter, start) - if (pos == -1 || pos > rowKey.length) { - // this is at the last dimension - pos = rowKey.length - } - pos - start - // We don't know the length, assume it extend to the end of the rowkey. - case _ => rowKey.length - start - } - } else { - f.length - } - } - start += f.length - } - } - } - + // Set up the start and length for each dimension of the row key def initRowKey = { val fields = sMap.fields.filter(_.cf == HBaseTableCatalog.rowKey) row.fields = row.keys.flatMap(n => fields.find(_.col == n)) - // The length is determined at run time if it is string or binary and the length is undefined. - if (row.fields.filter(_.length == -1).isEmpty) { + // Only the last key dimension is allowed to have its length determined at runtime. + if (row.fields.reverse.tail.filter(_.length == -1).isEmpty) { var start = 0 row.fields.foreach { f => f.start = start start += f.length } } else { - row.varLength = true + throw new Exception("Only the last dimension of " + + "RowKey is allowed to have varied length") } } initRowKey @@ -232,11 +202,28 @@ object HBaseTableCatalog { val length = "length" /** - * User provide table schema definition - * {"tablename":"name", "rowkey":"key1:key2", - * "columns":{"col1":{"cf":"cf1", "col":"col1", "type":"type1"}, - * "col2":{"cf":"cf2", "col":"col2", "type":"type2"}}} - * Note that any col in the rowKey, there has to be one corresponding col defined in columns + * + * params is a mapping whose "catalog" key holds a JSON definition of the schema mapping between + * HBase and Spark. Following is an example. + * Note that + * 1. for any col in the rowkey, there has to be one corresponding col defined in columns with cf "rowkey" + * 2. for a composite rowkey, the dimensions are separated by ":" + * 3. in the column definition mapping, the key is the Spark table column name, with its column family + * and column qualifier defined. 
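+   * 4. every dimension of a composite rowkey except the last one must have a fixed length,
+   *    either implied by its type or given explicitly via "length"; only the last dimension
+   *    may have its length resolved at runtime (see initRowKey)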
+ * s"""{ + * |"table":{"namespace":"default", "name":"htable"}, + * |"rowkey":"key1:key2", + * |"columns":{ + * |"col1":{"cf":"rowkey", "col":"key1", "type":"string", "length":"6"}, + * |"col2":{"cf":"rowkey", "col":"key2", "type":"double"}, + * |"col3":{"cf":"cf1", "col":"col2", "type":"binary"}, + * |"col4":{"cf":"cf1", "col":"col3", "type":"timestamp"}, + * |"col5":{"cf":"cf1", "col":"col4", "type":"double", "serdes":"${classOf[DoubleSerDes].getName}"}, + * |"col6":{"cf":"cf1", "col":"col5", "type":"$map"}, + * |"col7":{"cf":"cf1", "col":"col6", "type":"$array"}, + * |"col8":{"cf":"cf1", "col":"col7", "type":"$arrayMap"} + * |} + * */ def apply(params: Map[String, String]): HBaseTableCatalog = { val parameters = convert(params) diff --git hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/CompositeKeySuite.scala hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/CompositeKeySuite.scala new file mode 100644 index 0000000..c495136 --- /dev/null +++ hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/CompositeKeySuite.scala @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.spark + +import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf +import org.apache.hadoop.hbase.{TableName, HBaseTestingUtility} +import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} + +case class HBaseCompositeRecord( + stringCol00: String, + intCol01: Int, + booleanCol1: Boolean, + doubleCol2: Double, + floatCol3: Float, + intCol4: Int, + longCol5: Long, + shortCol6: Short, + stringCol7: String, + byteCol8: Byte) + +object HBaseCompositeRecord { + def apply(i: Int): HBaseCompositeRecord = { + HBaseCompositeRecord(s"row${"%03d".format(i)}", + if (i % 2 == 0) { + i + } else { + -i + }, + i % 2 == 0, + i.toDouble, + i.toFloat, + i, + i.toLong, + i.toShort, + s"String$i extra", + i.toByte) + } +} + +case class HBaseCompositeRecordInt( + intCol00: Int, + stringCol01: String, + booleanCol1: Boolean, + doubleCol2: Double, + floatCol3: Float, + intCol4: Int, + longCol5: Long, + shortCol6: Short, + stringCol7: String, + byteCol8: Byte) + +object HBaseCompositeRecordInt { + def apply(i: Int): HBaseCompositeRecordInt = { + HBaseCompositeRecordInt( + if (i % 2 == 0) { + i + } else { + -i + }, + s"row${"%03d".format(i)}", + i % 2 == 0, + i.toDouble, + i.toFloat, + i, + i.toLong, + i.toShort, + s"String$i extra", + i.toByte) + } +} + +case class HBaseCompositeRecordBool( + booleanCol00: Boolean, + stringCol01: String, + doubleCol2: Double, + floatCol3: Float, + intCol4: Int, + longCol5: Long, + shortCol6: Short, + stringCol7: String, + byteCol8: Byte) + +object HBaseCompositeRecordBool { + def apply(i: Int): HBaseCompositeRecordBool = { + HBaseCompositeRecordBool( + i % 2 == 0, + s"row${"%03d".format(i)}", + i.toDouble, + i.toFloat, + i, + i.toLong, + i.toShort, + s"String$i extra", + i.toByte) + } +} + +class CompositeKeySuite extends FunSuite with + BeforeAndAfterEach with BeforeAndAfterAll with Logging { + @transient var sc: SparkContext = null + var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility + + var sqlContext: SQLContext = null + var df: DataFrame = null + // The original raw data used for construct result set without going through + // data frame logic. It is used to verify the result set retrieved from data frame logic. 
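+  // e.g. rawResult uses string keys "row000".."row255" and an int key that alternates in sign,
+  // exercising both a fixed-length string dimension and a signed int dimension of the composite key.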
+ val rawResult = (0 to 255).map { i => + HBaseCompositeRecord(i) + } + val rawResultInt = (0 to 255).map { i => + HBaseCompositeRecordInt(i) + } + + val rawResultBool = (0 to 255).map { i => + HBaseCompositeRecordBool(i) + } + + def collectToSet(df: DataFrame): Set[(String, Int, Boolean)] = { + df.collect() + .map(x => (x.getAs[String](0), x.getAs[Int](1), x.getAs[Boolean](2))).toSet + } + + + def withCatalog(cat: String): DataFrame = { + sqlContext + .read + .options(Map(HBaseTableCatalog.tableCatalog -> cat)) + .format("org.apache.hadoop.hbase.spark") + .load() + } + + override def beforeAll() { + + TEST_UTIL.startMiniCluster + val sparkConf = new SparkConf + sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true") + sparkConf.set(HBaseSparkConf.BATCH_NUM, "100") + sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100") + + sc = new SparkContext("local", "test", sparkConf) + new HBaseContext(sc, TEST_UTIL.getConfiguration) + sqlContext = new SQLContext(sc) + } + + override def afterAll() { + logInfo("shuting down minicluster") + TEST_UTIL.shutdownMiniCluster() + + sc.stop() + } + + override def beforeEach(): Unit = { + DefaultSourceStaticUtils.lastFiveExecutionRules.clear() + } + + def catalog = s"""{ + |"table":{"namespace":"default", "name":"table1"}, + |"rowkey":"key1:key2", + |"columns":{ + |"stringCol00":{"cf":"rowkey", "col":"key1", "type":"string", "length":"6"}, + |"intCol01":{"cf":"rowkey", "col":"key2", "type":"int"}, + |"booleanCol1":{"cf":"cf1", "col":"booleanCol1", "type":"boolean"}, + |"doubleCol2":{"cf":"cf2", "col":"doubleCol2", "type":"double"}, + |"floatCol3":{"cf":"cf3", "col":"floatCol3", "type":"float"}, + |"intCol4":{"cf":"cf4", "col":"intCol4", "type":"int"}, + |"longCol5":{"cf":"cf5", "col":"longCol5", "type":"bigint"}, + |"shortCol6":{"cf":"cf6", "col":"shortCol6", "type":"smallint"}, + |"stringCol7":{"cf":"cf7", "col":"stringCol7", "type":"string"}, + |"byteCol8":{"cf":"cf8", "col":"byteCol8", "type":"tinyint"} + |} + |}""".stripMargin + + def catalogInt = s"""{ + |"table":{"namespace":"default", "name":"intTable1"}, + |"rowkey":"key1:key2", + |"columns":{ + |"intCol00":{"cf":"rowkey", "col":"key1", "type":"int"}, + |"stringCol01":{"cf":"rowkey", "col":"key2", "type":"string"}, + |"booleanCol1":{"cf":"cf1", "col":"booleanCol1", "type":"boolean"}, + |"doubleCol2":{"cf":"cf2", "col":"doubleCol2", "type":"double"}, + |"floatCol3":{"cf":"cf3", "col":"floatCol3", "type":"float"}, + |"intCol4":{"cf":"cf4", "col":"intCol4", "type":"int"}, + |"longCol5":{"cf":"cf5", "col":"longCol5", "type":"bigint"}, + |"shortCol6":{"cf":"cf6", "col":"shortCol6", "type":"smallint"}, + |"stringCol7":{"cf":"cf7", "col":"stringCol7", "type":"string"}, + |"byteCol8":{"cf":"cf8", "col":"byteCol8", "type":"tinyint"} + |} + |}""".stripMargin + + def catalogBool = s"""{ + |"table":{"namespace":"default", "name":"boolTable1"}, + |"rowkey":"key1:key2", + |"columns":{ + |"booleanCol00":{"cf":"rowkey", "col":"key1", "type":"boolean"}, + |"stringCol01":{"cf":"rowkey", "col":"key2", "type":"string", "length":"6"}, + |"doubleCol2":{"cf":"cf2", "col":"doubleCol2", "type":"double"}, + |"floatCol3":{"cf":"cf3", "col":"floatCol3", "type":"float"}, + |"intCol4":{"cf":"cf4", "col":"intCol4", "type":"int"}, + |"longCol5":{"cf":"cf5", "col":"longCol5", "type":"bigint"}, + |"shortCol6":{"cf":"cf6", "col":"shortCol6", "type":"smallint"}, + |"stringCol7":{"cf":"cf7", "col":"stringCol7", "type":"string"}, + |"byteCol8":{"cf":"cf8", "col":"byteCol8", "type":"tinyint"} + |} + |}""".stripMargin + + + test("populate table 
with composite key") { + val sql = sqlContext + import sql.implicits._ + + sc.parallelize(rawResult).toDF.write.options( + Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")) + .format("org.apache.hadoop.hbase.spark") + .save() + + sc.parallelize(rawResultInt).toDF.write.options( + Map(HBaseTableCatalog.tableCatalog -> catalogInt, HBaseTableCatalog.newTable -> "5")) + .format("org.apache.hadoop.hbase.spark") + .save() + + sc.parallelize(rawResultBool).toDF.write.options( + Map(HBaseTableCatalog.tableCatalog -> catalogBool, HBaseTableCatalog.newTable -> "5")) + .format("org.apache.hadoop.hbase.spark") + .save() + } + + test("full query") { + val df = withCatalog(catalog) + df.show + assert(df.count() == 256) + } + + test("filtered query1") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"stringCol00" <= "row050" && $"intCol01" > 40) + .select("stringCol00", "intCol01","booleanCol1") + s.show + + val expected = rawResult.filter { x => + x.stringCol00 <= "row050" && x.intCol01 > 40 + }.map(x => (x.stringCol00, x.intCol01, x.booleanCol1)).toSet + val result = collectToSet(s) + assert(expected === result) + } + test("int filtered query1") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalogInt) + val s = df.filter($"stringCol01" <= "row050" && $"intCol00" > 40) + .select("stringCol01", "intCol00","booleanCol1") + s.show + + val expected = rawResultInt.filter { x => + x.stringCol01 <= "row050" && x.intCol00 > 40 + }.map(x => (x.stringCol01, x.intCol00, x.booleanCol1)).toSet + val result = collectToSet(s) + assert(expected === result) + } + + + test("bool filtered query1") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalogBool) + val s = df.filter($"stringCol01" <= "row050" && $"booleanCol00" === true) + .select("stringCol01", "intCol4","booleanCol00") + s.show + + val expected = rawResultBool.filter { x => + x.stringCol01 <= "row050" && x.booleanCol00 == true + }.map(x => (x.stringCol01, x.intCol4, x.booleanCol00)).toSet + val result = collectToSet(s) + assert(expected === result) + } + + test("filtered query11") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"stringCol00" <= "row050" && $"intCol01" < -40) + .select("stringCol00", "intCol01","booleanCol1") + s.show + + val expected = rawResult.filter { x => + x.stringCol00 <= "row050" && x.intCol01 < -40 + }.map(x => (x.stringCol00, x.intCol01, x.booleanCol1)).toSet + val result = collectToSet(s) + assert(expected === result) + } + + test("int filtered query11") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalogInt) + val s = df.filter($"stringCol01" <= "row050" && $"intCol00" < -40) + .select("stringCol01", "intCol00","booleanCol1") + s.show + + val expected = rawResultInt.filter { x => + x.stringCol01 <= "row050" && x.intCol00 < -40 + }.map(x => (x.stringCol01, x.intCol00, x.booleanCol1)).toSet + val result = collectToSet(s) + assert(expected === result) + } + + + test("filtered query12") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"stringCol00" <= "row050" && $"intCol01" > -40) + .select("stringCol00", "intCol01","booleanCol1") + s.show + + val expected = rawResult.filter { x => + x.stringCol00 <= "row050" && x.intCol01 > -40 + }.map(x => (x.stringCol00, x.intCol01, x.booleanCol1)).toSet + val result = collectToSet(s) + assert(expected === result) + } + + 
test("int filtered query12") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalogInt) + val s = df.filter($"stringCol01" <= "row050" && $"intCol00" > -40) + .select("stringCol01", "intCol00","booleanCol1") + s.show + + val expected = rawResultInt.filter { x => + x.stringCol01 <= "row050" && x.intCol00 > -40 + }.map(x => (x.stringCol01, x.intCol00, x.booleanCol1)).toSet + val result = collectToSet(s) + assert(expected === result) + } + + test("filtered query2") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"stringCol00" <= "row050" && $"intCol01" >= 40) + .select("stringCol00", "intCol01","booleanCol1") + s.show + + val expected = rawResult.filter { x => + x.stringCol00 <= "row050" && x.intCol01 >= 40 + }.map(x => (x.stringCol00, x.intCol01, x.booleanCol1)).toSet + val result = collectToSet(s) + assert(expected === result) + } + + test("filtered query3") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"stringCol00" >= "row250" && $"intCol01" < 50) + .select("stringCol00", "intCol01","booleanCol1") + s.show + + val expected = rawResult.filter { x => + x.stringCol00 >= "row250" && x.intCol01 < 50 + }.map(x => (x.stringCol00, x.intCol01, x.booleanCol1)).toSet + val result = collectToSet(s) + assert(expected === result) + } + + test("filtered query4") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"stringCol00" <= "row010") // row005 not included + .select("stringCol00", "intCol01","booleanCol1") + s.show + + val expected = rawResult.filter { x => + x.stringCol00 <= "row010" + }.map(x => (x.stringCol00, x.intCol01, x.booleanCol1)).toSet + val result = collectToSet(s) + assert(expected === result) + } + + test("filtered query5") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"stringCol00" === "row010") // row005 not included + .select("stringCol00", "intCol01","booleanCol1") + s.show + + val expected = rawResult.filter { x => + x.stringCol00 === "row010" + }.map(x => (x.stringCol00, x.intCol01, x.booleanCol1)).toSet + val result = collectToSet(s) + assert(expected === result) + } + test("filtered query51") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"stringCol00" === "row011") // row005 not included + .select("stringCol00", "intCol01","booleanCol1") + s.show + + val expected = rawResult.filter { x => + x.stringCol00 === "row011" + }.map(x => (x.stringCol00, x.intCol01, x.booleanCol1)).toSet + val result = collectToSet(s) + assert(expected === result) + } + + test("filtered query52") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter($"stringCol00" === "row005") // row005 not included + .select("stringCol00", "intCol01","booleanCol1") + s.show + + val expected = rawResult.filter { x => + x.stringCol00 === "row005" + }.map(x => (x.stringCol00, x.intCol01, x.booleanCol1)).toSet + val result = collectToSet(s) + assert(expected === result) + } + + test("filtered query6") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter(($"stringCol00" <= "row050" && $"stringCol00" > "row040") || + $"stringCol00" === "row010" || // no included, since it is composite key + $"stringCol00" === "row020" || // not inlcuded + $"stringCol00" === "r20" || // not included + $"stringCol00" <= "row010") // row005 not 
included + .select("stringCol00", "intCol01","booleanCol1") + s.show(40) + + val expected = rawResult.filter { x => + (x.stringCol00 <= "row050" && x.stringCol00 > "row040") || + x.stringCol00 === "row010" || // no included, since it is composite key + x.stringCol00 === "row020" || // not inlcuded + x.stringCol00 === "r20" || // not included + x.stringCol00 <= "row010" + }.map(x => (x.stringCol00, x.intCol01, x.booleanCol1)).toSet + val result = collectToSet(s) + assert(expected === result) + + } + + + test("filtered query7") { + val sql = sqlContext + import sql.implicits._ + val df = withCatalog(catalog) + val s = df.filter(($"stringCol00" <= "row050" && $"stringCol00" > "row040") || + $"stringCol00" === "row005" || // no included, since it is composite key + $"stringCol00" === "row020" || // not inlcuded + $"stringCol00" === "r20" || // not included + $"stringCol00" <= "row005") // row005 not included + .select("stringCol00", "intCol01","booleanCol1") + s.show(40) + + val expected = rawResult.filter { x => + (x.stringCol00 <= "row050" && x.stringCol00 > "row040") || + x.stringCol00 === "row005" || // no included, since it is composite key + x.stringCol00 === "row020" || // not inlcuded + x.stringCol00 === "r20" || // not included + x.stringCol00 <= "row005" // row005 not included + }.map(x => (x.stringCol00, x.intCol01, x.booleanCol1)).toSet + val result = collectToSet(s) + assert(expected === result) + } + +} diff --git hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala index 49e2f6c..5a4b230 100644 --- hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala +++ hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseCatalogSuite.scala @@ -33,7 +33,7 @@ class HBaseCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndA |"table":{"namespace":"default", "name":"htable"}, |"rowkey":"key1:key2", |"columns":{ - |"col1":{"cf":"rowkey", "col":"key1", "type":"string"}, + |"col1":{"cf":"rowkey", "col":"key1", "type":"string", "length":"5"}, |"col2":{"cf":"rowkey", "col":"key2", "type":"double"}, |"col3":{"cf":"cf1", "col":"col2", "type":"binary"}, |"col4":{"cf":"cf1", "col":"col3", "type":"timestamp"},