diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java index 5008f15..14af735 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java @@ -20,9 +20,8 @@ import java.util.ArrayList; import java.util.List; -import java.util.Properties; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.LazyFactory; import org.apache.hadoop.hive.serde2.lazy.LazyObject; @@ -96,4 +95,31 @@ public HBaseCompositeKey(LazySimpleStructObjectInspector oi) { return lazyObject; } -} + + /** + * Returns the optional {@link Filter row filter} to be applied to the + * {@link org.apache.hadoop.hbase.client.Scan scans} to + * filter out unwanted row keys + * + *

<p>
+ * Note 1: For cases where the keys are salted, it is highly recommended that a custom filter be provided: + * for a scan over salted keys to work correctly, multiple scans are required, and + * the current implementation of the HBase integration does not support that. + *
</p>
+ * + *
<p>
+ * Note 2: Since Hive currently supports predicate pushdown only for the first field in the struct, the value of + * fieldID will always be 0. Support for additional fields will be added in the future. + *
</p>
+ * + * @param fieldID + * id for the value + * @param values + * values that are parts of the row key + * @return {@link Filter filter} that can be applied to the {@link Scan scans}. By default, + * returns {@code null}. Can be overridden to provide a custom {@link Filter filter} + * */ + public Filter getFilter(int fieldID, Object... values) { + return null; + } +} \ No newline at end of file diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java index 899ccce..076692a 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; @@ -314,6 +313,10 @@ public void configureTableJobProperties( if (scanBatch != null) { jobProperties.put(HBaseSerDe.HBASE_SCAN_BATCH, scanBatch); } + String compKey = tableProperties.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS); + if (compKey != null) { + jobProperties.put(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS, compKey); + } String tableName = tableProperties.getProperty(HBaseSerDe.HBASE_TABLE_NAME); diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java index 704fcb9..d95c8cf 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java @@ -19,17 +19,21 @@ package org.apache.hadoop.hive.hbase; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapred.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase; @@ -52,6 +56,7 @@ import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.shims.ShimLoader; @@ -296,12 +301,29 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) objInspector = (PrimitiveObjectInspector)eval.initialize(null); writable = eval.evaluate(null); } catch (ClassCastException cce) { - throw new IOException("Currently only primitve types are supported. 
Found: " + + throw new IOException("Currently only primitive types are supported. Found: " + sc.getConstantDesc().getTypeString()); } catch (HiveException e) { throw new IOException(e); } + // See if a custom filter was provided. Currently the capability to directly plugin custom + // filter without a composite class implementation is not supported + Filter filter = getFilter(jobConf, writable, objInspector); + + // if a valid filter was found, set it on the scan and return + if (filter != null) { + LOG.debug("Setting filter and returning scan"); + scan.setFilter(filter); + + return scan; + } else if (sc.isStruct()) { + // if the given column was part of a struct and a filter wasn't provided simply return to + // avoid setting bad values as start and stop keys + LOG.debug("Filter was null. Simply returning scan as a struct was found"); + return scan; + } + byte [] constantVal = getConstantVal(writable, objInspector, isKeyBinary); String comparisonOp = sc.getComparisonOp(); @@ -511,4 +533,97 @@ private boolean getStorageFormatOfKey(String spec, String defaultFormat) throws throw new IOException("Malformed string: " + spec); } } -} + + /** + * Extract a possible filter implementation from the composite key class + * + * @return null if the given key class is null or if it does not provide a custom row + * key filter, the provided filter otherwise + * @throws IOException + * */ + private Filter getFilter(JobConf jobConf, Object writable, PrimitiveObjectInspector poi) + throws IOException { + + String compositeKeyClass = jobConf.get(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS); + + // No composite key provided. return + if (compositeKeyClass == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Returning null as no composite key class was provided"); + } + + return null; + } + + Object filter = null; + + Properties props = new Properties(); + + try { + Class clazz = jobConf.getClassByName(compositeKeyClass); + Object compositeKeyObj = clazz.getDeclaredConstructor(LazySimpleStructObjectInspector.class, + Properties.class, Configuration.class).newInstance(null, props, + HBaseConfiguration.create(jobConf)); + + Object value = getValue(writable, poi); + + // we currently support filters on the first field of the composite key + filter = clazz.getMethod("getFilter", int.class, Object[].class) + .invoke(compositeKeyObj, 0, new Object[] {value}); + } catch (ClassNotFoundException e) { + // throw as IOException + throw new IOException(e); + } catch (SecurityException e) { + // throw as IOException + throw new IOException(e); + } catch (IllegalArgumentException e) { + // throw as IOException + throw new IOException(e); + } catch (InstantiationException e) { + // throw as IOException + throw new IOException(e); + } catch (IllegalAccessException e) { + // throw as IOException + throw new IOException(e); + } catch (InvocationTargetException e) { + // throw as IOException + throw new IOException(e); + } catch (NoSuchMethodException e) { + // throw as IOException + throw new IOException(e); + } + + if (!(filter instanceof Filter)) { + throw new IllegalStateException( + "Invalid filter. 
Should be an instance of org.apache.hadoop.hbase.filter.Filter"); + } + + // return the given filter + return (Filter) filter; + } + + private Object getValue(Object writable, PrimitiveObjectInspector poi) throws IOException { + PrimitiveCategory pc = poi.getPrimitiveCategory(); + switch (pc) { + case INT: + return ((IntWritable) writable).get(); + case BOOLEAN: + return ((BooleanWritable) writable).get(); + case LONG: + return ((LongWritable) writable).get(); + case FLOAT: + return ((FloatWritable) writable).get(); + case DOUBLE: + return ((DoubleWritable) writable).get(); + case SHORT: + return ((ShortWritable) writable).get(); + case STRING: + return ((Text) writable).toString(); + case BYTE: + return ((ByteWritable) writable).get(); + + default: + throw new IOException("Type not supported " + pc); + } + } +} \ No newline at end of file diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java index 13c344b..f025b1c 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java @@ -21,6 +21,10 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; @@ -54,4 +58,9 @@ public Object getField(int fieldID) { return toLazyObject(fieldID, fieldBytes); } + + @Override + public Filter getFilter(int fieldID, Object...
values) { + return new FamilyFilter(CompareOp.EQUAL, new BinaryComparator("astring".getBytes())); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java index d39ee2e..039af8f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java @@ -40,11 +40,11 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; +import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; /** * IndexPredicateAnalyzer decomposes predicates, separating the parts @@ -193,12 +193,14 @@ private ExprNodeDesc analyzeExpr( return expr; } + boolean isStruct = ((expr1 instanceof ExprNodeFieldDesc) || (expr2 instanceof ExprNodeFieldDesc)); + searchConditions.add( new IndexSearchCondition( columnDesc, udfName, constantDesc, - expr)); + expr, isStruct)); // we converted the expression to a search condition, so // remove it from the residual predicate diff --git ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java index 5f1329c..5fccc5d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java @@ -32,6 +32,7 @@ private String comparisonOp; private ExprNodeConstantDesc constantDesc; private ExprNodeGenericFuncDesc comparisonExpr; + private boolean isStruct; /** * Constructs a search condition, which takes the form @@ -50,12 +51,36 @@ public IndexSearchCondition( ExprNodeColumnDesc columnDesc, String comparisonOp, ExprNodeConstantDesc constantDesc, - ExprNodeGenericFuncDesc comparisonExpr) { + ExprNodeGenericFuncDesc comparisonExpr){ + this(columnDesc, comparisonOp, constantDesc, comparisonExpr, false); + } + + /** + * Constructs a search condition, which takes the form + *
<pre>column-ref comparison-op constant-value</pre>
. + * + * @param columnDesc column being compared + * + * @param comparisonOp comparison operator, e.g. "=" + * (taken from GenericUDFBridge.getUdfName()) + * + * @param constantDesc constant value to search for + * + * @param comparisonExpr the original comparison expression + * @param isStruct whether the column is a struct + */ + public IndexSearchCondition( + ExprNodeColumnDesc columnDesc, + String comparisonOp, + ExprNodeConstantDesc constantDesc, + ExprNodeGenericFuncDesc comparisonExpr, + boolean isStruct) { this.columnDesc = columnDesc; this.comparisonOp = comparisonOp; this.constantDesc = constantDesc; this.comparisonExpr = comparisonExpr; + this.isStruct = isStruct; } public void setColumnDesc(ExprNodeColumnDesc columnDesc) { @@ -90,8 +115,16 @@ public ExprNodeGenericFuncDesc getComparisonExpr() { return comparisonExpr; } + public boolean isStruct(){ + return isStruct; + } + + public void setIsStruct(boolean isStruct){ + this.isStruct = isStruct; + } + @Override public String toString() { return comparisonExpr.getExprString(); } -} +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java index e50026b..7ebcba1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java @@ -26,12 +26,14 @@ import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.util.ReflectionUtils; @@ -85,7 +87,7 @@ public static ExprNodeDesc replace(ExprNodeDesc origin, /** * return true if predicate is already included in source - */ + */ public static boolean containsPredicate(ExprNodeDesc source, ExprNodeDesc predicate) { if (source.isSame(predicate)) { return true; @@ -153,12 +155,12 @@ public static ExprNodeDesc mergePredicates(List exprs) { */ public static String recommendInputName(ExprNodeDesc desc) { if (desc instanceof ExprNodeColumnDesc) { - return ((ExprNodeColumnDesc)desc).getColumn(); + return ((ExprNodeColumnDesc) desc).getColumn(); } List children = desc.getChildren(); if (FunctionRegistry.isOpPreserveInputName(desc) && !children.isEmpty() && - children.get(0) instanceof ExprNodeColumnDesc) { - return ((ExprNodeColumnDesc)children.get(0)).getColumn(); + children.get(0) instanceof ExprNodeColumnDesc) { + return ((ExprNodeColumnDesc) children.get(0)).getColumn(); } return null; } @@ -168,7 +170,7 @@ public static String recommendInputName(ExprNodeDesc desc) { */ public static boolean isDeterministic(ExprNodeDesc desc) { if (desc instanceof ExprNodeGenericFuncDesc) { - if (!FunctionRegistry.isDeterministic(((ExprNodeGenericFuncDesc)desc).getGenericUDF())) { + if (!FunctionRegistry.isDeterministic(((ExprNodeGenericFuncDesc) desc).getGenericUDF())) { return false; } } 
@@ -226,7 +228,7 @@ private static ExprNodeDesc backtrack(ExprNodeColumnDesc column, Operator cur Operator<?> terminal) throws SemanticException { Map<String, ExprNodeDesc> mapping = current.getColumnExprMap(); if (mapping == null || !mapping.containsKey(column.getColumn())) { - return backtrack((ExprNodeDesc)column, current, terminal); + return backtrack((ExprNodeDesc) column, current, terminal); } ExprNodeDesc mapped = mapping.get(column.getColumn()); return backtrack(mapped, current, terminal); @@ -262,6 +264,21 @@ private static ExprNodeDesc backtrack(ExprNodeColumnDesc column, Operator cur if (expr1 instanceof ExprNodeConstantDesc && expr2 instanceof ExprNodeColumnDesc) { return new ExprNodeDesc[] {expr2, expr1, null}; // add null as a marker (inverted order) } + if (expr1 instanceof ExprNodeFieldDesc && expr2 instanceof ExprNodeConstantDesc) { + ExprNodeFieldDesc nodeFieldDesc = (ExprNodeFieldDesc) expr1; + + if (validateFields(nodeFieldDesc)) { + return new ExprNodeDesc[] {nodeFieldDesc.getDesc(), expr2}; + } + } + if (expr1 instanceof ExprNodeConstantDesc && expr2 instanceof ExprNodeFieldDesc) { + ExprNodeFieldDesc nodeFieldDesc = (ExprNodeFieldDesc) expr2; + + if (validateFields(nodeFieldDesc)) { + return new ExprNodeDesc[] {nodeFieldDesc.getDesc(), expr1, null}; + } + } + // todo: constant op constant return null; } @@ -318,4 +335,33 @@ private static ExprNodeConstantDesc foldConstant(ExprNodeGenericFuncDesc func) { return null; } } + + /** + * Validates the field referenced by the {@link ExprNodeFieldDesc}. Basically this checks that the given + * field is the first field of the enclosing struct, which matters because the order of fields in the + * struct is significant for any filter applied further down the line. + */ + private static boolean validateFields(ExprNodeFieldDesc fieldDesc) { + String fieldName = fieldDesc.getFieldName(); + + ExprNodeDesc nodeDesc = fieldDesc.getDesc(); + + TypeInfo typeInfo = nodeDesc.getTypeInfo(); + + if (!(typeInfo instanceof StructTypeInfo)) { + // since we are working off an ExprNodeFieldDesc which represents a field within a struct, this + // should never happen + throw new AssertionError("Expected StructTypeInfo. Found: " + typeInfo.getTypeName()); + } + + List<String> allFieldNames = ((StructTypeInfo) typeInfo).getAllStructFieldNames(); + + if (allFieldNames == null || allFieldNames.isEmpty()) { + return false; + } + + String firstElement = allFieldNames.get(0); + + return firstElement.equals(fieldName); + } }
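
For context on how the new hook is meant to be used, the following is a minimal sketch (not part of the patch) of a user-supplied composite key class. The three-argument constructor and the getFilter(int, Object...) override mirror what HiveHBaseTableInputFormat expects when it instantiates the class reflectively and asks it for a filter on field 0; everything else (the class name SampleDelimitedKey, its package, the assumed "<firstField>_<rest>" row key layout, and the choice of PrefixFilter) is hypothetical.

// Hypothetical example only; not part of this patch.
package org.example.hive.hbase;

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.HBaseCompositeKey;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;

public class SampleDelimitedKey extends HBaseCompositeKey {

  // HiveHBaseTableInputFormat looks up exactly this constructor via reflection,
  // so a custom key class must declare it even if it ignores some of the arguments.
  public SampleDelimitedKey(LazySimpleStructObjectInspector oi, Properties tbl,
      Configuration conf) {
    super(oi);
  }

  // Called with fieldID == 0 and the constant from the pushed-down predicate on the
  // first struct field; returning null falls back to plain start/stop-row handling.
  @Override
  public Filter getFilter(int fieldID, Object... values) {
    if (fieldID != 0 || values.length == 0 || values[0] == null) {
      return null;
    }
    // Assumes row keys look like "<firstField>_<rest>", so an equality predicate on
    // the first field can be answered with a simple row-prefix filter.
    return new PrefixFilter(Bytes.toBytes(values[0].toString() + "_"));
  }
}

A real implementation would typically also override getField(int) to split the delimited key back into its struct fields, as HBaseTestCompositeKey does above. The class is wired in through the table property named by HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS, which HBaseStorageHandler now copies into the job configuration so that HiveHBaseTableInputFormat can find it at scan-planning time.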