diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java index 18fb5ea..7342e7c 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java @@ -18,26 +18,49 @@ package org.apache.hadoop.hive.hbase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hive.hbase.predicate.ComplexPredicateAnalyzer; +import org.apache.hadoop.hive.hbase.predicate.RangeTranslator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.mapred.JobConf; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; public abstract class AbstractHBaseKeyFactory implements HBaseKeyFactory { + protected static final Log LOG = LogFactory.getLog(HBaseKeyFactory.class); + protected HBaseSerDeParameters hbaseParams; protected ColumnMappings.ColumnMapping keyMapping; + protected LazySimpleSerDe.SerDeParameters serdeParams; protected Properties properties; @Override public void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException { this.hbaseParams = hbaseParam; this.keyMapping = hbaseParam.getKeyColumnMapping(); + this.serdeParams = hbaseParam.getSerdeParams(); this.properties = properties; } @@ -48,6 +71,81 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) throws IOExce @Override public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { + if (this instanceof RangeTranslator) { + DecomposedPredicate decomposed = new DecomposedPredicate(); + RangeTranslator translator = (RangeTranslator) this; + ExprNodeGenericFuncDesc analyzed = ComplexPredicateAnalyzer.analyze(predicate, translator, jobConf); + if (analyzed == null || !predicate.isSame(analyzed)) { + decomposed.residualPredicate = (ExprNodeGenericFuncDesc) predicate; + } + if (analyzed != null) { + decomposed.pushedPredicateObject = Utilities.serializeExpression(analyzed); + decomposed.pushedPredicateFactory = NormalizedExprFactory.class.getName(); + } + return decomposed; + } return HBaseStorageHandler.decomposePredicate(jobConf, (HBaseSerDe) deserializer, predicate); } + + public static class NormalizedExprFactory implements HBaseScanFactory { + + @Override + public Class getObjectClass() { + return String.class; + } + + @Override + public List 
createScan(Scan emptyScan, String object) throws IOException { + + ExprNodeDesc filterExpr = Utilities.deserializeExpression(object); + + List scans = new ArrayList(); + for (ExprNodeGenericFuncDesc predicate : decomposeOR(filterExpr)) { + GenericUDF udf = predicate.getGenericUDF(); + byte[] startKey = HConstants.EMPTY_START_ROW; + byte[] endKey = HConstants.EMPTY_END_ROW; + if (udf instanceof GenericUDFBetween) { + ExprNodeConstantDesc startDesc = (ExprNodeConstantDesc) predicate.getChildren().get(2); + ExprNodeConstantDesc endDesc = (ExprNodeConstantDesc) predicate.getChildren().get(3); + startKey = Base64.decode((String) startDesc.getValue()); + endKey = Base64.decode((String) endDesc.getValue()); + } else if (udf instanceof GenericUDFOPEqualOrGreaterThan) { + ExprNodeConstantDesc startDesc = (ExprNodeConstantDesc) predicate.getChildren().get(1); + startKey = Base64.decode((String) startDesc.getValue()); + } else if (udf instanceof GenericUDFOPEqualOrLessThan) { + ExprNodeConstantDesc endDesc = (ExprNodeConstantDesc) predicate.getChildren().get(1); + endKey = Base64.decode((String) endDesc.getValue()); + } + Scan scan = new Scan(emptyScan); + scan.setStartRow(startKey); + scan.setStopRow(endKey); + + scans.add(scan); + } + return scans; + } + + private List decomposeOR(ExprNodeDesc input) { + List decomposed = new ArrayList(); + decomposeOR(input, decomposed); + return decomposed; + } + + private void decomposeOR(ExprNodeDesc node, List decomposed) { + if (node instanceof ExprNodeNullDesc) { + return; + } + ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) node; + if (funcDesc.getGenericUDF() instanceof GenericUDFOPOr) { + decomposeOR(funcDesc.getChildren().get(0), decomposed); + decomposeOR(funcDesc.getChildren().get(1), decomposed); + } else { + decomposed.add(funcDesc); + } + } + + @Override + public void configure(JobConf job) { + } + } } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyPredicateDecomposer.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyPredicateDecomposer.java index 0cc21fa..ba1fe51 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyPredicateDecomposer.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyPredicateDecomposer.java @@ -17,16 +17,20 @@ */ package org.apache.hadoop.hive.hbase; +import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.mapred.JobConf; /** * Simple abstract class to help with creation of a {@link DecomposedPredicate}. 
In order to create @@ -52,6 +56,7 @@ public DecomposedPredicate decomposePredicate(String keyColName, ExprNodeDesc pr decomposed.pushedPredicate = analyzer.translateSearchConditions(conditions); try { decomposed.pushedPredicateObject = getScanRange(conditions); + decomposed.pushedPredicateFactory = HBaseScanRangeFactory.class.getName(); } catch (Exception e) { LOG.warn("Failed to decompose predicates", e); return null; @@ -68,9 +73,34 @@ public DecomposedPredicate decomposePredicate(String keyColName, ExprNodeDesc pr protected abstract HBaseScanRange getScanRange(List searchConditions) throws Exception; + public static class HBaseScanRangeFactory implements HBaseScanFactory { + + private JobConf jobConf; + + @Override + public void configure(JobConf jobConf) { + this.jobConf = jobConf; + } + + @Override + public Class getObjectClass() { + return HBaseScanRange.class; + } + + @Override + public List createScan(Scan emptyScan, HBaseScanRange object) throws IOException { + Scan scan = new Scan(emptyScan); + try { + return Arrays.asList(object.setup(scan, jobConf)); + } catch (Exception e) { + throw new IOException(e); + } + } + } + /** * Get an optional {@link IndexPredicateAnalyzer.FieldValidator validator}. A validator can be - * used to optinally filter out the predicates which need not be decomposed. By default this + * used to optionally filter out the predicates which need not be decomposed. By default this * method returns {@code null} which means that all predicates are pushed but consumers can choose * to override this to provide a custom validator as well. * */ diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java index 8735fbc..8e9248d 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ColumnMappings.java @@ -336,6 +336,10 @@ public String getQualifierName() { return binaryStorage; } + public boolean getBinaryStorage(int index) { + return binaryStorage.get(index); + } + public boolean isHbaseRowKey() { return hbaseRowKey; } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/CompositeHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/CompositeHBaseKeyFactory.java index 480b31f..222fa01 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/CompositeHBaseKeyFactory.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/CompositeHBaseKeyFactory.java @@ -39,8 +39,6 @@ private final Class keyClass; private final Constructor constructor; - private Configuration conf; - public CompositeHBaseKeyFactory(Class keyClass) throws Exception { // see javadoc of HBaseCompositeKey this.keyClass = keyClass; diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java index 5731e45..e988168 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/DefaultHBaseKeyFactory.java @@ -49,7 +49,7 @@ public ObjectInspector createKeyObjectInspector(TypeInfo type) throws SerDeExcep @Override public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException { - return LazyFactory.createLazyObject(inspector, keyMapping.binaryStorage.get(0)); + return LazyFactory.createLazyObject(inspector, keyMapping.getBinaryStorage(0)); } @Override diff --git 
hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java index fe6081e..869e8a5 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseRowSerializer.java @@ -121,7 +121,7 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws Excep } // use the serialization option switch to write primitive values as either a variable // length UTF8 string or a fixed width bytes if serializing in binary format - boolean writeBinary = keyMapping.binaryStorage.get(0); + boolean writeBinary = keyMapping.getBinaryStorage(0); return serialize(keyValue, keyFieldOI, 1, writeBinary); } @@ -147,14 +147,14 @@ private void serializeField( for (Map.Entry entry: map.entrySet()) { // Get the Key // Map keys are required to be primitive and may be serialized in binary format - byte[] columnQualifierBytes = serialize(entry.getKey(), koi, 3, colMap.binaryStorage.get(0)); + byte[] columnQualifierBytes = serialize(entry.getKey(), koi, 3, colMap.getBinaryStorage(0)); if (columnQualifierBytes == null) { continue; } // Map values may be serialized in binary format when they are primitive and binary // serialization is the option selected - byte[] bytes = serialize(entry.getValue(), voi, 3, colMap.binaryStorage.get(1)); + byte[] bytes = serialize(entry.getValue(), voi, 3, colMap.getBinaryStorage(1)); if (bytes == null) { continue; } @@ -176,7 +176,7 @@ private void serializeField( } else { // use the serialization option switch to write primitive values as either a variable // length UTF8 string or a fixed width bytes if serializing in binary format - bytes = serialize(value, foi, 1, colMap.binaryStorage.get(0)); + bytes = serialize(value, foi, 1, colMap.getBinaryStorage(0)); } if (bytes == null) { diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanFactory.java new file mode 100644 index 0000000..6c9a240 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanFactory.java @@ -0,0 +1,15 @@ +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.mapred.JobConfigurable; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +public interface HBaseScanFactory<T extends Serializable> extends JobConfigurable { + + Class<T> getObjectClass(); + + List<Scan> createScan(Scan emptyScan, T object) throws IOException; +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java index 8b64321..1f5b5b0 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java @@ -54,11 +54,11 @@ public void setStopRow(byte[] stopRow) { public void addFilter(Filter filter) throws Exception { Class clazz = filter.getClass(); - clazz.getMethod("parseFrom", byte[].class); // valiade + clazz.getMethod("parseFrom", byte[].class); // validate filterDescs.add(new FilterDesc(clazz.getName(), filter.toByteArray())); } - public void setup(Scan scan, Configuration conf) throws Exception { + public Scan setup(Scan scan, Configuration conf) throws Exception { if (startRow != null) { scan.setStartRow(startRow); } @@ -66,17 +66,18 @@ public void setup(Scan scan, Configuration conf) throws Exception {
scan.setStopRow(stopRow); } if (filterDescs.isEmpty()) { - return; + return scan; } if (filterDescs.size() == 1) { scan.setFilter(filterDescs.get(0).toFilter(conf)); - return; + return scan; } List filters = new ArrayList(); for (FilterDesc filter : filterDescs) { filters.add(filter.toFilter(conf)); } scan.setFilter(new FilterList(filters)); + return scan; } public String toString() { diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java index 6c1ce5c..166f638 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java @@ -30,10 +30,13 @@ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; +import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -101,6 +104,21 @@ public static ColumnMappings parseColumnsMapping(String columnsMappingSpec) throws SerDeException { return parseColumnsMapping(columnsMappingSpec, true); } + + public static ColumnMappings parseColumnsMappingRuntime(String columnsMappingSpec, + String hbaseColumnsNames, String hbaseColumnsTypes, String hbaseTableStorageType, + boolean doColumnRegexMatching) throws IOException { + try { + ColumnMappings mappings = parseColumnsMapping(columnsMappingSpec, doColumnRegexMatching); + mappings.setHiveColumnDescription(HBaseSerDe.class.getName(), + Arrays.asList(hbaseColumnsNames.split(",")).subList(0, mappings.size()), + TypeInfoUtils.getTypeInfosFromTypeString(hbaseColumnsTypes).subList(0, mappings.size())); + mappings.parseColumnStorageTypes(hbaseTableStorageType); + return mappings; + } catch (Exception e) { + throw new IOException(e); + } + } /** * Parses the HBase columns mapping specifier to identify the column families, qualifiers * and also caches the byte arrays corresponding to them. 
One of the Hive table diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java index feb3cd1..3e0c63f 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java @@ -496,7 +496,7 @@ public static DecomposedPredicate decomposePredicate( ExprNodeDesc predicate) { ColumnMapping keyMapping = hBaseSerDe.getHBaseSerdeParam().getKeyColumnMapping(); IndexPredicateAnalyzer analyzer = HiveHBaseTableInputFormat.newIndexPredicateAnalyzer( - keyMapping.columnName, keyMapping.columnType, keyMapping.binaryStorage.get(0)); + keyMapping.columnName, keyMapping.columnType, keyMapping.getBinaryStorage(0)); List searchConditions = new ArrayList(); ExprNodeGenericFuncDesc residualPredicate = diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java index 5aa1d79..3be8584 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java @@ -23,8 +23,23 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.JobConf; import java.io.IOException; @@ -34,7 +49,7 @@ /** * Util code common between HiveHBaseTableInputFormat and HiveHBaseTableSnapshotInputFormat. */ -class HiveHBaseInputFormatUtil { +public class HiveHBaseInputFormatUtil { /** * Parse {@code jobConf} to create the target {@link HTable} instance. @@ -130,27 +145,57 @@ public static Scan getScan(JobConf jobConf) throws IOException { return scan; } - public static boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException{ + public static byte[] toByteArray(Object value, boolean appendBA, boolean isKeyBinary) + throws Exception { + byte[] bytes = toByteArray(new ExprNodeConstantDesc(value), isKeyBinary); + return appendBA ? 
getNextBA(bytes) : bytes; + } - String[] mapInfo = spec.split("#"); - boolean tblLevelDefault = "binary".equalsIgnoreCase(defaultFormat); + public static byte[] toByteArray(ExprNodeConstantDesc constant, boolean isKeyBinary) + throws Exception { - switch (mapInfo.length) { - case 1: - return tblLevelDefault; + ExprNodeEvaluator eval = ExprNodeEvaluatorFactory.get(constant); + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) eval.initialize(null); + Object writable = eval.evaluate(null); - case 2: - String storageType = mapInfo[1]; - if(storageType.equals("-")) { - return tblLevelDefault; - } else if ("string".startsWith(storageType)){ - return false; - } else if ("binary".startsWith(storageType)){ - return true; - } + if (!isKeyBinary) { + // Key is stored in text format. Get bytes representation of constant also of + // text format. + ByteStream.Output stream = new ByteStream.Output(); + LazyUtils.writePrimitiveUTF8(stream, writable, poi, false, (byte) 0, null); + return stream.toByteArray(); + } + switch (poi.getPrimitiveCategory()) { + case INT: + return Bytes.toBytes(((IntWritable) writable).get()); + case BOOLEAN: + return Bytes.toBytes(((BooleanWritable) writable).get()); + case LONG: + return Bytes.toBytes(((LongWritable) writable).get()); + case FLOAT: + return Bytes.toBytes(((FloatWritable) writable).get()); + case DOUBLE: + return Bytes.toBytes(((DoubleWritable) writable).get()); + case SHORT: + return Bytes.toBytes(((ShortWritable) writable).get()); + case STRING: + return Bytes.toBytes(writable.toString()); + case BYTE: + return Bytes.toBytes(((ByteWritable) writable).get()); + case TIMESTAMP: + return Bytes.toBytes(((TimestampWritable)writable).getTimestamp().getTime()); default: - throw new IOException("Malformed string: " + spec); + throw new IOException("Type not supported " + poi.getPrimitiveCategory()); } } + + public static byte[] getNextBA(byte[] current) { + // startRow is inclusive while stopRow is exclusive, + // this util method returns very next bytearray which will occur after the current one + // by padding current one with a trailing 0 byte. 
+ byte[] next = new byte[current.length + 1]; + System.arraycopy(current, 0, next, 0, current.length); + return next; + } } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java index 4ac0803..57f0a05 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java @@ -19,8 +19,12 @@ package org.apache.hadoop.hive.hbase; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,30 +40,16 @@ import org.apache.hadoop.hbase.mapreduce.TableSplit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; -import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; -import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.ByteStream; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.io.ByteWritable; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.io.ShortWritable; -import org.apache.hadoop.hive.serde2.lazy.LazyUtils; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -69,6 +59,7 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.util.ReflectionUtils; /** * HiveHBaseTableInputFormat implements InputFormat for HBase storage handler @@ -160,55 +151,42 @@ public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws }; } - /** - * Converts a filter (which has been pushed down from Hive's optimizer) - * into corresponding restrictions on the HBase scan. The - * filter should already be in a form which can be fully converted. 
- * - * @param jobConf configuration for the scan - * - * @param iKey 0-based offset of key column within Hive table - * - * @return converted table split if any - */ - private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) + private List filterToScans(JobConf jobConf, ColumnMappings columnsMapping) throws IOException { - // TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL + Scan empty = emptyScanner(columnsMapping); - Scan scan = new Scan(); - String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR); - if (filterObjectSerialized != null) { - HBaseScanRange range = Utilities.deserializeObject(filterObjectSerialized, - HBaseScanRange.class); - try { - range.setup(scan, jobConf); - } catch (Exception e) { - throw new IOException(e); - } - return scan; + String filterFactory = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_FACTORY); + String filterObject = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR); + if (filterFactory != null && filterObject != null) { + HBaseScanFactory factory = createFilterFactory(jobConf, filterFactory); + Serializable range = Utilities.deserializeObject(filterObject, factory.getObjectClass()); + return factory.createScan(empty, range); } - - String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR); - if (filterExprSerialized == null) { - return scan; + String filterExprStr = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR); + if (filterExprStr == null) { + return Arrays.asList(empty); } + ExprNodeGenericFuncDesc filterExpr = - Utilities.deserializeExpression(filterExprSerialized); + Utilities.deserializeExpression(filterExprStr); - String colName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey]; - String colType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey]; - IndexPredicateAnalyzer analyzer = newIndexPredicateAnalyzer(colName,colType, isKeyBinary); + ColumnMapping keyMapping = columnsMapping.getKeyMapping(); + + String colName = keyMapping.getColumnName(); + TypeInfo colType = keyMapping.getColumnType(); + boolean isKeyBinary = keyMapping.getBinaryStorage(0); + IndexPredicateAnalyzer analyzer = newIndexPredicateAnalyzer(colName, colType, isKeyBinary); List searchConditions = - new ArrayList(); + new ArrayList(); ExprNodeDesc residualPredicate = - analyzer.analyzePredicate(filterExpr, searchConditions); + analyzer.analyzePredicate(filterExpr, searchConditions); // There should be no residual since we already negotiated that earlier in // HBaseStorageHandler.decomposePredicate. However, with hive.optimize.index.filter // OpProcFactory#pushFilterToStorageHandler pushes the original filter back down again. - // Since pushed-down filters are not ommitted at the higher levels (and thus the + // Since pushed-down filters are not omitted at the higher levels (and thus the // contract of negotiation is ignored anyway), just ignore the residuals. // Re-assess this when negotiation is honored and the duplicate evaluation is removed. 
// THIS IGNORES RESIDUAL PARSING FROM HBaseStorageHandler#decomposePredicate @@ -218,28 +196,15 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) // Convert the search condition into a restriction on the HBase scan byte [] startRow = HConstants.EMPTY_START_ROW, stopRow = HConstants.EMPTY_END_ROW; - for (IndexSearchCondition sc : searchConditions){ - - ExprNodeConstantEvaluator eval = new ExprNodeConstantEvaluator(sc.getConstantDesc()); - PrimitiveObjectInspector objInspector; - Object writable; - - try { - objInspector = (PrimitiveObjectInspector)eval.initialize(null); - writable = eval.evaluate(null); - } catch (ClassCastException cce) { - throw new IOException("Currently only primitve types are supported. Found: " + - sc.getConstantDesc().getTypeString()); - } catch (HiveException e) { - throw new IOException(e); - } + for (IndexSearchCondition sc : searchConditions) { + + byte[] constantVal = toBinaryValue(sc.getConstantDesc(), isKeyBinary); - byte [] constantVal = getConstantVal(writable, objInspector, isKeyBinary); String comparisonOp = sc.getComparisonOp(); - if("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual".equals(comparisonOp)){ + if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual".equals(comparisonOp)){ startRow = constantVal; - stopRow = getNextBA(constantVal); + stopRow = HiveHBaseInputFormatUtil.getNextBA(constantVal); } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan".equals(comparisonOp)){ stopRow = constantVal; } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan" @@ -247,69 +212,43 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) startRow = constantVal; } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan" .equals(comparisonOp)){ - startRow = getNextBA(constantVal); + startRow = HiveHBaseInputFormatUtil.getNextBA(constantVal); } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan" .equals(comparisonOp)){ - stopRow = getNextBA(constantVal); + stopRow = HiveHBaseInputFormatUtil.getNextBA(constantVal); } else { throw new IOException(comparisonOp + " is not a supported comparison operator"); } } - scan.setStartRow(startRow); - scan.setStopRow(stopRow); + empty.setStartRow(startRow); + empty.setStopRow(stopRow); if (LOG.isDebugEnabled()) { LOG.debug(Bytes.toStringBinary(startRow) + " ~ " + Bytes.toStringBinary(stopRow)); } - return scan; + return Arrays.asList(empty); } - private byte[] getConstantVal(Object writable, PrimitiveObjectInspector poi, - boolean isKeyBinary) throws IOException{ - - if (!isKeyBinary){ - // Key is stored in text format. Get bytes representation of constant also of - // text format. 
- byte[] startRow; - ByteStream.Output serializeStream = new ByteStream.Output(); - LazyUtils.writePrimitiveUTF8(serializeStream, writable, poi, false, (byte) 0, null); - startRow = new byte[serializeStream.getLength()]; - System.arraycopy(serializeStream.getData(), 0, startRow, 0, serializeStream.getLength()); - return startRow; - } - - PrimitiveCategory pc = poi.getPrimitiveCategory(); - switch (poi.getPrimitiveCategory()) { - case INT: - return Bytes.toBytes(((IntWritable)writable).get()); - case BOOLEAN: - return Bytes.toBytes(((BooleanWritable)writable).get()); - case LONG: - return Bytes.toBytes(((LongWritable)writable).get()); - case FLOAT: - return Bytes.toBytes(((FloatWritable)writable).get()); - case DOUBLE: - return Bytes.toBytes(((DoubleWritable)writable).get()); - case SHORT: - return Bytes.toBytes(((ShortWritable)writable).get()); - case STRING: - return Bytes.toBytes(((Text)writable).toString()); - case BYTE: - return Bytes.toBytes(((ByteWritable)writable).get()); - - default: - throw new IOException("Type not supported " + pc); - } - } - + private byte[] toBinaryValue(ExprNodeConstantDesc constant, boolean keyBinary) throws IOException { + byte[] constantVal; + try { + constantVal = HiveHBaseInputFormatUtil.toByteArray(constant, keyBinary); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + return constantVal; + } - private byte[] getNextBA(byte[] current){ - // startRow is inclusive while stopRow is exclusive, - // this util method returns very next bytearray which will occur after the current one - // by padding current one with a trailing 0 byte. - byte[] next = new byte[current.length + 1]; - System.arraycopy(current, 0, next, 0, current.length); - return next; + private HBaseScanFactory createFilterFactory(JobConf jobConf, String filterFactory) + throws IOException { + try { + Class clazz = jobConf.getClassByName(filterFactory); + return (HBaseScanFactory) ReflectionUtils.newInstance(clazz, jobConf); + } catch (Exception e) { + throw new IOException(e); + } } static IndexPredicateAnalyzer newIndexPredicateAnalyzer( @@ -359,36 +298,48 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName))); + + String hbaseColumnNames = jobConf.get(serdeConstants.LIST_COLUMNS); + String hbaseColumnTypes = jobConf.get(serdeConstants.LIST_COLUMN_TYPES); String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); + String hbaseTableStorageType = jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE); boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); - if (hbaseColumnsMapping == null) { throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table."); } - ColumnMappings columnMappings = null; - try { - columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); - } catch (SerDeException e) { - throw new IOException(e); - } + ColumnMappings columnMappings = + HBaseSerDe.parseColumnsMappingRuntime(hbaseColumnsMapping, + hbaseColumnNames, hbaseColumnTypes, hbaseTableStorageType, doColumnRegexMatching); - int iKey = columnMappings.getKeyIndex(); ColumnMapping keyMapping = columnMappings.getKeyMapping(); + Job job = new Job(jobConf); + JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job); + Path[] tablePaths = FileInputFormat.getInputPaths(jobContext); + 
// Take filter pushdown into account while calculating splits; this // allows us to prune off regions immediately. Note that although // the Javadoc for the superclass getSplits says that it returns one // split per region, the implementation actually takes the scan // definition into account and excludes regions which don't satisfy // the start/stop row conditions (HBASE-1829). - Scan scan = createFilterScan(jobConf, iKey, - HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec, - jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string"))); + List results = new ArrayList(); + for (Scan scan : filterToScans(jobConf, columnMappings)) { + setScan(scan); + for (org.apache.hadoop.mapreduce.InputSplit split : super.getSplits(jobContext)) { + results.add(new HBaseSplit((TableSplit) split, tablePaths[0])); + } + } + return results.toArray(new InputSplit[results.size()]); + } + + private Scan emptyScanner(ColumnMappings columnMappings) { // The list of families that have been added to the scan - List addedFamilies = new ArrayList(); + Scan scan = new Scan(); + Set addedFamilies = new HashSet(); // REVIEW: are we supposed to be applying the getReadColumnIDs // same as in getRecordReader? for (ColumnMapping colMap : columnMappings) { @@ -406,20 +357,6 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( } } } - setScan(scan); - - Job job = new Job(jobConf); - JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job); - Path [] tablePaths = FileInputFormat.getInputPaths(jobContext); - - List splits = - super.getSplits(jobContext); - InputSplit [] results = new InputSplit[splits.size()]; - - for (int i = 0; i < splits.size(); i++) { - results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]); - } - - return results; + return scan; } } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java index 3e8b8fd..9dcc53e 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java @@ -82,7 +82,7 @@ protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef) thro return new LazyHBaseCellMap((LazyMapObjectInspector) fieldRef.getFieldObjectInspector()); } return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector(), - colMap.binaryStorage.get(0)); + colMap.getBinaryStorage(0)); } /** diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/OrPredicateHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/OrPredicateHBaseKeyFactory.java new file mode 100644 index 0000000..957b148 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/OrPredicateHBaseKeyFactory.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hive.hbase.predicate.ColumnRanges; +import org.apache.hadoop.hive.hbase.predicate.KeyRange; +import org.apache.hadoop.hive.hbase.predicate.Range; +import org.apache.hadoop.hive.hbase.predicate.RangeTranslator; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class OrPredicateHBaseKeyFactory extends DefaultHBaseKeyFactory implements RangeTranslator { + + @Override + public List translate(List> ranges) { + String columnName = keyMapping.getColumnName(); + boolean binaryStorage = keyMapping.getBinaryStorage(0); + try { + return ColumnRanges.toKeyRanges(ranges, columnName, binaryStorage); + } catch (Exception e) { + LOG.info("Failed to translate predicate " + ranges, e); + } + return null; + } + + @Override + public List getKeyColumns() { + return Arrays.asList(keyMapping.getColumnName()); + } + +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/ColumnRanges.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/ColumnRanges.java new file mode 100644 index 0000000..9f220e5 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/ColumnRanges.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase.predicate; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class ColumnRanges { + + /** + * Converts OR sets to column ranges list. + * @param orSet + * @param keyColumns + * @return + */ + static List> toColRangesList(Node.OrSet orSet) { + + List> columnRangesList = new ArrayList>(); + + for (Node child : orSet.getChildren()) { + // For each AND clause + Node.AndSet andSet = (Node.AndSet) child; + + // AND conjugated predicates for each column + Map columnRanges = new LinkedHashMap(); + boolean isContributingRangeSet = true; + + // For each column + for (Map.Entry> entry : + groupComparisonsByColumn(andSet).entrySet()) { + String column = entry.getKey(); + List comparisons = entry.getValue(); + + // Apply its all comparisons to narrow its range. + Range range = Range.create(comparisons); + + // If this column covers everywhere, ignore it and its followings. + if (range.isEverything()) { + columnRanges.put(column, range); + break; + } + + // If this column range does not contribute, ignore followings. + if (range.isEmpty()) { + isContributingRangeSet = false; + break; + } + + // Otherwise, continue to process followings. + columnRanges.put(column, range); + } + + // If it contributes, add it to the result. 
+ if (isContributingRangeSet) { + columnRangesList.add(columnRanges); + } + } + return columnRangesList; + } + + // Group comparisons by column + private static Map> groupComparisonsByColumn( + Node.AndSet andSet) { + + Map> columnToComparisons = + new LinkedHashMap>(); + + // For each nodes + for (Node child : andSet.getChildren()) { + + // If it is a comparison, add it on a corresponding group. + if (child instanceof Comparison) { + Comparison comparison = (Comparison) child; + String column = comparison.getColumn().getColumnName(); + List comparisons = columnToComparisons.get(column); + if (comparisons == null) { + columnToComparisons.put(column, comparisons = new ArrayList()); + } + comparisons.add(comparison); + } + } + return columnToComparisons; + } + + // Convert column range of key to hbase range + public static List toKeyRanges( + List> ranges, String keyColumn, boolean binary) throws Exception { + + List keyRanges = new ArrayList(); + for (Map range : ranges) { + if (range.size() != 1) { + return null; + } + String column = range.keySet().iterator().next(); + Range colRange = range.get(column); + + if (!column.equalsIgnoreCase(keyColumn)) { + return null; + } + keyRanges.add(colRange.serialize(column, binary)); + } + return keyRanges; + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/Comparison.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/Comparison.java new file mode 100644 index 0000000..553393c --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/Comparison.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.hbase.predicate; + +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual; + +import java.util.ArrayList; +import java.util.List; + +class Comparison implements Node { + private final List<Node> children; + private Comparator comparator; + + public Comparison(GenericUDFBaseCompare comparator, Node leftChild, Node rightChild) { + this(toComparator(comparator), leftChild, rightChild); + } + + static Comparator toComparator(GenericUDFBaseCompare comparator) { + if (comparator instanceof GenericUDFOPGreaterThan) { + return Comparator.GT; + } + else if (comparator instanceof GenericUDFOPEqualOrGreaterThan) { + return Comparator.GE; + } + else if (comparator instanceof GenericUDFOPLessThan) { + return Comparator.LT; + } + else if (comparator instanceof GenericUDFOPEqualOrLessThan) { + return Comparator.LE; + } + else if (comparator instanceof GenericUDFOPEqual) { + return Comparator.EQ; + } + else if (comparator instanceof GenericUDFOPNotEqual) { + return Comparator.NE; + } + throw new IllegalArgumentException(comparator.getUdfName() + " is not supported."); + } + + public Comparison(Comparator comparator, Node leftChild, Node rightChild) { + children = new ArrayList<Node>(); + children.add(leftChild); + children.add(rightChild); + this.comparator = comparator; + } + + public List<Node> getChildren() { + return children; + } + + public String toString() { + return "(" + children.get(0) + " " + comparator + " " + children.get(1) + ")"; + } + + public Column getColumn() { + for (Node child : children) { + if (child instanceof Column) { + return (Column) child; + } + } + return null; + } + + public Constant getConstant() { + for (Node child : children) { + if (child instanceof Constant) { + return (Constant) child; + } + } + return null; + } + + // Swaps the operands; the comparator is mirrored so the comparison keeps its meaning. + public Comparison flip() { + return new Comparison(flip(comparator), children.get(1), children.get(0)); + } + + private Comparator flip(Comparator comparator) { + switch (comparator) { + case EQ: + return Comparator.EQ; + case NE: + return Comparator.NE; + case GT: + return Comparator.LT; + case GE: + return Comparator.LE; + case LT: + return Comparator.GT; + case LE: + return Comparator.GE; + default: + throw new IllegalArgumentException(comparator.toString()); + } + } + + public Comparator getComparator() { + return comparator; + } + + public static enum Comparator { + GE(">="), GT(">"), LE("<="), LT("<"), EQ("="), NE("!="); + + private String representation; + + Comparator(String representation) { + this.representation = representation; + } + + public String toString() { + return representation; + } + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/ComplexPredicateAnalyzer.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/ComplexPredicateAnalyzer.java new file mode 100644 index 0000000..e61c9eb --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/ComplexPredicateAnalyzer.java @@ -0,0 +1,65 @@ +package org.apache.hadoop.hive.hbase.predicate; + +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import
org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Analyzes the given expression into an optimal row key scan plan. + * It uses disjunctive normal form (DNF) for simplification. A DNF expression has the form + * "(a AND b) OR (c AND d) OR (e AND f)": the outer structure is a set of OR operations, and the inner + * structure is a set of AND operations. Each inner structure corresponds to a scan range. + */ +public class ComplexPredicateAnalyzer { + /** + * Analyzes an expression and returns an optimal scan plan. + * @param expr The expression to analyze + * @param factory A translator that maps column ranges to row key ranges + * @return An optimal scan plan. It includes a set of OR operations that include only + * row key ranges in base64 encoding. + * @throws IOException + */ + public static ExprNodeGenericFuncDesc analyze( + ExprNodeDesc expr, RangeTranslator factory, JobConf jobConf) { + // Convert to simple node + Node node = NodeConverter.toNode(expr, factory.getKeyColumns()); + if (node == Node.IGNORED) { + return null; // nothing to push down + } + + // Analyze + List<KeyRange> analyzed = analyze(node, factory, jobConf); + + // Convert back to expr + return KeyRange.toNormalizedExpr(analyzed); + } + + /** + * Analyzes a node and returns a set of scan ranges. + * @param node A node to analyze + * @param translator A translator that maps column ranges to row key ranges + * @return A set of key ranges to scan. + * @throws IOException + */ + private static List<KeyRange> analyze(Node node, RangeTranslator translator, JobConf jobConf) { + + // Translate the expression to disjunctive normal form + Node dnf = DNFTranslator.translate(node); + + // Group ANDs into an AndSet, ORs into an OrSet + Node.OrSet orSet = NodeConverter.toOrSet(dnf); + + // Minimize to column range sets + List<Map<String, Range>> colRanges = ColumnRanges.toColRangesList(orSet); + + // Convert to key ranges + List<KeyRange> keyRanges = translator.translate(colRanges); + + // Merge overlapping ranges + return keyRanges == null ? null : KeyRange.coalesce(keyRanges); + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/DNFTranslator.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/DNFTranslator.java new file mode 100644 index 0000000..bc2c9d3 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/DNFTranslator.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */ + +package org.apache.hadoop.hive.hbase.predicate; + +import org.apache.hadoop.hive.hbase.predicate.Comparison.Comparator; + +class DNFTranslator { + static Node translate(Node node) { + // Push down and minimize NOTSs + Node pushedDownNots = pushDownNots(node); + + // Bring up ORs + Node broughtUpOrs = bringUpOrs(pushedDownNots); + + return broughtUpOrs; + } + + private static Node bringUpOrs(Node node) { + if (node instanceof Node.And) { + Node leftChild = node.getChildren().get(0); + Node rightChild = node.getChildren().get(1); + leftChild = bringUpOrs(leftChild); + rightChild = bringUpOrs(rightChild); + + // a AND (b OR c) -> (a AND b) OR (a AND c) + if (rightChild instanceof Node.Or) { + Node a = leftChild; + Node b = rightChild.getChildren().get(0); + Node c = rightChild.getChildren().get(1); + return new Node.Or(bringUpOrs(new Node.And(a, b)), bringUpOrs(new Node.And(a, c))); + } + // (a OR b) AND c -> (a AND c) OR (b AND c) + if (leftChild instanceof Node.Or) { + Node a = leftChild.getChildren().get(0); + Node b = leftChild.getChildren().get(1); + Node c = rightChild; + return new Node.Or(bringUpOrs(new Node.And(a, c)), bringUpOrs(new Node.And(b, c))); + } + } else if (node instanceof Node.Or) { + Node leftChild = node.getChildren().get(0); + Node rightChild = node.getChildren().get(1); + leftChild = bringUpOrs(leftChild); + rightChild = bringUpOrs(rightChild); + return new Node.Or(leftChild, rightChild); + } + return node; + } + + private static Node pushDownNots(Node node) { + if (node instanceof Node.Not) { + Node child = node.getChildren().get(0); + // NOT(a AND b) -> (NOT a) OR (NOT b) + if (child instanceof Node.And) { + Node leftGrandChild = child.getChildren().get(0); + Node rightGrandChild = child.getChildren().get(1); + return new Node.Or( + pushDownNots(new Node.Not(leftGrandChild)), + pushDownNots(new Node.Not(rightGrandChild))); + } + // NOT(a OR b) -> (NOT a) AND (NOT b) + if (child instanceof Node.Or) { + Node leftGrandChild = child.getChildren().get(0); + Node rightGrandChild = child.getChildren().get(1); + return new Node.And( + pushDownNots(new Node.Not(leftGrandChild)), + pushDownNots(new Node.Not(rightGrandChild))); + } + // NOT NOT a -> a + if (child instanceof Node.Not) { + Node grandChild = child.getChildren().get(0); + return pushDownNots(grandChild); + } + // NOT (a > b) -> a <= b + if (child instanceof Comparison) { + Comparison comparison = (Comparison) child; + Node leftGrandChild = child.getChildren().get(0); + Node rightGrandChild = child.getChildren().get(1); + switch (comparison.getComparator()) { + case EQ: + return new Node.Or( + new Comparison(Comparator.LT, leftGrandChild, rightGrandChild), + new Comparison(Comparator.GT, leftGrandChild, rightGrandChild)); + case NE: + return new Comparison(Comparator.EQ, leftGrandChild, rightGrandChild); + case GT: + return new Comparison(Comparator.LE, leftGrandChild, rightGrandChild); + case GE: + return new Comparison(Comparator.LT, leftGrandChild, rightGrandChild); + case LT: + return new Comparison(Comparator.GE, leftGrandChild, rightGrandChild); + case LE: + return new Comparison(Comparator.GT, leftGrandChild, rightGrandChild); + } + } + } + if (node instanceof Node.And) { + Node leftChild = node.getChildren().get(0); + Node rightChild = node.getChildren().get(1); + return new Node.And(pushDownNots(leftChild), pushDownNots(rightChild)); + } + if (node instanceof Node.Or) { + Node leftChild = node.getChildren().get(0); + Node rightChild = node.getChildren().get(1); + return new 
Node.Or(pushDownNots(leftChild), pushDownNots(rightChild)); + } + return node; + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/KeyRange.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/KeyRange.java new file mode 100644 index 0000000..1b0549d --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/KeyRange.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase.predicate; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * A range of HBase key. It contains a start key and an end key. Each key can be null, which represents + * negative infinity(start key) or positive infinity(end key). + */ +public class KeyRange implements Comparable { + + static final Log LOG = LogFactory.getLog(KeyRange.class); + + private final String columnName; + private final byte[] startKey; // A null start key represents negative infinity + private final byte[] endKey; // A null end key represents positive infinity + + public KeyRange(String columnName, byte[] startKey, byte[] endKey) { + this.columnName = columnName; + this.startKey = startKey; + this.endKey = endKey; + } + + public byte[] getStartKey() { + return startKey; + } + + public byte[] getEndKey() { + return endKey; + } + + // (-)-[-] : different + // [-]-(-) : different + // (-[-)-] : overlap + // [-(-]-) : overlap + // [-(-)-] : contains, overlap + // (-[-]-) : contains, overlap + /** + * Checks this range overlaps the given range. + * @param other A range to check + * @return {@code true} if they overlap. {@code false} otherwise. 
+ */ + public boolean overlap(KeyRange other) { + // this.startKey > other.endKey + if (compareStartKeyEndKey(this.startKey, other.endKey) > 0) { + return false; + } + + // other.startKey > this.endKey + if (compareStartKeyEndKey(other.startKey, this.endKey) > 0) { + return false; + } + + return true; + } + + /** + * Checks this range contains the given range. + * @param other A range to check + * @return {@code true} if it contains. {@code false} otherwise. + */ + public boolean contains(KeyRange other) { + int startKeyCompared = compareStartKey(this.startKey, other.startKey); + int endKeyCompared = compareEndKey(this.endKey, other.endKey); + return (startKeyCompared <= 0 && endKeyCompared >= 0); + } + + /** + * Returns the union of this range and the given range. + * @param other A range to unify + * @return A unified range. It starts at the minimum start key of them, and ends at the maximum end key of them. + */ + public KeyRange union(KeyRange other) { + byte[] minStartKey = compareStartKey(this.startKey, other.startKey) <= 0 + ? this.startKey : other.startKey; + byte[] maxEndKey = compareEndKey(this.endKey, other.endKey) >= 0 + ? this.endKey : other.endKey; + return new KeyRange(columnName, minStartKey, maxEndKey); + } + + @Override + public int compareTo(KeyRange other) { + int startKeyCompared = compareStartKey(this.startKey, other.startKey); + if (startKeyCompared != 0) { + return startKeyCompared; + } + int endKeyCompared = compareEndKey(this.endKey, other.endKey); + if (endKeyCompared != 0) { + return endKeyCompared; + } + return 0; + } + + private int compareStartKeyEndKey(byte[] startKey, byte[] endKey) { + if (startKey == null) { + if (endKey == null) { + // startKey = -inf, endKey = +inf + return -1; + } + // startKey = -inf, endKey != +inf + return -1; + } + + if (endKey == null) { + // startKey != -inf, endKey = +inf + return -1; + } + + // startKey != inf, endKey != +inf + return Bytes.compareTo(startKey, endKey); + } + + private int compareStartKey(byte[] a, byte[] b) { + if (a == null) { + // a = -inf, b = -inf + if (b == null) { + return 0; + } + // a = -inf, b != -inf + return -1; + } + + if (b == null) { + // a != -inf, b = -inf + return 1; + } + // a != -inf, b != -inf + return Bytes.compareTo(a, b); + } + + private int compareEndKey(byte[] a, byte[] b) { + if (a == null) { + // a = +inf, b = +inf + if (b == null) { + return 0; + } + // a = +inf, b != +inf + return 1; + } + + if (b == null) { + // a != +inf, b = +inf + return -1; + } + // a != +inf, b != +inf + return Bytes.compareTo(a, b); + } + + public String toString() { + String startString; + String endString; + + if (startKey == null) { + startString = "-inf"; + } else { + startString = Arrays.toString(startKey); + } + + if (endKey == null) { + endString = "+inf"; + } else { + endString = Arrays.toString(endKey); + } + + return "[" + startString + ", " + endString + "]"; + } + + public boolean equals(Object object) { + KeyRange other = (KeyRange) object; + if (!Arrays.equals(this.startKey, other.startKey)) { + return false; + } + if (!Arrays.equals(this.endKey, other.endKey)) { + return false; + } + return true; + } + + /** + * Make each range does not overlap other range, while they represent the same ranges together. 
+ * @param input A list of ranges to coalesce + * @return A coalesced range list from input + */ + public static List coalesce(List input) { + if (input == null || input.size() <= 1) { + return input; + } + List output = new ArrayList(); + + // Sort ranges by start border value then end border value + Collections.sort(input); + + // Until the input list becomes empty, + while (!input.isEmpty()) { + + // If just one element remains, add it to the output and quit + if (input.size() == 1) { + output.add(input.get(0)); + return output; + } + + // Get first two elements from input + KeyRange first = input.get(0); + KeyRange second = input.get(1); + + // If they overlap each other, + if (first.overlap(second)) { + + // If one contains the other, remove the contained one + if (first.contains(second)) { + input.remove(second); + } else if (second.contains(first)) { + input.remove(first); + } else { + // Otherwise, replace them with their union + input.remove(first); + input.remove(second); + input.add(0, first.union(second)); + } + } else { + // Otherwise, the first element does not overlap any other range, so move it from input to output + input.remove(first); + output.add(first); + } + } + + // Return the output list + return output; + } + + /** + * Converts key ranges to a normalized Hive expression: an OR of row key range predicates. + * @param ranges A list of key ranges to convert + * @return The normalized expression, or null if any range spans the whole key space + */ + static ExprNodeGenericFuncDesc toNormalizedExpr(List ranges) { + ExprNodeGenericFuncDesc root = null; + for (KeyRange range : ranges) { + ExprNodeGenericFuncDesc expr = toNormalizedExpr(range); + if (expr == null) { + return null; + } + if (root == null) { + root = expr; + } else { + List children = new ArrayList(); + children.add(root); + children.add(expr); + root = new ExprNodeGenericFuncDesc( + TypeInfoFactory.booleanTypeInfo, new GenericUDFOPOr(), children); + } + } + return root; + } + + public static ExprNodeGenericFuncDesc toNormalizedExpr(KeyRange range) { + return normalize(range.getStartKey(), range.getEndKey()); + } + + public static ExprNodeGenericFuncDesc normalize(byte[] startKey, byte[] endKey) { + + if (LOG.isDebugEnabled()) { + LOG.debug(Bytes.toStringBinary(startKey) + " ~ " + Bytes.toStringBinary(endKey)); + } + + if (startKey != null) { + if (endKey != null) { + List children = new ArrayList(); + children.add(new ExprNodeConstantDesc(false)); + children.add(new ExprNodeColumnDesc(TypeInfoFactory.binaryTypeInfo, "rowkey", "rowkey", false)); + children.add(new ExprNodeConstantDesc(Base64.encodeBytes(startKey))); + children.add(new ExprNodeConstantDesc(Base64.encodeBytes(endKey))); + return new ExprNodeGenericFuncDesc( + TypeInfoFactory.booleanTypeInfo, new GenericUDFBetween(), children); + } else { + List children = new ArrayList(); + children.add(new ExprNodeColumnDesc(TypeInfoFactory.binaryTypeInfo, "rowkey", "rowkey", false)); + children.add(new ExprNodeConstantDesc(Base64.encodeBytes(startKey))); + return new ExprNodeGenericFuncDesc( + TypeInfoFactory.booleanTypeInfo, new GenericUDFOPEqualOrGreaterThan(), children); + } + } else if (endKey != null) { + List children = new ArrayList(); + children.add(new ExprNodeColumnDesc(TypeInfoFactory.binaryTypeInfo, "rowkey", "rowkey", false)); + children.add(new ExprNodeConstantDesc(Base64.encodeBytes(endKey))); + return new ExprNodeGenericFuncDesc( + TypeInfoFactory.booleanTypeInfo, new GenericUDFOPEqualOrLessThan(), children); + } + return null; + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/Node.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/Node.java new file
mode 100644 index 0000000..38107a7 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/Node.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase.predicate; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A node is a simple representation of an expression element. + */ +interface Node { + /** + * Returns its children. + * @return A list of child nodes + */ + List getChildren(); + + /** + * A named column. + */ + class Column implements Node { + + private final String columnName; + + public Column(String columnName) { + this.columnName = columnName; + } + + public String getColumnName() { + return columnName; + } + + public List getChildren() { + return Collections.emptyList(); + } + + public String toString() { + return columnName; + } + } + + /** + * AND operation. + */ + class And implements Node { + private final List children; + + public And(Node left, Node right) { + children = new ArrayList(2); + children.add(left); + children.add(right); + } + + public List getChildren() { + return children; + } + + public String toString() { + return "AND(" + children.get(0) + ", " + children.get(1) + ")"; + } + } + + /** + * A constant value. + */ + class Constant implements Node { + private final Comparable value; + + public Constant(Comparable value) { + this.value = value; + } + + public Comparable getValue() { + return value; + } + + public List getChildren() { + return Collections.emptyList(); + } + + public String toString() { + return String.valueOf(value); + } + } + + public static Ignored IGNORED = new Ignored(); + + /** + * Ignored node. If an expression has a column that is not related with row key, or not supported function, + * then it will be ignored. AND, NOT, OR operations have different effects on its propagation. + */ + class Ignored implements Node { + + public List getChildren() { + return Collections.emptyList(); + } + + public String toString() { + return "IGNORED"; + } + } + + /** + * NOT operation. + */ + class Not implements Node { + private final List children; + + public Not(Node node) { + children = new ArrayList(1); + children.add(node); + } + + public List getChildren() { + return children; + } + + public String toString() { + return "NOT (" + children.get(0) + ")"; + } + } + + /** + * OR operation. 
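A rough, hand-built illustration of how these node types compose; Comparison is a separate class in this package (its (Comparator, Node, Node) constructor is the one used by NodeConverter below), and the column and constant values are made up:

    // "rowkey >= '10' AND value = 'x'", where 'value' is not a key column and therefore
    // ends up represented by Node.IGNORED.
    Node keyCmp = new Comparison(Comparison.Comparator.GE,
        new Node.Column("rowkey"), new Node.Constant("10"));
    Node tree = new Node.And(keyCmp, Node.IGNORED);
    // NodeConverter (later in this patch) simply drops an ignored child of an AND, keeping keyCmp,
    // while an ignored child under OR or NOT makes the whole subtree ignored.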
+ */ + class Or implements Node { + private final List children; + + public Or(Node left, Node right) { + children = new ArrayList(2); + children.add(left); + children.add(right); + } + + public List getChildren() { + return children; + } + + public String toString() { + return "OR(" + children.get(0) + ", " + children.get(1) + ")"; + } + } + + /** + * OrSet is a flat representation of the DNF outer structure. (...) OR (...) OR (...) + */ + class OrSet implements Node { + private List children; + + public OrSet(List children) { + this.children = children; + } + + public List getChildren() { + return children; + } + + public String toString() { + return "ORS:" + children; + } + } + + /** + * AndSet is a flat representation of the DNF inner structure. (... AND ... AND ...) + */ + class AndSet implements Node { + private List children; + + public AndSet(List children) { + this.children = children; + } + + public List getChildren() { + return children; + } + + public String toString() { + return "ANDS:" + children; + } + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/NodeConverter.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/NodeConverter.java new file mode 100644 index 0000000..43067f1 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/NodeConverter.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase.predicate; + +import org.apache.hadoop.hive.hbase.predicate.Comparison.Comparator; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; +import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import org.apache.hadoop.util.StringUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * Node converter converts node to its related classes and vice versa. + * 1. ExprNodeDesc -> Node + * 2. Node -> OrSet + */ +public class NodeConverter { + + /** + * Converts Hive ExprNodeExprs to simple nodes. + * + * @param expr An expression to convert + * @return A converted node. 
If the expression contains non-relative columns and functions so that it + * cannot make an optimal plan, then it just returns an instance of Node.Ignored. + */ + static Node toNode(ExprNodeDesc expr, List keyColumns) { + Node node; + node = innerToNode(expr, keyColumns); + node = removeIgnored(node); + return node; + } + + private static Node innerToNode(ExprNodeDesc expr, List keyColumns) { + if (expr instanceof ExprNodeGenericFuncDesc) { + ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) expr; + GenericUDF udf = funcDesc.getGenericUDF(); + + // Handle comparisons + if (udf instanceof GenericUDFBaseCompare) { + ExprNodeDesc expr1 = expr.getChildren().get(0); + ExprNodeDesc expr2 = expr.getChildren().get(1); + + ExprNodeDesc[] extracted = ExprNodeDescUtils.extractComparePair(expr1, expr2); + if (extracted == null) { + return Node.IGNORED; + } + if (extracted.length > 2) { + udf = udf.flip(); + } + + Node leftChild = innerToNode(extracted[0], keyColumns); + Node rightChild = innerToNode(extracted[1], keyColumns); + Comparison.Comparator comparator = Comparison.toComparator((GenericUDFBaseCompare) udf); + if (leftChild instanceof Node.Column && rightChild instanceof Node.Constant) { + if (comparator == Comparator.NE) { + return new Node.And( + new Comparison(Comparator.LT, leftChild, rightChild), + new Comparison(Comparator.GT, leftChild, rightChild)); + } + return new Comparison(comparator, leftChild, rightChild); + } + if (leftChild instanceof Node.Constant && rightChild instanceof Node.Column) { + if (comparator == Comparator.NE) { + return new Node.And( + new Comparison(Comparator.LT, leftChild, rightChild).flip(), + new Comparison(Comparator.GT, leftChild, rightChild).flip()); + } + return new Comparison(comparator, leftChild, rightChild).flip(); + } + return Node.IGNORED; + } + if (udf instanceof GenericUDFOPNot) { + return new Node.Not( + innerToNode(expr.getChildren().get(0), keyColumns)); + } + if (udf instanceof GenericUDFOPAnd) { + return new Node.And( + innerToNode(expr.getChildren().get(0), keyColumns), + innerToNode(expr.getChildren().get(1), keyColumns)); + } + if (udf instanceof GenericUDFOPOr) { + // Handle OR + return new Node.Or( + innerToNode(expr.getChildren().get(0), keyColumns), + innerToNode(expr.getChildren().get(1), keyColumns)); + } + if (udf instanceof GenericUDFBetween) { + return new Node.And( + new Comparison( + Comparator.GE, + innerToNode(expr.getChildren().get(0), keyColumns), + innerToNode(expr.getChildren().get(1), keyColumns)), + new Comparison( + Comparator.LE, + innerToNode(expr.getChildren().get(0), keyColumns), + innerToNode(expr.getChildren().get(2), keyColumns)) + ); + } + if (udf instanceof GenericUDFIn) { + int size = expr.getChildren().size(); + Node node = null; + for (int i = 1; i < size; i++) { + if (node == null) { + node = new Comparison( + Comparator.EQ, + innerToNode(expr.getChildren().get(0), keyColumns), + innerToNode(expr.getChildren().get(i), keyColumns)); + } else { + node = + new Node.Or( + node, + new Comparison( + Comparator.EQ, + innerToNode(expr.getChildren().get(0), keyColumns), + innerToNode(expr.getChildren().get(i), keyColumns)) + ); + } + } + return node; + } + ExprNodeConstantDesc constant = ExprNodeDescUtils.foldConstant(funcDesc); + if (constant != null) { + return new Node.Constant((Comparable) constant.getValue()); + } + return Node.IGNORED; + } + // Handle constant + if (expr instanceof ExprNodeConstantDesc) { + return new Node.Constant((Comparable) ((ExprNodeConstantDesc) expr).getValue()); + } + // 
Handle column + if (expr instanceof ExprNodeColumnDesc) { + String column = ((ExprNodeColumnDesc) expr).getColumn().toLowerCase(); + if (keyColumns != null && !keyColumns.contains(column)) { + return Node.IGNORED; + } + return new Node.Column(column); + } + if (expr instanceof ExprNodeFieldDesc) { + String[] fields = ExprNodeDescUtils.extractFields((ExprNodeFieldDesc) expr, true); + if (fields == null || (keyColumns != null && !keyColumns.contains(fields[0]))) { + return Node.IGNORED; + } + return new Node.Column(StringUtils.join(".", fields)); + } + return Node.IGNORED; + } + + private static Node removeIgnored(Node node) { + // AND: It's okay to have an ignored child. + if (node instanceof Node.And) { + Node leftChild = removeIgnored(node.getChildren().get(0)); + Node rightChild = removeIgnored(node.getChildren().get(1)); + boolean leftIgnored = leftChild == Node.IGNORED; + boolean rightIgnored = rightChild == Node.IGNORED; + if (!leftIgnored && !rightIgnored) { + return new Node.And(leftChild, rightChild); + } + if (leftIgnored && !rightIgnored) { + return rightChild; + } + if (!leftIgnored && rightIgnored) { + return leftChild; + } + // if (leftIgnored && rightIgnored) { + return Node.IGNORED; + } + // OR: If any child is ignored, it's ignored, too. + if (node instanceof Node.Or) { + Node leftChild = removeIgnored(node.getChildren().get(0)); + Node rightChild = removeIgnored(node.getChildren().get(1)); + boolean leftIgnored = leftChild == Node.IGNORED; + boolean rightIgnored = rightChild == Node.IGNORED; + if (leftIgnored || rightIgnored) { + return Node.IGNORED; + } + return new Node.Or(leftChild, rightChild); + } + // NOT: If its child is ignored, it's ignored, too. + if (node instanceof Node.Not) { + Node child = removeIgnored(node.getChildren().get(0)); + if (child == Node.IGNORED) { + return Node.IGNORED; + } + return new Node.Not(child); + } + // Comparison: If any of its children is ignored, it's ignored, too. + if (node instanceof Comparison) { + for (Node child : node.getChildren()) { + child = removeIgnored(child); + if (child == Node.IGNORED) { + return Node.IGNORED; + } + } + return node; + } + // Otherwise, return as it is. + return node; + } + + /** + * Converts a node tree to an OR set.
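As a sketch of the flattening that toOrSet (defined just below) performs on such a tree (hand-built nodes; the exact toString rendering of Comparison is not shown in this patch, so the output here is approximate):

    // (rowkey = '10') OR (rowkey >= '20' AND rowkey <= '30')
    Node disjunction = new Node.Or(
        new Comparison(Comparison.Comparator.EQ, new Node.Column("rowkey"), new Node.Constant("10")),
        new Node.And(
            new Comparison(Comparison.Comparator.GE, new Node.Column("rowkey"), new Node.Constant("20")),
            new Comparison(Comparison.Comparator.LE, new Node.Column("rowkey"), new Node.Constant("30"))));
    // toOrSet(disjunction) yields an OrSet of two AndSets, roughly:
    //   ORS:[ANDS:[rowkey = 10], ANDS:[rowkey >= 20, rowkey <= 30]]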
+ * + * @param node A root node to convert + * @return A converted OR set + */ + static Node.OrSet toOrSet(Node node) { + List andsList = new ArrayList(); + toOrSet(node, andsList); + return new Node.OrSet(andsList); + } + + private static void toOrSet(Node node, List andsList) { + if (node instanceof Node.Or) { + toOrSet(node.getChildren().get(0), andsList); + toOrSet(node.getChildren().get(1), andsList); + } else if (node instanceof Node.And) { + List andList = new ArrayList(); + toAndSet(node.getChildren().get(0), andList); + toAndSet(node.getChildren().get(1), andList); + andsList.add(new Node.AndSet(andList)); + } else { + List andList = new ArrayList(); + toAndSet(node, andList); + andsList.add(new Node.AndSet(andList)); + } + } + + private static void toAndSet(Node node, List nodeList) { + if (node instanceof Node.And) { + toAndSet(node.getChildren().get(0), nodeList); + toAndSet(node.getChildren().get(1), nodeList); + } else { + nodeList.add(node); + } + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/Range.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/Range.java new file mode 100644 index 0000000..5c9a92f --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/Range.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase.predicate; + +import org.apache.hadoop.hive.hbase.HiveHBaseInputFormatUtil; + +import java.util.List; + +/** + * A range of a column. 
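For orientation, a sketch of how the create(...) method defined just below narrows the two borders from a list of comparisons on one column (values are arbitrary; serialization relies on a HiveHBaseInputFormatUtil helper added elsewhere in this patch):

    // rowkey > '10' AND rowkey <= '30' AND rowkey >= '05'
    Node col = new Node.Column("rowkey");
    List<Comparison> comparisons = new ArrayList<Comparison>();
    comparisons.add(new Comparison(Comparison.Comparator.GT, col, new Node.Constant("10")));
    comparisons.add(new Comparison(Comparison.Comparator.LE, col, new Node.Constant("30")));
    comparisons.add(new Comparison(Comparison.Comparator.GE, col, new Node.Constant("05")));

    Range range = Range.create(comparisons);
    // range.startBorder: value "10", exclusive (the tighter of >= '05' and > '10')
    // range.endBorder:   value "30", inclusive
    // range.serialize("rowkey", false) then produces the KeyRange that is pushed to HBase.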
+ */ +public class Range { + + public Border startBorder = new StartBorder(); + public Border endBorder = new EndBorder(); + + public boolean isEverything() { + return startBorder.value == null && endBorder.value == null; + } + + public boolean isEmpty() { + if (startBorder.value == null || endBorder.value == null){ + return false; + } + int compared = startBorder.value.compareTo(endBorder.value); + if (compared > 0) { + return true; + } + if (compared == 0) { + if (startBorder.inclusive == false && endBorder.inclusive == false) { + return true; + } + } + return false; + } + + public static Range create(List comparisons) { + Range range = new Range(); + Range.Border startBorder = range.startBorder; + Range.Border endBorder = range.endBorder; + for (Comparison comparison : comparisons) { + Comparable constant = comparison.getConstant().getValue(); + switch (comparison.getComparator()) { + case LT: + endBorder = endBorder.narrower(new Range.EndBorder(constant, false)); + break; + case LE: + endBorder = endBorder.narrower(new Range.EndBorder(constant, true)); + break; + case GT: + startBorder = startBorder.narrower(new Range.StartBorder(constant, false)); + break; + case GE: + startBorder = startBorder.narrower(new Range.StartBorder(constant, true)); + break; + case EQ: + startBorder = startBorder.narrower(new Range.StartBorder(constant, true)); + endBorder = endBorder.narrower(new Range.EndBorder(constant, true)); + break; + } + } + range.startBorder = startBorder; + range.endBorder = endBorder; + return range; + } + + public KeyRange serialize(String columnName, boolean binary) throws Exception { + byte[] startKey = null; + byte[] endKey = null; + if (startBorder.value != null) { + startKey = HiveHBaseInputFormatUtil.toByteArray( + startBorder.value, !startBorder.inclusive, binary); + } + if (endBorder.value != null) { + endKey = HiveHBaseInputFormatUtil.toByteArray( + endBorder.value, endBorder.inclusive, binary); + } + return new KeyRange(columnName, startKey, endKey); + } + + public static abstract class Border { + + public boolean inclusive; + public Comparable value; + + public Border(Comparable value, boolean inclusive) { + this.value = value; + this.inclusive = inclusive; + } + + public abstract Border narrower(Border other); + + public String toString() { + return "inclusive:" + inclusive + ", value:" + value; + } + } + + static class StartBorder extends Border { + + public StartBorder(Comparable value, boolean inclusive) { + super(value, inclusive); + } + + public StartBorder() { + this(null, false); + } + + @Override + public Border narrower(Border other) { + if (value == null || other.value == null) { + return value == null ? other : this; + } + int compared = value.compareTo(other.value); + if (compared == 0) { + return inclusive ? other : this; + } + return compared > 0 ? this : other; + } + + public String toString() { + return "start(" + super.toString() + ")"; + } + } + + static class EndBorder extends Border { + + public EndBorder(Comparable value, boolean inclusive) { + super(value, inclusive); + } + + public EndBorder() { + this(null, false); + } + + @Override + public Border narrower(Border other) { + if (value == null || other.value == null) { + return value == null ? other : this; + } + int compared = value.compareTo(other.value); + if (compared == 0) { + return inclusive ? other : this; + } + return compared < 0 ? 
this : other; + } + + public String toString() { + return "end(" + super.toString() + ")"; + } + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/RangeTranslator.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/RangeTranslator.java new file mode 100644 index 0000000..14e7ba6 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/predicate/RangeTranslator.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase.predicate; + +import org.apache.hadoop.mapred.JobConf; + +import java.util.List; +import java.util.Map; + +public interface RangeTranslator { + + /** + * Translate OR conjugated ColumnRanges to hbase ranges + * + * @param ranges + * @return + */ + List translate(List> ranges); + + /** + * Returns column names + * @return + */ + List getKeyColumns(); +} diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory.java index 8962533..641ee9d 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory.java @@ -98,7 +98,7 @@ private SlashSeparatedOI(StructTypeInfo type) { @Override public Object getStructFieldData(Object data, StructField fieldRef) { - return ((DoubleDollarSeparated)data).fields[((MyField)fieldRef).getFieldID()]; + return ((DoubleDollarSeparated)data).fields[fieldRef.getFieldID()]; } @Override diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java index ecd5061..a91764e 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java @@ -236,7 +236,7 @@ private StringArrayOI(StructTypeInfo type) { @Override public Object getStructFieldData(Object data, StructField fieldRef) { - return ((FixedLengthed)data).fields.get(((MyField)fieldRef).getFieldID()); + return ((FixedLengthed)data).fields.get(fieldRef.getFieldID()); } @Override diff --git hbase-handler/src/test/queries/positive/hbase_ppd_key_range.q hbase-handler/src/test/queries/positive/hbase_ppd_key_range.q index 59e724d..087669d 100644 --- hbase-handler/src/test/queries/positive/hbase_ppd_key_range.q +++ hbase-handler/src/test/queries/positive/hbase_ppd_key_range.q @@ -14,7 +14,7 @@ select * from hbase_pushdown where key<'1'; select * from hbase_pushdown where key<='2'; select * from hbase_pushdown where key>='90'; --- with cnostant expressinon +-- with constant expression explain select * from hbase_pushdown where key>=cast(40 + 50 as 
string); select * from hbase_pushdown where key>=cast(40 + 50 as string); diff --git hbase-handler/src/test/queries/positive/hbase_ppd_or.q hbase-handler/src/test/queries/positive/hbase_ppd_or.q new file mode 100644 index 0000000..a0ee946 --- /dev/null +++ hbase-handler/src/test/queries/positive/hbase_ppd_or.q @@ -0,0 +1,39 @@ +CREATE TABLE hbase_or(key string, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_or", + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.OrPredicateHBaseKeyFactory"); + +from src tablesample (5 rows) +insert into table hbase_or select key, value; + +set hive.fetch.task.conversion=more; + +-- 165,238,27,311,86 +select * from hbase_or; + +-- 238,86 +explain +select * from hbase_or where key = '238' OR key = '86'; +select * from hbase_or where key = '238' OR key = '86'; + +-- 27,311 +explain +select * from hbase_or where key IN ('27','311','340'); +select * from hbase_or where key IN ('27','311','340'); + +-- 165,238,86 +explain +select * from hbase_or where key NOT IN ('27','311','340'); +select * from hbase_or where key NOT IN ('27','311','340'); + +-- 165,238,27 +explain +select * from hbase_or where key BETWEEN '100' AND '300'; +select * from hbase_or where key BETWEEN '100' AND '300'; + +-- 311,86 +explain +select * from hbase_or where key NOT BETWEEN '100' AND '300'; +select * from hbase_or where key NOT BETWEEN '100' AND '300'; diff --git hbase-handler/src/test/results/positive/hbase_custom_key3.q.out hbase-handler/src/test/results/positive/hbase_custom_key3.q.out index c8aad2b..9ddd198 100644 --- hbase-handler/src/test/results/positive/hbase_custom_key3.q.out +++ hbase-handler/src/test/results/positive/hbase_custom_key3.q.out @@ -70,6 +70,7 @@ STAGE PLANS: TableScan alias: hbase_ck_5 filterExpr: ((key.col1 = '238') and (key.col2 = '1238')) (type: boolean) + filterFactory: org.apache.hadoop.hive.hbase.AbstractHBaseKeyPredicateDecomposer$HBaseScanRangeFactory Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE Filter Operator predicate: ((key.col1 = '238') and (key.col2 = '1238')) (type: boolean) diff --git hbase-handler/src/test/results/positive/hbase_ppd_key_range.q.out hbase-handler/src/test/results/positive/hbase_ppd_key_range.q.out index 4e4364e..8e7f892 100644 --- hbase-handler/src/test/results/positive/hbase_ppd_key_range.q.out +++ hbase-handler/src/test/results/positive/hbase_ppd_key_range.q.out @@ -178,10 +178,10 @@ POSTHOOK: Input: default@hbase_pushdown 96 val_96 97 val_97 98 val_98 -PREHOOK: query: -- with cnostant expressinon +PREHOOK: query: -- with constant expression explain select * from hbase_pushdown where key>=cast(40 + 50 as string) PREHOOK: type: QUERY -POSTHOOK: query: -- with cnostant expressinon +POSTHOOK: query: -- with constant expression explain select * from hbase_pushdown where key>=cast(40 + 50 as string) POSTHOOK: type: QUERY STAGE DEPENDENCIES: diff --git hbase-handler/src/test/results/positive/hbase_ppd_or.q.out hbase-handler/src/test/results/positive/hbase_ppd_or.q.out new file mode 100644 index 0000000..7a2d840 --- /dev/null +++ hbase-handler/src/test/results/positive/hbase_ppd_or.q.out @@ -0,0 +1,240 @@ +PREHOOK: query: CREATE TABLE hbase_or(key string, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_or", + "hbase.columns.mapping" = ":key,cf:string", + 
"hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.OrPredicateHBaseKeyFactory") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@hbase_or +POSTHOOK: query: CREATE TABLE hbase_or(key string, value string) +STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' +WITH SERDEPROPERTIES ( + "hbase.table.name" = "hbase_or", + "hbase.columns.mapping" = ":key,cf:string", + "hbase.composite.key.factory"="org.apache.hadoop.hive.hbase.OrPredicateHBaseKeyFactory") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_or +PREHOOK: query: from src tablesample (5 rows) +insert into table hbase_or select key, value +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@hbase_or +POSTHOOK: query: from src tablesample (5 rows) +insert into table hbase_or select key, value +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@hbase_or +PREHOOK: query: -- 165,238,27,311,86 +select * from hbase_or +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_or +#### A masked pattern was here #### +POSTHOOK: query: -- 165,238,27,311,86 +select * from hbase_or +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_or +#### A masked pattern was here #### +165 val_165 +238 val_238 +27 val_27 +311 val_311 +86 val_86 +PREHOOK: query: -- 238,86 +explain +select * from hbase_or where key = '238' OR key = '86' +PREHOOK: type: QUERY +POSTHOOK: query: -- 238,86 +explain +select * from hbase_or where key = '238' OR key = '86' +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: hbase_or + filterFactory: org.apache.hadoop.hive.hbase.AbstractHBaseKeyFactory$NormalizedExprFactory + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: ((key = '238') or (key = '86')) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + ListSink + +PREHOOK: query: select * from hbase_or where key = '238' OR key = '86' +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_or +#### A masked pattern was here #### +POSTHOOK: query: select * from hbase_or where key = '238' OR key = '86' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_or +#### A masked pattern was here #### +238 val_238 +86 val_86 +PREHOOK: query: -- 27,311 +explain +select * from hbase_or where key IN ('27','311','340') +PREHOOK: type: QUERY +POSTHOOK: query: -- 27,311 +explain +select * from hbase_or where key IN ('27','311','340') +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: hbase_or + filterFactory: org.apache.hadoop.hive.hbase.AbstractHBaseKeyFactory$NormalizedExprFactory + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: (key) IN ('27', '311', '340') (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + ListSink + +PREHOOK: query: select * from 
hbase_or where key IN ('27','311','340') +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_or +#### A masked pattern was here #### +POSTHOOK: query: select * from hbase_or where key IN ('27','311','340') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_or +#### A masked pattern was here #### +27 val_27 +311 val_311 +PREHOOK: query: -- 165,238,86 +explain +select * from hbase_or where key NOT IN ('27','311','340') +PREHOOK: type: QUERY +POSTHOOK: query: -- 165,238,86 +explain +select * from hbase_or where key NOT IN ('27','311','340') +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: hbase_or + filterFactory: org.apache.hadoop.hive.hbase.AbstractHBaseKeyFactory$NormalizedExprFactory + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: (not (key) IN ('27', '311', '340')) (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + ListSink + +PREHOOK: query: select * from hbase_or where key NOT IN ('27','311','340') +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_or +#### A masked pattern was here #### +POSTHOOK: query: select * from hbase_or where key NOT IN ('27','311','340') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_or +#### A masked pattern was here #### +165 val_165 +238 val_238 +86 val_86 +PREHOOK: query: -- 165,238,27 +explain +select * from hbase_or where key BETWEEN '100' AND '300' +PREHOOK: type: QUERY +POSTHOOK: query: -- 165,238,27 +explain +select * from hbase_or where key BETWEEN '100' AND '300' +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: hbase_or + filterFactory: org.apache.hadoop.hive.hbase.AbstractHBaseKeyFactory$NormalizedExprFactory + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: key BETWEEN '100' AND '300' (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + ListSink + +PREHOOK: query: select * from hbase_or where key BETWEEN '100' AND '300' +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_or +#### A masked pattern was here #### +POSTHOOK: query: select * from hbase_or where key BETWEEN '100' AND '300' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_or +#### A masked pattern was here #### +165 val_165 +238 val_238 +27 val_27 +PREHOOK: query: -- 311,86 +explain +select * from hbase_or where key NOT BETWEEN '100' AND '300' +PREHOOK: type: QUERY +POSTHOOK: query: -- 311,86 +explain +select * from hbase_or where key NOT BETWEEN '100' AND '300' +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: hbase_or + filterFactory: org.apache.hadoop.hive.hbase.AbstractHBaseKeyFactory$NormalizedExprFactory + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Filter Operator + predicate: (not key BETWEEN '100' AND '300') 
(type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + ListSink + +PREHOOK: query: select * from hbase_or where key NOT BETWEEN '100' AND '300' +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_or +#### A masked pattern was here #### +POSTHOOK: query: select * from hbase_or where key NOT BETWEEN '100' AND '300' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_or +#### A masked pattern was here #### +311 val_311 +86 val_86 diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 5c4459b..c1dabc1 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -37,7 +36,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; @@ -424,19 +422,19 @@ public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) { // construct column name list and types for reference by filter push down Utilities.setColumnNameList(jobConf, tableScan); Utilities.setColumnTypeList(jobConf, tableScan); - // push down filters - ExprNodeGenericFuncDesc filterExpr = (ExprNodeGenericFuncDesc)scanDesc.getFilterExpr(); - if (filterExpr == null) { - return; - } Serializable filterObject = scanDesc.getFilterObject(); - if (filterObject != null) { - jobConf.set( - TableScanDesc.FILTER_OBJECT_CONF_STR, - Utilities.serializeObject(filterObject)); + String filterFactory = scanDesc.getFilterFactory(); + if (filterObject != null && filterFactory != null) { + jobConf.set(TableScanDesc.FILTER_OBJECT_CONF_STR, Utilities.serializeObject(filterObject)); + jobConf.set(TableScanDesc.FILTER_OBJECT_CONF_FACTORY, filterFactory); } + // push down filters + ExprNodeGenericFuncDesc filterExpr = scanDesc.getFilterExpr(); + if (filterExpr == null) { + return; + } String filterText = filterExpr.getExprString(); String filterExprSerialized = Utilities.serializeExpression(filterExpr); if (LOG.isDebugEnabled()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java index 7d7c764..45825a9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java @@ -76,6 +76,11 @@ public DecomposedPredicate decomposePredicate( public Serializable pushedPredicateObject; /** + * Factory class name for pushedPredicateObject + */ + public String pushedPredicateFactory; + + /** * Portion of predicate to be post-evaluated by Hive for any rows * which are returned by storage handler. 
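A hedged sketch of how a storage handler is expected to fill in the new field alongside the existing ones when it decomposes a predicate (analyzedExpr and residualExpr are assumed local ExprNodeGenericFuncDesc variables, and MyScanFactory is a hypothetical factory class name):

    DecomposedPredicate decomposed = new DecomposedPredicate();
    decomposed.pushedPredicate = analyzedExpr;                          // evaluated by the storage handler
    decomposed.pushedPredicateObject = Utilities.serializeExpression(analyzedExpr);
    decomposed.pushedPredicateFactory = MyScanFactory.class.getName();  // rebuilds scans from the pushed object
    decomposed.residualPredicate = residualExpr;                        // still evaluated by Hive
    return decomposed;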
*/ diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g index 75897b8..9b8131b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g +++ ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g @@ -386,10 +386,10 @@ precedenceEqualExpression -> ^(TOK_SUBQUERY_EXPR ^(TOK_SUBQUERY_OP KW_IN) subQueryExpression $precedenceEqualExpression) | (KW_IN expressions) -> ^(TOK_FUNCTION KW_IN $precedenceEqualExpression expressions) - | ( KW_NOT KW_BETWEEN (min=precedenceBitwiseOrExpression) KW_AND (max=precedenceBitwiseOrExpression) ) - -> ^(TOK_FUNCTION Identifier["between"] KW_TRUE $left $min $max) - | ( KW_BETWEEN (min=precedenceBitwiseOrExpression) KW_AND (max=precedenceBitwiseOrExpression) ) - -> ^(TOK_FUNCTION Identifier["between"] KW_FALSE $left $min $max) + | (KW_NOT KW_BETWEEN (min=precedenceBitwiseOrExpression) KW_AND (max=precedenceBitwiseOrExpression) ) + -> ^(KW_NOT ^(TOK_FUNCTION Identifier["between"] $left $min $max)) + | (KW_BETWEEN (min=precedenceBitwiseOrExpression) KW_AND (max=precedenceBitwiseOrExpression) ) + -> ^(TOK_FUNCTION Identifier["between"] $left $min $max) )* | (KW_EXISTS LPAREN KW_SELECT)=> (KW_EXISTS subQueryExpression) -> ^(TOK_SUBQUERY_EXPR ^(TOK_SUBQUERY_OP KW_EXISTS) subQueryExpression) ; diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java index f293c43..ac1ad32 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java @@ -264,18 +264,19 @@ private static ExprNodeDesc backtrack(ExprNodeColumnDesc column, Operator cur public static ExprNodeDesc[] extractComparePair(ExprNodeDesc expr1, ExprNodeDesc expr2) { expr1 = extractConstant(expr1); expr2 = extractConstant(expr2); + // handles cases where the query has a predicate "column-name=constant" if (expr1 instanceof ExprNodeColumnDesc && expr2 instanceof ExprNodeConstantDesc) { return new ExprNodeDesc[] {expr1, expr2}; } + // handles cases where the query has a predicate "constant=column-name" if (expr1 instanceof ExprNodeConstantDesc && expr2 instanceof ExprNodeColumnDesc) { return new ExprNodeDesc[] {expr1, expr2}; } - // handles cases where the query has a predicate "column-name=constant" + if (expr1 instanceof ExprNodeFieldDesc && expr2 instanceof ExprNodeConstantDesc) { ExprNodeColumnDesc columnDesc = extractColumn(expr1); return columnDesc != null ? new ExprNodeDesc[] {columnDesc, expr2, expr1} : null; } - // handles cases where the query has a predicate "constant=column-name" if (expr1 instanceof ExprNodeConstantDesc && expr2 instanceof ExprNodeFieldDesc) { ExprNodeColumnDesc columnDesc = extractColumn(expr2); return columnDesc != null ? 
new ExprNodeDesc[] {expr1, columnDesc, expr2} : null; @@ -284,24 +285,32 @@ private static ExprNodeDesc backtrack(ExprNodeColumnDesc column, Operator cur return null; } + public static String[] extractFields(ExprNodeFieldDesc expr) { + return extractFields(expr, false); + } + /** * Extract fields from the given {@link ExprNodeFieldDesc node descriptor} * */ - public static String[] extractFields(ExprNodeFieldDesc expr) { - return extractFields(expr, new ArrayList()).toArray(new String[0]); + public static String[] extractFields(ExprNodeFieldDesc expr, boolean withColumn) { + return extractFields(expr, new ArrayList(), withColumn).toArray(new String[0]); } /* * Recursively extract fields from ExprNodeDesc. Deeply nested structs can have multiple levels of * fields in them */ - private static List extractFields(ExprNodeDesc expr, List fields) { + private static List extractFields( + ExprNodeDesc expr, List fields, boolean withColumn) { if (expr instanceof ExprNodeFieldDesc) { ExprNodeFieldDesc field = (ExprNodeFieldDesc)expr; fields.add(field.getFieldName()); - return extractFields(field.getDesc(), fields); + return extractFields(field.getDesc(), fields, withColumn); } if (expr instanceof ExprNodeColumnDesc) { + if (withColumn) { + fields.add(((ExprNodeColumnDesc)expr).getColumn()); + } return fields; } throw new IllegalStateException( @@ -330,7 +339,7 @@ private static ExprNodeDesc extractConstant(ExprNodeDesc expr) { return folded == null ? expr : folded; } - private static ExprNodeConstantDesc foldConstant(ExprNodeGenericFuncDesc func) { + public static ExprNodeConstantDesc foldConstant(ExprNodeGenericFuncDesc func) { GenericUDF udf = func.getGenericUDF(); if (!FunctionRegistry.isDeterministic(udf) || FunctionRegistry.isStateful(udf)) { return null; diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index 699b476..564cea2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -67,7 +67,9 @@ private int maxStatsKeyPrefixLength = -1; private ExprNodeGenericFuncDesc filterExpr; + private transient Serializable filterObject; + private transient String filterFactory; // Both neededColumnIDs and neededColumns should never be null. 
// When neededColumnIDs is an empty list, @@ -88,6 +90,9 @@ public static final String FILTER_OBJECT_CONF_STR = "hive.io.filter.object"; + public static final String FILTER_OBJECT_CONF_FACTORY = + "hive.io.filter.object.factory"; + // input file name (big) to bucket number private Map bucketFileNameMapping; @@ -140,6 +145,15 @@ public void setFilterObject(Serializable filterObject) { this.filterObject = filterObject; } + @Explain(displayName = "filterFactory") + public String getFilterFactory() { + return filterFactory; + } + + public void setFilterFactory(String filterFactory) { + this.filterFactory = filterFactory; + } + public void setNeededColumnIDs(List neededColumnIDs) { this.neededColumnIDs = neededColumnIDs; } diff --git ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java index 7c8f8d7..320aabc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java @@ -1053,6 +1053,7 @@ private static ExprNodeGenericFuncDesc pushFilterToStorageHandler( } tableScanDesc.setFilterExpr(decomposed.pushedPredicate); tableScanDesc.setFilterObject(decomposed.pushedPredicateObject); + tableScanDesc.setFilterFactory(decomposed.pushedPredicateFactory); return decomposed.residualPredicate; } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBetween.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBetween.java index 9d05e12..ff06f84 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBetween.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBetween.java @@ -20,10 +20,8 @@ import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; -import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.BooleanWritable; @@ -38,47 +36,37 @@ @Override public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { - if (!arguments[0].getTypeName().equals("boolean")) { - throw new UDFArgumentTypeException(0, "First argument for BETWEEN should be boolean type"); - } - egt.initialize(new ObjectInspector[] {arguments[1], arguments[2]}); - elt.initialize(new ObjectInspector[] {arguments[1], arguments[3]}); + egt.initialize(new ObjectInspector[] {arguments[0], arguments[1]}); + elt.initialize(new ObjectInspector[] {arguments[0], arguments[2]}); argumentOIs = arguments; return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector; } @Override - public Object evaluate(DeferredObject[] arguments) throws HiveException { - boolean invert = (Boolean) ((PrimitiveObjectInspector) argumentOIs[0]) - .getPrimitiveJavaObject(arguments[0].get()); - - BooleanWritable left = ((BooleanWritable)egt.evaluate(new DeferredObject[] {arguments[1], arguments[2]})); + public BooleanWritable evaluate(DeferredObject[] arguments) throws HiveException { + BooleanWritable left = ((BooleanWritable)egt.evaluate(new DeferredObject[] {arguments[0], arguments[1]})); if (left == null) { return null; } - if (!invert && !left.get()) { + if (!left.get()) { result.set(false); return result; } - BooleanWritable right = 
((BooleanWritable)elt.evaluate(new DeferredObject[] {arguments[1], arguments[3]})); + BooleanWritable right = ((BooleanWritable)elt.evaluate(new DeferredObject[] {arguments[0], arguments[2]})); if (right == null) { return null; } - boolean between = left.get() && right.get(); - result.set(invert ? !between : between); + result.set(left.get() && right.get()); return result; } @Override public String getDisplayString(String[] children) { StringBuilder sb = new StringBuilder(); - sb.append(children[1]); - if (Boolean.valueOf(children[0])) { - sb.append(" NOT"); - } + sb.append(children[0]); sb.append(" BETWEEN "); - sb.append(children[2]).append(" AND ").append(children[3]); + sb.append(children[1]).append(" AND ").append(children[2]); return sb.toString(); } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizationContext.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizationContext.java index 0612647..0a68311 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizationContext.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizationContext.java @@ -29,7 +29,6 @@ import junit.framework.Assert; -import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.exec.vector.expressions.ColAndCol; import org.apache.hadoop.hive.ql.exec.vector.expressions.ColOrCol; import org.apache.hadoop.hive.ql.exec.vector.expressions.DoubleColumnInList; @@ -125,10 +124,8 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFPower; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFRound; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPPlus; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFTimestamp; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.junit.Test; @@ -915,7 +912,6 @@ public void testBetweenFilters() throws HiveException { // string BETWEEN GenericUDFBetween udf = new GenericUDFBetween(); List children1 = new ArrayList(); - children1.add(new ExprNodeConstantDesc(new Boolean(false))); // no NOT keyword children1.add(col1Expr); children1.add(constDesc); children1.add(constDesc2);
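To summarize the BETWEEN change for code that builds these expressions by hand: the boolean invert flag is gone, NOT BETWEEN is now an ordinary GenericUDFOPNot wrapped around the BETWEEN call, and the function takes exactly three children. A small sketch (colExpr stands for an assumed ExprNodeColumnDesc; the constants are arbitrary):

    // New shape: between(expr, min, max); previously between(<invert:boolean>, expr, min, max).
    List<ExprNodeDesc> children = new ArrayList<ExprNodeDesc>();
    children.add(colExpr);                        // the column or expression being tested
    children.add(new ExprNodeConstantDesc("a"));  // lower bound
    children.add(new ExprNodeConstantDesc("z"));  // upper bound
    ExprNodeGenericFuncDesc between = new ExprNodeGenericFuncDesc(
        TypeInfoFactory.booleanTypeInfo, new GenericUDFBetween(), children);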