diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java
index fbd1308..5cb3752 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java
@@ -34,7 +34,9 @@
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 import com.google.common.collect.Iterators;
@@ -42,11 +44,17 @@ public class ColumnMappings implements Iterable<ColumnMapping> {
 
   private final int keyIndex;
+  private final int timestampIndex;
   private final ColumnMapping[] columnsMapping;
 
   public ColumnMappings(List<ColumnMapping> columnMapping, int keyIndex) {
+    this(columnMapping, keyIndex, -1);
+  }
+
+  public ColumnMappings(List<ColumnMapping> columnMapping, int keyIndex, int timestampIndex) {
     this.columnsMapping = columnMapping.toArray(new ColumnMapping[columnMapping.size()]);
     this.keyIndex = keyIndex;
+    this.timestampIndex = timestampIndex;
   }
 
   @Override
@@ -109,7 +117,9 @@ void setHiveColumnDescription(String serdeName,
     // where key extends LazyPrimitive and thus has type Category.PRIMITIVE
     for (int i = 0; i < columnNames.size(); i++) {
       ColumnMapping colMap = columnsMapping[i];
-      if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
+      colMap.columnName = columnNames.get(i);
+      colMap.columnType = columnTypes.get(i);
+      if (colMap.qualifierName == null && !colMap.hbaseRowKey && !colMap.hbaseTimestamp) {
         TypeInfo typeInfo = columnTypes.get(i);
         if ((typeInfo.getCategory() != ObjectInspector.Category.MAP) ||
             (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getCategory()
@@ -122,8 +132,14 @@ void setHiveColumnDescription(String serdeName,
               + typeInfo.getTypeName());
         }
       }
-      colMap.columnName = columnNames.get(i);
-      colMap.columnType = columnTypes.get(i);
+      if (colMap.hbaseTimestamp) {
+        TypeInfo typeInfo = columnTypes.get(i);
+        if (!colMap.isCategory(PrimitiveCategory.TIMESTAMP) &&
+            !colMap.isCategory(PrimitiveCategory.LONG)) {
+          throw new SerDeException(serdeName + ": timestamp columns should be of " +
+              "timestamp or bigint type, but is mapped to " + typeInfo.getTypeName());
+        }
+      }
     }
   }
 
@@ -299,10 +315,18 @@ public ColumnMapping getKeyMapping() {
     return columnsMapping[keyIndex];
   }
 
+  public ColumnMapping getTimestampMapping() {
+    return timestampIndex < 0 ? null : columnsMapping[timestampIndex];
+  }
+
   public int getKeyIndex() {
     return keyIndex;
   }
 
+  public int getTimestampIndex() {
+    return timestampIndex;
+  }
+
   public ColumnMapping[] getColumnsMapping() {
     return columnsMapping;
   }
@@ -326,6 +350,7 @@ public int getKeyIndex() {
     byte[] qualifierNameBytes;
     List<Boolean> binaryStorage;
     boolean hbaseRowKey;
+    boolean hbaseTimestamp;
     String mappingSpec;
     String qualifierPrefix;
     byte[] qualifierPrefixBytes;
@@ -377,5 +402,14 @@ public String getQualifierPrefix() {
     public boolean isCategory(ObjectInspector.Category category) {
       return columnType.getCategory() == category;
     }
+
+    public boolean isCategory(PrimitiveCategory category) {
+      return columnType.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+          ((PrimitiveTypeInfo)columnType).getPrimitiveCategory() == category;
+    }
+
+    public boolean isComparable() {
+      return binaryStorage.get(0) || isCategory(PrimitiveCategory.STRING);
+    }
   }
 }
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
index 12c5377..98bc73f 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java
@@ -21,17 +21,18 @@
 import java.io.IOException;
 import java.util.Properties;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
 import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 public class DefaultHBaseKeyFactory extends AbstractHBaseKeyFactory implements HBaseKeyFactory {
 
-  protected LazySimpleSerDe.SerDeParameters serdeParams;
+  protected SerDeParameters serdeParams;
   protected HBaseRowSerializer serializer;
 
   @Override
@@ -56,4 +57,12 @@ public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException
   public byte[] serializeKey(Object object, StructField field) throws IOException {
     return serializer.serializeKeyField(object, field, keyMapping);
   }
+
+  @VisibleForTesting
+  static DefaultHBaseKeyFactory forTest(SerDeParameters params, ColumnMappings mappings) {
+    DefaultHBaseKeyFactory factory = new DefaultHBaseKeyFactory();
+    factory.serdeParams = params;
+    factory.keyMapping = mappings.getKeyMapping();
+    return factory;
+  }
 }
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java
index c6c42b4..3bbab20 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java
@@ -35,7 +35,9 @@
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; import org.apache.hadoop.io.Writable; public class HBaseRowSerializer { @@ -45,7 +47,9 @@ private final LazySimpleSerDe.SerDeParameters serdeParam; private final int keyIndex; + private final int timestampIndex; private final ColumnMapping keyMapping; + private final ColumnMapping timestampMapping; private final ColumnMapping[] columnMappings; private final byte[] separators; // the separators array private final boolean escaped; // whether we need to escape the data when writing out @@ -66,8 +70,10 @@ public HBaseRowSerializer(HBaseSerDeParameters hbaseParam) { this.escapeChar = serdeParam.getEscapeChar(); this.needsEscape = serdeParam.getNeedsEscape(); this.keyIndex = hbaseParam.getKeyIndex(); + this.timestampIndex = hbaseParam.getTimestampIndex(); this.columnMappings = hbaseParam.getColumnMappings().getColumnsMapping(); this.keyMapping = hbaseParam.getColumnMappings().getKeyMapping(); + this.timestampMapping = hbaseParam.getColumnMappings().getTimestampMapping(); this.putTimestamp = hbaseParam.getPutTimestamp(); } @@ -81,25 +87,36 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws Excep // Prepare the field ObjectInspectors StructObjectInspector soi = (StructObjectInspector) objInspector; List fields = soi.getAllStructFieldRefs(); - List list = soi.getStructFieldsDataAsList(obj); + List values = soi.getStructFieldsDataAsList(obj); StructField field = fields.get(keyIndex); - Object value = list.get(keyIndex); + Object value = values.get(keyIndex); byte[] key = keyFactory.serializeKey(value, field); if (key == null) { throw new SerDeException("HBase row key cannot be NULL"); } + long timestamp = putTimestamp; + if (timestamp < 0 && timestampIndex >= 0) { + ObjectInspector inspector = fields.get(timestampIndex).getFieldObjectInspector(); + value = values.get(timestampIndex); + if (inspector instanceof LongObjectInspector) { + timestamp = ((LongObjectInspector)inspector).get(value); + } else { + PrimitiveObjectInspector primitive = (PrimitiveObjectInspector) inspector; + timestamp = PrimitiveObjectInspectorUtils.getTimestamp(value, primitive).getTime(); + } + } - Put put = putTimestamp >= 0 ? new Put(key, putTimestamp) : new Put(key); + Put put = timestamp >= 0 ? 
new Put(key, timestamp) : new Put(key); // Serialize each field for (int i = 0; i < fields.size(); i++) { - if (i == keyIndex) { + if (i == keyIndex || i == timestampIndex) { continue; } field = fields.get(i); - value = list.get(i); + value = values.get(i); serializeField(value, field, columnMappings[i], put); } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java index aedd843..b96678b 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java @@ -49,6 +49,7 @@ public static final String HBASE_TABLE_NAME = "hbase.table.name"; public static final String HBASE_TABLE_DEFAULT_STORAGE_TYPE = "hbase.table.default.storage.type"; public static final String HBASE_KEY_COL = ":key"; + public static final String HBASE_TIMESTAMP_COL = ":timestamp"; public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp"; public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class"; public static final String HBASE_COMPOSITE_KEY_TYPES = "hbase.composite.key.types"; @@ -98,8 +99,7 @@ public void initialize(Configuration conf, Properties tbl) serdeParams.getValueFactories()); cachedHBaseRow = new LazyHBaseRow( - (LazySimpleStructObjectInspector) cachedObjectInspector, - serdeParams.getKeyIndex(), serdeParams.getKeyFactory(), serdeParams.getValueFactories()); + (LazySimpleStructObjectInspector) cachedObjectInspector, serdeParams); serializer = new HBaseRowSerializer(serdeParams); @@ -135,6 +135,7 @@ public static ColumnMappings parseColumnsMapping( } int rowKeyIndex = -1; + int timestampIndex = -1; List columnsMapping = new ArrayList(); String[] columnSpecs = columnsMappingSpec.split(","); @@ -160,12 +161,20 @@ public static ColumnMappings parseColumnsMapping( columnMapping.qualifierName = null; columnMapping.qualifierNameBytes = null; columnMapping.hbaseRowKey = true; + } else if (colInfo.equals(HBASE_TIMESTAMP_COL)) { + timestampIndex = i; + columnMapping.familyName = colInfo; + columnMapping.familyNameBytes = Bytes.toBytes(colInfo); + columnMapping.qualifierName = null; + columnMapping.qualifierNameBytes = null; + columnMapping.hbaseTimestamp = true; } else { String [] parts = colInfo.split(":"); assert(parts.length > 0 && parts.length <= 2); columnMapping.familyName = parts[0]; columnMapping.familyNameBytes = Bytes.toBytes(parts[0]); columnMapping.hbaseRowKey = false; + columnMapping.hbaseTimestamp = false; if (parts.length == 2) { @@ -205,7 +214,7 @@ public static ColumnMappings parseColumnsMapping( columnsMapping.add(0, columnMapping); } - return new ColumnMappings(columnsMapping, rowKeyIndex); + return new ColumnMappings(columnsMapping, rowKeyIndex, timestampIndex); } public LazySimpleSerDe.SerDeParameters getSerdeParams() { @@ -228,7 +237,7 @@ public Object deserialize(Writable result) throws SerDeException { throw new SerDeException(getClass().getName() + ": expects ResultWritable!"); } - cachedHBaseRow.init(((ResultWritable) result).getResult(), serdeParams.getColumnMappings()); + cachedHBaseRow.init(((ResultWritable) result).getResult()); return cachedHBaseRow; } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java index 9efa494..006e985 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java +++ 
hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDeParameters.java @@ -128,6 +128,14 @@ public ColumnMapping getKeyColumnMapping() { return columnMappings.getKeyMapping(); } + public int getTimestampIndex() { + return columnMappings.getTimestampIndex(); + } + + public ColumnMapping getTimestampColumnMapping() { + return columnMappings.getTimestampMapping(); + } + public ColumnMappings getColumnMappings() { return columnMappings; } @@ -354,4 +362,4 @@ private Schema getSchema(Configuration conf, Properties tbl, ColumnMapping colMa return schema; } -} \ No newline at end of file +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java index feb3cd1..e1c9977 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java @@ -186,7 +186,7 @@ public void preCreateTable(Table tbl) throws MetaException { Set uniqueColumnFamilies = new HashSet(); for (ColumnMapping colMap : columnMappings) { - if (!colMap.hbaseRowKey) { + if (!colMap.hbaseRowKey && !colMap.hbaseTimestamp) { uniqueColumnFamilies.add(colMap.familyName); } } @@ -213,7 +213,7 @@ public void preCreateTable(Table tbl) throws MetaException { for (ColumnMapping colMap : columnMappings) { - if (colMap.hbaseRowKey) { + if (colMap.hbaseRowKey || colMap.hbaseTimestamp) { continue; } @@ -495,34 +495,38 @@ public static DecomposedPredicate decomposePredicate( HBaseSerDe hBaseSerDe, ExprNodeDesc predicate) { ColumnMapping keyMapping = hBaseSerDe.getHBaseSerdeParam().getKeyColumnMapping(); + ColumnMapping tsMapping = hBaseSerDe.getHBaseSerdeParam().getTimestampColumnMapping(); IndexPredicateAnalyzer analyzer = HiveHBaseTableInputFormat.newIndexPredicateAnalyzer( - keyMapping.columnName, keyMapping.columnType, keyMapping.binaryStorage.get(0)); - List searchConditions = - new ArrayList(); + keyMapping.columnName, keyMapping.isComparable(), + tsMapping == null ? null : tsMapping.columnName); + List conditions = new ArrayList(); ExprNodeGenericFuncDesc residualPredicate = - (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, searchConditions); - int scSize = searchConditions.size(); - if (scSize < 1 || 2 < scSize) { - // Either there was nothing which could be pushed down (size = 0), - // there were complex predicates which we don't support yet. - // Currently supported are one of the form: - // 1. key < 20 (size = 1) - // 2. key = 20 (size = 1) - // 3. key < 20 and key > 10 (size = 2) - return null; - } - if (scSize == 2 && - (searchConditions.get(0).getComparisonOp() - .equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual") || - searchConditions.get(1).getComparisonOp() - .equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual"))) { - // If one of the predicates is =, then any other predicate with it is illegal. - return null; + (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, conditions); + + for (List searchConditions: + HiveHBaseInputFormatUtil.decompose(conditions).values()) { + int scSize = searchConditions.size(); + if (scSize < 1 || 2 < scSize) { + // Either there was nothing which could be pushed down (size = 0), + // there were complex predicates which we don't support yet. + // Currently supported are one of the form: + // 1. key < 20 (size = 1) + // 2. key = 20 (size = 1) + // 3. 
key < 20 and key > 10 (size = 2) + return null; + } + if (scSize == 2 && + (searchConditions.get(0).getComparisonOp() + .equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual") || + searchConditions.get(1).getComparisonOp() + .equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual"))) { + // If one of the predicates is =, then any other predicate with it is illegal. + return null; + } } DecomposedPredicate decomposedPredicate = new DecomposedPredicate(); - decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions( - searchConditions); + decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(conditions); decomposedPredicate.residualPredicate = residualPredicate; return decomposedPredicate; } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java index 5aa1d796..0524572 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java @@ -23,13 +23,16 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; +import org.apache.hadoop.hive.ql.index.IndexSearchCondition; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.mapred.JobConf; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Util code common between HiveHBaseTableInputFormat and HiveHBaseTableSnapshotInputFormat. @@ -74,7 +77,7 @@ public static Scan getScan(JobConf jobConf) throws IOException { ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping(); for (int i : readColIDs) { ColumnMapping colMap = columnsMapping[i]; - if (colMap.hbaseRowKey) { + if (colMap.hbaseRowKey || colMap.hbaseTimestamp) { continue; } @@ -99,7 +102,7 @@ public static Scan getScan(JobConf jobConf) throws IOException { // tables column projection. 
     if (empty) {
       for (ColumnMapping colMap: columnMappings) {
-        if (colMap.hbaseRowKey) {
+        if (colMap.hbaseRowKey || colMap.hbaseTimestamp) {
           continue;
         }
@@ -153,4 +156,19 @@ public static boolean getStorageFormatOfKey(String spec, String defaultFormat) t
       throw new IOException("Malformed string: " + spec);
     }
   }
+
+  public static Map<String, List<IndexSearchCondition>> decompose(
+      List<IndexSearchCondition> searchConditions) {
+    Map<String, List<IndexSearchCondition>> result =
+        new HashMap<String, List<IndexSearchCondition>>();
+    for (IndexSearchCondition condition : searchConditions) {
+      List<IndexSearchCondition> conditions = result.get(condition.getColumnDesc().getColumn());
+      if (conditions == null) {
+        conditions = new ArrayList<IndexSearchCondition>();
+        result.put(condition.getColumnDesc().getColumn(), conditions);
+      }
+      conditions.add(condition);
+    }
+    return result;
+  }
 }
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 4ac0803..280cfdd 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -20,7 +20,9 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -51,9 +53,11 @@
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -171,7 +175,7 @@ public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws
    *
    * @return converted table split if any
    */
-  private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary)
+  private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean isKeyBinary)
       throws IOException {
 
     // TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL
@@ -193,22 +197,29 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary)
     if (filterExprSerialized == null) {
       return scan;
     }
+
     ExprNodeGenericFuncDesc filterExpr = Utilities.deserializeExpression(filterExprSerialized);
 
-    String colName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
+    String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
     String colType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey];
-    IndexPredicateAnalyzer analyzer = newIndexPredicateAnalyzer(colName,colType, isKeyBinary);
+    boolean isKeyComparable = isKeyBinary || colType.equalsIgnoreCase("string");
+
+    String tsColName = null;
+    if (iTimestamp >= 0) {
+      tsColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iTimestamp];
+    }
 
-    List<IndexSearchCondition> searchConditions =
-        new ArrayList<IndexSearchCondition>();
-    ExprNodeDesc residualPredicate =
-        analyzer.analyzePredicate(filterExpr, searchConditions);
+    IndexPredicateAnalyzer analyzer =
+        newIndexPredicateAnalyzer(keyColName, isKeyComparable,
tsColName); + + List conditions = new ArrayList(); + ExprNodeDesc residualPredicate = analyzer.analyzePredicate(filterExpr, conditions); // There should be no residual since we already negotiated that earlier in // HBaseStorageHandler.decomposePredicate. However, with hive.optimize.index.filter // OpProcFactory#pushFilterToStorageHandler pushes the original filter back down again. - // Since pushed-down filters are not ommitted at the higher levels (and thus the + // Since pushed-down filters are not omitted at the higher levels (and thus the // contract of negotiation is ignored anyway), just ignore the residuals. // Re-assess this when negotiation is honored and the duplicate evaluation is removed. // THIS IGNORES RESIDUAL PARSING FROM HBaseStorageHandler#decomposePredicate @@ -216,9 +227,23 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) LOG.debug("Ignoring residual predicate " + residualPredicate.getExprString()); } + Map> split = HiveHBaseInputFormatUtil.decompose(conditions); + List keyConditions = split.get(keyColName); + if (keyConditions != null && !keyConditions.isEmpty()) { + setupKeyRange(scan, keyConditions, isKeyBinary); + } + List tsConditions = split.get(tsColName); + if (tsConditions != null && !tsConditions.isEmpty()) { + setupTimeRange(scan, tsConditions); + } + return scan; + } + + private void setupKeyRange(Scan scan, List conditions, boolean isBinary) + throws IOException { // Convert the search condition into a restriction on the HBase scan byte [] startRow = HConstants.EMPTY_START_ROW, stopRow = HConstants.EMPTY_END_ROW; - for (IndexSearchCondition sc : searchConditions){ + for (IndexSearchCondition sc : conditions) { ExprNodeConstantEvaluator eval = new ExprNodeConstantEvaluator(sc.getConstantDesc()); PrimitiveObjectInspector objInspector; @@ -234,7 +259,7 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) throw new IOException(e); } - byte [] constantVal = getConstantVal(writable, objInspector, isKeyBinary); + byte[] constantVal = getConstantVal(writable, objInspector, isBinary); String comparisonOp = sc.getComparisonOp(); if("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual".equals(comparisonOp)){ @@ -261,7 +286,52 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) if (LOG.isDebugEnabled()) { LOG.debug(Bytes.toStringBinary(startRow) + " ~ " + Bytes.toStringBinary(stopRow)); } - return scan; + } + + private void setupTimeRange(Scan scan, List conditions) + throws IOException { + long start = 0; + long end = Long.MAX_VALUE; + for (IndexSearchCondition sc : conditions) { + long timestamp = getTimestampVal(sc); + String comparisonOp = sc.getComparisonOp(); + if("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual".equals(comparisonOp)){ + start = timestamp; + end = timestamp + 1; + } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan".equals(comparisonOp)){ + end = timestamp; + } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan" + .equals(comparisonOp)) { + start = timestamp; + } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan" + .equals(comparisonOp)){ + start = timestamp + 1; + } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan" + .equals(comparisonOp)){ + end = timestamp + 1; + } else { + throw new IOException(comparisonOp + " is not a supported comparison operator"); + } + } + scan.setTimeRange(start, end); + } + + private long getTimestampVal(IndexSearchCondition 
sc) throws IOException { + long timestamp; + try { + ExprNodeConstantEvaluator eval = new ExprNodeConstantEvaluator(sc.getConstantDesc()); + ObjectInspector inspector = eval.initialize(null); + Object value = eval.evaluate(null); + if (inspector instanceof LongObjectInspector) { + timestamp = ((LongObjectInspector)inspector).get(value); + } else { + PrimitiveObjectInspector primitive = (PrimitiveObjectInspector) inspector; + timestamp = PrimitiveObjectInspectorUtils.getTimestamp(value, primitive).getTime(); + } + } catch (HiveException e) { + throw new IOException(e); + } + return timestamp; } private byte[] getConstantVal(Object writable, PrimitiveObjectInspector poi, @@ -312,11 +382,6 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) return next; } - static IndexPredicateAnalyzer newIndexPredicateAnalyzer( - String keyColumnName, TypeInfo keyColType, boolean isKeyBinary) { - return newIndexPredicateAnalyzer(keyColumnName, keyColType.getTypeName(), isKeyBinary); - } - /** * Instantiates a new predicate analyzer suitable for * determining how to push a filter down into the HBase scan, @@ -327,26 +392,34 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( * @return preconfigured predicate analyzer */ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( - String keyColumnName, String keyColType, boolean isKeyBinary) { + String keyColumnName, boolean isKeyComparable, String timestampColumn) { IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer(); // We can always do equality predicate. Just need to make sure we get appropriate // BA representation of constant of filter condition. - analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual"); // We can do other comparisons only if storage format in hbase is either binary - // or we are dealing with string types since there lexographic ordering will suffice. - if(isKeyBinary || (keyColType.equalsIgnoreCase("string"))){ - analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic." + - "GenericUDFOPEqualOrGreaterThan"); - analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan"); - analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan"); - analyzer.addComparisonOp("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan"); + // or we are dealing with string types since there lexicographic ordering will suffice. 
+ if (isKeyComparable) { + analyzer.addComparisonOp(keyColumnName, + "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual", + "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan", + "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan", + "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan", + "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan"); + } else { + analyzer.addComparisonOp(keyColumnName, + "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual"); } - // and only on the key column - analyzer.clearAllowedColumnNames(); - analyzer.allowColumnName(keyColumnName); + if (timestampColumn != null) { + analyzer.addComparisonOp(timestampColumn, + "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual", + "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan", + "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan", + "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan", + "org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan"); + } return analyzer; } @@ -374,6 +447,7 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( } int iKey = columnMappings.getKeyIndex(); + int iTimestamp = columnMappings.getTimestampIndex(); ColumnMapping keyMapping = columnMappings.getKeyMapping(); // Take filter pushdown into account while calculating splits; this @@ -382,7 +456,7 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( // split per region, the implementation actually takes the scan // definition into account and excludes regions which don't satisfy // the start/stop row conditions (HBASE-1829). - Scan scan = createFilterScan(jobConf, iKey, + Scan scan = createFilterScan(jobConf, iKey, iTimestamp, HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec, jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string"))); @@ -392,7 +466,7 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( // REVIEW: are we supposed to be applying the getReadColumnIDs // same as in getRecordReader? 
for (ColumnMapping colMap : columnMappings) { - if (colMap.hbaseRowKey) { + if (colMap.hbaseRowKey || colMap.hbaseTimestamp) { continue; } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java index 6ac8423..908d2e2 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java @@ -18,17 +18,23 @@ package org.apache.hadoop.hive.hbase; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; import org.apache.hadoop.hive.hbase.struct.HBaseValueFactory; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyLong; import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; +import org.apache.hadoop.hive.serde2.lazy.LazyString; import org.apache.hadoop.hive.serde2.lazy.LazyStruct; +import org.apache.hadoop.hive.serde2.lazy.LazyTimestamp; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; @@ -42,43 +48,48 @@ * The HBase columns mapping of the row. */ private Result result; - private ColumnMapping[] columnsMapping; private ArrayList cachedList; - private final int iKey; private final HBaseKeyFactory keyFactory; private final List valueFactories; + private final ColumnMapping[] columnsMapping; - public LazyHBaseRow(LazySimpleStructObjectInspector oi) { - this(oi, -1, null, null); + @VisibleForTesting + LazyHBaseRow(LazySimpleStructObjectInspector oi, ColumnMappings columnMappings) { + super(oi); + this.keyFactory = DefaultHBaseKeyFactory.forTest(null, columnMappings); + this.valueFactories = null; + this.columnsMapping = columnMappings.getColumnsMapping(); } /** * Construct a LazyHBaseRow object with the ObjectInspector. */ - public LazyHBaseRow(LazySimpleStructObjectInspector oi, int iKey, HBaseKeyFactory keyFactory, - List valueFactories) { + public LazyHBaseRow(LazySimpleStructObjectInspector oi, HBaseSerDeParameters serdeParams) { super(oi); - this.iKey = iKey; - this.keyFactory = keyFactory; - this.valueFactories = valueFactories; + this.keyFactory = serdeParams.getKeyFactory(); + this.valueFactories = serdeParams.getValueFactories(); + this.columnsMapping = serdeParams.getColumnMappings().getColumnsMapping(); } /** * Set the HBase row data(a Result writable) for this LazyStruct. 
* @see LazyHBaseRow#init(org.apache.hadoop.hbase.client.Result) */ - public void init(Result r, ColumnMappings columnsMappings) { + public void init(Result r) { this.result = r; - this.columnsMapping = columnsMappings.getColumnsMapping(); setParsed(false); } @Override - protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef) throws SerDeException { - if (fieldID == iKey) { + protected LazyObjectBase createLazyField(final int fieldID, final StructField fieldRef) + throws SerDeException { + if (columnsMapping[fieldID].hbaseRowKey) { return keyFactory.createKey(fieldRef.getFieldObjectInspector()); } + if (columnsMapping[fieldID].hbaseTimestamp) { + return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector()); + } if (valueFactories != null) { return valueFactories.get(fieldID).createValueObject(fieldRef.getFieldObjectInspector()); @@ -136,6 +147,14 @@ private Object uncheckedGetField(int fieldID) { if (colMap.hbaseRowKey) { ref = new ByteArrayRef(); ref.setData(result.getRow()); + } else if (colMap.hbaseTimestamp) { + long timestamp = result.rawCells()[0].getTimestamp(); // from hbase-0.96.0 + LazyObjectBase lz = fields[fieldID]; + if (lz instanceof LazyTimestamp) { + ((LazyTimestamp) lz).getWritableObject().setTime(timestamp); + } else { + ((LazyLong) lz).getWritableObject().set(timestamp); + } } else { if (colMap.qualifierName == null) { // it is a column family diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java index 9a31f0f..b2bdd19 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java @@ -459,7 +459,7 @@ public void testLazyHBaseRow1() throws SerDeException { List fieldTypeInfos = TypeInfoUtils.getTypeInfosFromTypeString( "string,int,array,map,string"); - List fieldNames = Arrays.asList(new String[]{"key", "a", "b", "c", "d"}); + List fieldNames = Arrays.asList("key", "a", "b", "c", "d"); Text nullSequence = new Text("\\N"); String hbaseColsMapping = ":key,cfa:a,cfa:b,cfb:c,cfb:d"; @@ -483,7 +483,7 @@ public void testLazyHBaseRow1() throws SerDeException { ObjectInspector oi = LazyFactory.createLazyStructInspector(fieldNames, fieldTypeInfos, new byte[] {' ', ':', '='}, nullSequence, false, false, (byte)0); - LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi); + LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi, columnMappings); List kvs = new ArrayList(); @@ -497,7 +497,7 @@ public void testLazyHBaseRow1() throws SerDeException { Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("hi"))); Result r = new Result(kvs); - o.init(r, columnMappings); + o.init(r); assertEquals( ("{'key':'test-row','a':123,'b':['a','b','c']," @@ -511,7 +511,7 @@ public void testLazyHBaseRow1() throws SerDeException { Bytes.toBytes("cfb"), Bytes.toBytes("c"), Bytes.toBytes("d=e:f=g"))); r = new Result(kvs); - o.init(r, columnMappings); + o.init(r); assertEquals( ("{'key':'test-row','a':123,'b':null," @@ -527,7 +527,7 @@ public void testLazyHBaseRow1() throws SerDeException { Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("no"))); r = new Result(kvs); - o.init(r, columnMappings); + o.init(r); assertEquals( ("{'key':'test-row','a':null,'b':['a']," @@ -541,7 +541,7 @@ public void testLazyHBaseRow1() throws SerDeException { Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("no"))); r = new 
Result(kvs); - o.init(r, columnMappings); + o.init(r); assertEquals( ("{'key':'test-row','a':null,'b':['','a','','']," @@ -565,7 +565,7 @@ public void testLazyHBaseRow1() throws SerDeException { Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes(""))); r = new Result(kvs); - o.init(r, columnMappings); + o.init(r); assertEquals( "{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""), @@ -609,7 +609,7 @@ public void testLazyHBaseRow2() throws SerDeException { fieldTypeInfos, new byte[] {' ', ':', '='}, nullSequence, false, false, (byte) 0); - LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi); + LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi, columnMappings); List kvs = new ArrayList(); kvs.add(new KeyValue(Bytes.toBytes("test-row"), @@ -624,7 +624,7 @@ public void testLazyHBaseRow2() throws SerDeException { Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("hi"))); Result r = new Result(kvs); - o.init(r, columnMappings); + o.init(r); assertEquals( ("{'key':'test-row','a':123,'b':['a','b','c']," @@ -640,7 +640,7 @@ public void testLazyHBaseRow2() throws SerDeException { Bytes.toBytes("cfb"), Bytes.toBytes("f"), Bytes.toBytes("g"))); r = new Result(kvs); - o.init(r, columnMappings); + o.init(r); assertEquals( ("{'key':'test-row','a':123,'b':null," @@ -656,7 +656,7 @@ public void testLazyHBaseRow2() throws SerDeException { Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("no"))); r = new Result(kvs); - o.init(r, columnMappings); + o.init(r); assertEquals( ("{'key':'test-row','a':null,'b':['a']," @@ -670,7 +670,7 @@ public void testLazyHBaseRow2() throws SerDeException { Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("no"))); r = new Result(kvs); - o.init(r, columnMappings); + o.init(r); assertEquals( ("{'key':'test-row','a':null,'b':['','a','','']," @@ -686,7 +686,7 @@ public void testLazyHBaseRow2() throws SerDeException { Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes(""))); r = new Result(kvs); - o.init(r, columnMappings); + o.init(r); assertEquals( "{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""), @@ -733,7 +733,7 @@ public void testLazyHBaseRow3() throws SerDeException { LazyFactory.createLazyStructInspector(fieldNames, fieldTypeInfos, new byte [] {' ', ':', '='}, nullSequence, false, false, (byte) 0); - LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi); + LazyHBaseRow o = new LazyHBaseRow((LazySimpleStructObjectInspector) oi, columnMappings); byte [] rowKey = "row-key".getBytes(); List kvs = new ArrayList(); @@ -785,7 +785,7 @@ public void testLazyHBaseRow3() throws SerDeException { Collections.sort(kvs, KeyValue.COMPARATOR); Result result = new Result(kvs); - o.init(result, columnMappings); + o.init(result); List fieldRefs = ((StructObjectInspector) oi).getAllStructFieldRefs(); diff --git hbase-handler/src/test/queries/positive/hbase_timestamp.q hbase-handler/src/test/queries/positive/hbase_timestamp.q new file mode 100644 index 0000000..6f1e205 --- /dev/null +++ hbase-handler/src/test/queries/positive/hbase_timestamp.q @@ -0,0 +1,41 @@ +DROP TABLE hbase_table; +CREATE TABLE hbase_table (key string, value string, time timestamp) + STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string,:timestamp"); +DESC extended hbase_table; +FROM src INSERT OVERWRITE TABLE hbase_table SELECT key, value, "2012-02-23 10:14:52" WHERE (key % 17) = 0; +SELECT * FROM hbase_table; + +DROP TABLE 
hbase_table; +CREATE TABLE hbase_table (key string, value string, time bigint) + STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string,:timestamp"); +FROM src INSERT OVERWRITE TABLE hbase_table SELECT key, value, 1329959754000 WHERE (key % 17) = 0; +SELECT key, value, cast(time as timestamp) FROM hbase_table; + +DROP TABLE hbase_table; +CREATE TABLE hbase_table (key string, value string, time bigint) + STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string,:timestamp"); +insert overwrite table hbase_table select key,value,ts FROM +( + select key, value, 100000000000 as ts from src WHERE (key % 33) = 0 + UNION ALL + select key, value, 200000000000 as ts from src WHERE (key % 37) = 0 +) T; + +explain +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time < 200000000000; +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time < 200000000000; + +explain +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time > 100000000000; +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time > 100000000000; + +explain +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time <= 100000000000; +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time <= 100000000000; + +explain +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time >= 200000000000; +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time >= 200000000000; diff --git hbase-handler/src/test/results/positive/hbase_timestamp.q.out hbase-handler/src/test/results/positive/hbase_timestamp.q.out new file mode 100644 index 0000000..f70d371 --- /dev/null +++ hbase-handler/src/test/results/positive/hbase_timestamp.q.out @@ -0,0 +1,357 @@ +PREHOOK: query: DROP TABLE hbase_table +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE hbase_table +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE hbase_table (key string, value string, time timestamp) + STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string,:timestamp") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@hbase_table +POSTHOOK: query: CREATE TABLE hbase_table (key string, value string, time timestamp) + STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string,:timestamp") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_table +PREHOOK: query: DESC extended hbase_table +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@hbase_table +POSTHOOK: query: DESC extended hbase_table +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@hbase_table +key string from deserializer +value string from deserializer +time timestamp from deserializer + +#### A masked pattern was here #### +PREHOOK: query: FROM src INSERT OVERWRITE TABLE hbase_table SELECT key, value, "2012-02-23 10:14:52" WHERE (key % 17) = 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@hbase_table +POSTHOOK: query: FROM src INSERT OVERWRITE TABLE hbase_table SELECT key, value, "2012-02-23 10:14:52" 
WHERE (key % 17) = 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@hbase_table +PREHOOK: query: SELECT * FROM hbase_table +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM hbase_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_table +#### A masked pattern was here #### +0 val_0 2012-02-23 10:14:52 +119 val_119 2012-02-23 10:14:52 +136 val_136 2012-02-23 10:14:52 +153 val_153 2012-02-23 10:14:52 +17 val_17 2012-02-23 10:14:52 +170 val_170 2012-02-23 10:14:52 +187 val_187 2012-02-23 10:14:52 +221 val_221 2012-02-23 10:14:52 +238 val_238 2012-02-23 10:14:52 +255 val_255 2012-02-23 10:14:52 +272 val_272 2012-02-23 10:14:52 +289 val_289 2012-02-23 10:14:52 +306 val_306 2012-02-23 10:14:52 +323 val_323 2012-02-23 10:14:52 +34 val_34 2012-02-23 10:14:52 +374 val_374 2012-02-23 10:14:52 +459 val_459 2012-02-23 10:14:52 +493 val_493 2012-02-23 10:14:52 +51 val_51 2012-02-23 10:14:52 +85 val_85 2012-02-23 10:14:52 +PREHOOK: query: DROP TABLE hbase_table +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@hbase_table +PREHOOK: Output: default@hbase_table +POSTHOOK: query: DROP TABLE hbase_table +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@hbase_table +POSTHOOK: Output: default@hbase_table +PREHOOK: query: CREATE TABLE hbase_table (key string, value string, time bigint) + STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string,:timestamp") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@hbase_table +POSTHOOK: query: CREATE TABLE hbase_table (key string, value string, time bigint) + STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string,:timestamp") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_table +PREHOOK: query: FROM src INSERT OVERWRITE TABLE hbase_table SELECT key, value, 1329959754000 WHERE (key % 17) = 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@hbase_table +POSTHOOK: query: FROM src INSERT OVERWRITE TABLE hbase_table SELECT key, value, 1329959754000 WHERE (key % 17) = 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@hbase_table +PREHOOK: query: SELECT key, value, cast(time as timestamp) FROM hbase_table +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, value, cast(time as timestamp) FROM hbase_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_table +#### A masked pattern was here #### +0 val_0 2012-02-22 17:15:54 +119 val_119 2012-02-22 17:15:54 +136 val_136 2012-02-22 17:15:54 +153 val_153 2012-02-22 17:15:54 +17 val_17 2012-02-22 17:15:54 +170 val_170 2012-02-22 17:15:54 +187 val_187 2012-02-22 17:15:54 +221 val_221 2012-02-22 17:15:54 +238 val_238 2012-02-22 17:15:54 +255 val_255 2012-02-22 17:15:54 +272 val_272 2012-02-22 17:15:54 +289 val_289 2012-02-22 17:15:54 +306 val_306 2012-02-22 17:15:54 +323 val_323 2012-02-22 17:15:54 +34 val_34 2012-02-22 17:15:54 +374 val_374 2012-02-22 17:15:54 +459 val_459 2012-02-22 17:15:54 +493 val_493 2012-02-22 17:15:54 +51 val_51 2012-02-22 17:15:54 +85 val_85 2012-02-22 17:15:54 +PREHOOK: query: DROP TABLE hbase_table +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@hbase_table +PREHOOK: Output: default@hbase_table +POSTHOOK: query: DROP 
TABLE hbase_table +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@hbase_table +POSTHOOK: Output: default@hbase_table +PREHOOK: query: CREATE TABLE hbase_table (key string, value string, time bigint) + STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string,:timestamp") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@hbase_table +POSTHOOK: query: CREATE TABLE hbase_table (key string, value string, time bigint) + STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:string,:timestamp") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_table +PREHOOK: query: insert overwrite table hbase_table select key,value,ts FROM +( + select key, value, 100000000000 as ts from src WHERE (key % 33) = 0 + UNION ALL + select key, value, 200000000000 as ts from src WHERE (key % 37) = 0 +) T +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@hbase_table +POSTHOOK: query: insert overwrite table hbase_table select key,value,ts FROM +( + select key, value, 100000000000 as ts from src WHERE (key % 33) = 0 + UNION ALL + select key, value, 200000000000 as ts from src WHERE (key % 37) = 0 +) T +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@hbase_table +PREHOOK: query: explain +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time < 200000000000 +PREHOOK: type: QUERY +POSTHOOK: query: explain +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time < 200000000000 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: hbase_table + filterExpr: (((key > 100) and (key < 400)) and (time < 200000000000)) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: (((key > 100) and (key < 400)) and (time < 200000000000)) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string), CAST( time AS TIMESTAMP) (type: timestamp) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time < 200000000000 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time < 200000000000 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_table +#### A masked pattern was here #### +165 val_165 1973-03-03 01:46:40 +396 val_396 1973-03-03 01:46:40 +PREHOOK: query: explain +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key 
< 400 AND time > 100000000000 +PREHOOK: type: QUERY +POSTHOOK: query: explain +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time > 100000000000 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: hbase_table + filterExpr: (((key > 100) and (key < 400)) and (time > 100000000000)) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: (((key > 100) and (key < 400)) and (time > 100000000000)) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string), CAST( time AS TIMESTAMP) (type: timestamp) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time > 100000000000 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time > 100000000000 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_table +#### A masked pattern was here #### +111 val_111 1976-05-03 12:33:20 +222 val_222 1976-05-03 12:33:20 +296 val_296 1976-05-03 12:33:20 +333 val_333 1976-05-03 12:33:20 +PREHOOK: query: explain +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time <= 100000000000 +PREHOOK: type: QUERY +POSTHOOK: query: explain +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time <= 100000000000 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: hbase_table + filterExpr: (((key > 100) and (key < 400)) and (time <= 100000000000)) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: (((key > 100) and (key < 400)) and (time <= 100000000000)) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string), CAST( time AS TIMESTAMP) (type: timestamp) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 
100 AND key < 400 AND time <= 100000000000 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time <= 100000000000 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_table +#### A masked pattern was here #### +165 val_165 1973-03-03 01:46:40 +396 val_396 1973-03-03 01:46:40 +PREHOOK: query: explain +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time >= 200000000000 +PREHOOK: type: QUERY +POSTHOOK: query: explain +SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time >= 200000000000 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: hbase_table + filterExpr: (((key > 100) and (key < 400)) and (time >= 200000000000)) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: (((key > 100) and (key < 400)) and (time >= 200000000000)) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string), CAST( time AS TIMESTAMP) (type: timestamp) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time >= 200000000000 +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_table +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, value, cast(time as timestamp) FROM hbase_table WHERE key > 100 AND key < 400 AND time >= 200000000000 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_table +#### A masked pattern was here #### +111 val_111 1976-05-03 12:33:20 +222 val_222 1976-05-03 12:33:20 +296 val_296 1976-05-03 12:33:20 +333 val_333 1976-05-03 12:33:20 diff --git ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java index 960fc1d..4987f7a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.index; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -57,14 +58,14 @@ public class IndexPredicateAnalyzer { private final Set udfNames; - private final Set allowedColumnNames; + private final Map> columnToUDFs; private FieldValidator fieldValidator; private boolean acceptsFields; public IndexPredicateAnalyzer() { udfNames = new HashSet(); - allowedColumnNames = new HashSet(); + columnToUDFs = new HashMap>(); } public void setFieldValidator(FieldValidator fieldValidator) { @@ -89,7 +90,7 @@ public void addComparisonOp(String udfName) { * column 
names are allowed.)
   */
  public void clearAllowedColumnNames() {
-    allowedColumnNames.clear();
+    columnToUDFs.clear();
  }
 
  /**
@@ -98,7 +99,22 @@ public void clearAllowedColumnNames() {
   * @param columnName name of column to be allowed
   */
  public void allowColumnName(String columnName) {
-    allowedColumnNames.add(columnName);
+    columnToUDFs.put(columnName, udfNames);
+  }
+
+  /**
+   * add allowed functions per column
+   * @param columnName
+   * @param udfs
+   */
+  public void addComparisonOp(String columnName, String... udfs) {
+    Set<String> allowed = columnToUDFs.get(columnName);
+    if (allowed == null || allowed == udfNames) {
+      // override
+      columnToUDFs.put(columnName, new HashSet<String>(Arrays.asList(udfs)));
+    } else {
+      allowed.addAll(Arrays.asList(udfs));
+    }
  }
 
  /**
@@ -221,12 +237,13 @@ private ExprNodeDesc analyzeExpr(
       constantDesc = (ExprNodeConstantDesc) extracted[1];
     }
 
-    String udfName = genericUDF.getUdfName();
-    if (!udfNames.contains(genericUDF.getUdfName())) {
+    Set<String> allowed = columnToUDFs.get(columnDesc.getColumn());
+    if (allowed == null) {
       return expr;
     }
 
-    if (!allowedColumnNames.contains(columnDesc.getColumn())) {
+    String udfName = genericUDF.getUdfName();
+    if (!allowed.contains(genericUDF.getUdfName())) {
       return expr;
     }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java
index 0ab27ff..a38ccb5 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java
@@ -29,7 +29,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VInt;
@@ -125,6 +124,12 @@ public void set(byte[] bytes, int offset) {
     clearTimestamp();
   }
 
+  public void setTime(long time) {
+    timestamp.setTime(time);
+    bytesEmpty = true;
+    timestampEmpty = false;
+  }
+
   public void set(Timestamp t) {
     if (t == null) {
       timestamp.setTime(0);