diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
index 5008f15..14af735 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseCompositeKey.java
@@ -20,9 +20,8 @@
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
 import org.apache.hadoop.hive.serde2.lazy.LazyObject;
@@ -96,4 +95,31 @@ public HBaseCompositeKey(LazySimpleStructObjectInspector oi) {
 
     return lazyObject;
   }
-}
+
+  /**
+   * Returns the optional {@link Filter row filter} that is applied to the
+   * {@link org.apache.hadoop.hbase.client.Scan scans} to filter out unwanted row keys.
+   *
+   * <p>
+   * Note 1: When the keys are salted, it is highly recommended that a custom filter be
+   * provided. Scanning salted keys correctly requires multiple scans, which the current
+   * HBase integration does not support.
+   * </p>
+   *
+   * <p>
+   * Note 2: Hive currently supports predicate pushdown only for the first field of the
+   * struct, so the value of fieldID will always be 0. Support for additional fields will
+   * be added in the future.
+   * </p>
+   *
+   * @param fieldID
+   *          id of the field whose value is pushed down
+   * @param values
+   *          values that are parts of the row key
+   * @return {@link Filter filter} that can be applied to the {@link Scan scans}. By default,
+   *         returns {@code null}. Can be overridden to provide a custom {@link Filter filter}.
+   */
+  public Filter getFilter(int fieldID, Object... values) {
+    return null;
+  }
+}
\ No newline at end of file
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
index 8cd594b..925f53e 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
@@ -312,6 +312,10 @@ public void configureTableJobProperties(
     if (scanBatch != null) {
       jobProperties.put(HBaseSerDe.HBASE_SCAN_BATCH, scanBatch);
     }
+    String compKey = tableProperties.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
+    if (compKey != null) {
+      jobProperties.put(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS, compKey);
+    }
 
     String tableName = tableProperties.getProperty(HBaseSerDe.HBASE_TABLE_NAME);
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 704fcb9..5b38557 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -19,17 +19,21 @@
 package org.apache.hadoop.hive.hbase;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
@@ -52,6 +56,7 @@
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -296,12 +301,29 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary)
       objInspector = (PrimitiveObjectInspector)eval.initialize(null);
       writable = eval.evaluate(null);
     } catch (ClassCastException cce) {
-      throw new IOException("Currently only primitve types are supported. Found: " +
-        sc.getConstantDesc().getTypeString());
+      throw new IOException("Currently only primitive types are supported. Found: " +
+        sc.getConstantDesc().getTypeString(), cce);
     } catch (HiveException e) {
       throw new IOException(e);
     }
 
+    // See if a custom filter was provided. Plugging in a custom filter directly, without a
+    // composite key class implementation, is currently not supported.
+    Filter filter = getFilter(jobConf, writable, objInspector);
+
+    // if a valid filter was found, set it on the scan and return
+    if (filter != null) {
+      LOG.debug("Setting filter and returning scan");
+      scan.setFilter(filter);
+
+      return scan;
+    } else if (sc.isStruct()) {
+      // if the given column was part of a struct and no filter was provided, simply return to
+      // avoid setting bad values as start and stop keys
+      LOG.debug("Filter was null. Simply returning scan as a struct was found");
+      return scan;
+    }
+
     byte [] constantVal = getConstantVal(writable, objInspector, isKeyBinary);
     String comparisonOp = sc.getComparisonOp();
 
@@ -511,4 +533,72 @@ private boolean getStorageFormatOfKey(String spec, String defaultFormat) throws
       throw new IOException("Malformed string: " + spec);
     }
   }
-}
+
+  /**
+   * Extract a possible filter implementation from the composite key class.
+   *
+   * @return {@code null} if the given key class is null or if it does not provide a custom row
+   *         key filter, the provided filter otherwise
+   * @throws IOException
+   */
+  private Filter getFilter(JobConf jobConf, Object writable, PrimitiveObjectInspector poi)
+      throws IOException {
+
+    String compositeKeyClass = jobConf.get(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
+
+    // No composite key class provided, nothing to do
+    if (compositeKeyClass == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Returning null as no composite key class was provided");
+      }
+
+      return null;
+    }
+
+    Filter filter = null;
+
+    Properties props = new Properties();
+
+    try {
+      Class<?> clazz = jobConf.getClassByName(compositeKeyClass);
+      HBaseCompositeKey compositeKeyObj = (HBaseCompositeKey) clazz
+          .getDeclaredConstructor(LazySimpleStructObjectInspector.class, Properties.class,
+              Configuration.class).newInstance(null, props, HBaseConfiguration.create(jobConf));
+
+      Object value = getValue(writable, poi);
+
+      filter = compositeKeyObj.getFilter(0, value);
+    } catch (Exception e) {
+      // wrap and rethrow as IOException
+      throw new IOException("Could not instantiate " + compositeKeyClass, e);
+    }
+
+    // return the given filter
+    return filter;
+  }
+
+  private Object getValue(Object writable, PrimitiveObjectInspector poi) throws IOException {
+    PrimitiveCategory pc = poi.getPrimitiveCategory();
+    switch (pc) {
+    case INT:
+      return ((IntWritable) writable).get();
+    case BOOLEAN:
+      return ((BooleanWritable) writable).get();
+    case LONG:
+      return ((LongWritable) writable).get();
+    case FLOAT:
+      return ((FloatWritable) writable).get();
+    case DOUBLE:
+      return ((DoubleWritable) writable).get();
+    case SHORT:
+      return ((ShortWritable) writable).get();
+    case STRING:
+      return ((Text) writable).toString();
+    case BYTE:
+      return ((ByteWritable) writable).get();
+
+    default:
+      throw new IllegalStateException("Type not supported " + pc);
+    }
+  }
+}
\ No newline at end of file
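
For context on how the pieces above fit together: a table whose row key is a struct names a subclass of HBaseCompositeKey through the table property referenced by HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS; the HBaseStorageHandler hunk copies that property into the job properties, and the new HiveHBaseTableInputFormat.getFilter helper instantiates the class reflectively through a (LazySimpleStructObjectInspector, Properties, Configuration) constructor and asks it for a filter for field 0. The sketch below is illustrative only and not part of this patch; the class name CountryCompositeKey and the "country_timestamp" key layout are assumptions.

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.HBaseCompositeKey;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;

/**
 * Illustrative only: composite key of the form "country_timestamp".
 * Only the first struct field (country) is pushed down, so fieldID is always 0.
 */
public class CountryCompositeKey extends HBaseCompositeKey {

  // This exact constructor signature is what HiveHBaseTableInputFormat.getFilter
  // looks up reflectively before calling getFilter(0, value).
  public CountryCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl, Configuration conf) {
    super(oi);
  }

  @Override
  public Filter getFilter(int fieldID, Object... values) {
    // Keep only row keys that start with the pushed-down country value.
    return new RowFilter(CompareOp.EQUAL,
        new BinaryPrefixComparator(Bytes.toBytes((String) values[0])));
  }
}

For salted keys, a plain prefix comparison like this is not sufficient because the salt varies per row; see the salt-aware sketch after the test class below.
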
diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java
index 13c344b..f025b1c 100644
--- hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java
+++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseTestCompositeKey.java
@@ -21,6 +21,10 @@
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
@@ -54,4 +58,9 @@ public Object getField(int fieldID) {
     return toLazyObject(fieldID, fieldBytes);
   }
+
+  @Override
+  public Filter getFilter(int fieldID, Object... values) {
+    return new FamilyFilter(CompareOp.EQUAL, new BinaryComparator("astring".getBytes()));
+  }
}
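
The FamilyFilter returned by the test class above is presumably just an easy value to assert against; Note 1 in HBaseCompositeKey is about salted keys, where a row-level, salt-aware filter is what actually makes pushdown useful. The sketch below is illustrative only and not part of this patch: it assumes a fixed-width row key of one salt byte followed by a 4-byte int id, a hypothetical class name SaltedIdCompositeKey, and HBase's FuzzyRowFilter (a mask byte of 1 marks a position that may hold any value).

import java.util.Collections;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hive.hbase.HBaseCompositeKey;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;

/** Illustrative only: row key laid out as [1 salt byte][4-byte int id]. */
public class SaltedIdCompositeKey extends HBaseCompositeKey {

  public SaltedIdCompositeKey(LazySimpleStructObjectInspector oi, Properties tbl, Configuration conf) {
    super(oi);
  }

  @Override
  public Filter getFilter(int fieldID, Object... values) {
    // Fixed part of the key; position 0 (the salt) is masked out below.
    byte[] pattern = new byte[1 + Bytes.SIZEOF_INT];
    System.arraycopy(Bytes.toBytes(((Integer) values[0]).intValue()), 0,
        pattern, 1, Bytes.SIZEOF_INT);

    // Fuzzy mask: 1 = position may hold any byte (the salt), 0 = must match the pattern.
    byte[] mask = new byte[] { 1, 0, 0, 0, 0 };

    return new FuzzyRowFilter(Collections.singletonList(
        new Pair<byte[], byte[]>(pattern, mask)));
  }
}

With such a filter on the Scan, region servers skip rows whose id part does not match, even though no useful start and stop rows can be set for a salted key.
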
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java
index d39ee2e..039af8f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java
@@ -40,11 +40,11 @@
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
/**
* IndexPredicateAnalyzer decomposes predicates, separating the parts
@@ -193,12 +193,14 @@ private ExprNodeDesc analyzeExpr(
return expr;
}
+    boolean isStruct = ((expr1 instanceof ExprNodeFieldDesc) || (expr2 instanceof ExprNodeFieldDesc));
+
     searchConditions.add(
       new IndexSearchCondition(
         columnDesc,
         udfName,
         constantDesc,
-        expr));
+        expr, isStruct));
// we converted the expression to a search condition, so
// remove it from the residual predicate
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java
index 5f1329c..9658636 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/IndexSearchCondition.java
@@ -32,6 +32,7 @@
private String comparisonOp;
private ExprNodeConstantDesc constantDesc;
private ExprNodeGenericFuncDesc comparisonExpr;
+ private boolean isStruct;
/**
* Constructs a search condition, which takes the form
@@ -50,12 +51,36 @@ public IndexSearchCondition(
ExprNodeColumnDesc columnDesc,
String comparisonOp,
ExprNodeConstantDesc constantDesc,
- ExprNodeGenericFuncDesc comparisonExpr) {
+ ExprNodeGenericFuncDesc comparisonExpr){
+ this(columnDesc, comparisonOp, constantDesc, comparisonExpr, false);
+ }
+
+ /**
+ * Constructs a search condition, which takes the form
+   * column-ref comparison-op constant-value.
+   *
+   * @param columnDesc column being compared
+   *
+   * @param comparisonOp comparison operator, e.g. "="
+   * (taken from GenericUDFBridge.getUdfName())
+   *
+   * @param constantDesc constant value to search for
+   *
+   * @param comparisonExpr the original comparison expression
+   * @param isStruct whether the column is a struct
+   */
+  public IndexSearchCondition(
+    ExprNodeColumnDesc columnDesc,
+    String comparisonOp,
+    ExprNodeConstantDesc constantDesc,
+    ExprNodeGenericFuncDesc comparisonExpr,
+    boolean isStruct) {
     this.columnDesc = columnDesc;
     this.comparisonOp = comparisonOp;
     this.constantDesc = constantDesc;
     this.comparisonExpr = comparisonExpr;
+    this.isStruct = isStruct;
   }
 
   public void setColumnDesc(ExprNodeColumnDesc columnDesc) {
@@ -90,8 +115,16 @@ public ExprNodeGenericFuncDesc getComparisonExpr() {
     return comparisonExpr;
   }
 
+  public boolean isStruct() {
+    return isStruct;
+  }
+
+  public void setIsStruct(boolean isStruct) {
+    this.isStruct = isStruct;
+  }
+
   @Override
   public String toString() {
     return comparisonExpr.getExprString();
   }
-}
+}
\ No newline at end of file
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
index e50026b..7ebcba1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
@@ -26,12 +26,14 @@
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -85,7 +87,7 @@ public static ExprNodeDesc replace(ExprNodeDesc origin,
 
   /**
    * return true if predicate is already included in source
-   */
+   */
   public static boolean containsPredicate(ExprNodeDesc source, ExprNodeDesc predicate) {
     if (source.isSame(predicate)) {
       return true;
@@ -153,12 +155,12 @@ public static ExprNodeDesc mergePredicates(List