diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java
index 8b64321..7b64a03 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseScanRange.java
@@ -18,17 +18,17 @@
 
 package org.apache.hadoop.hive.hbase;
 
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.io.BytesWritable;
 
-import java.io.Serializable;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-
 public class HBaseScanRange implements Serializable {
 
   private byte[] startRow;
@@ -59,12 +59,20 @@ public void addFilter(Filter filter) throws Exception {
   }
 
   public void setup(Scan scan, Configuration conf) throws Exception {
-    if (startRow != null) {
-      scan.setStartRow(startRow);
-    }
-    if (stopRow != null) {
-      scan.setStopRow(stopRow);
+    setup(scan, conf, false);
+  }
+
+  public void setup(Scan scan, Configuration conf, boolean filterOnly) throws Exception {
+    if (!filterOnly) {
+      // Set the start and stop rows only if asked to
+      if (startRow != null) {
+        scan.setStartRow(startRow);
+      }
+      if (stopRow != null) {
+        scan.setStopRow(stopRow);
+      }
     }
+
     if (filterDescs.isEmpty()) {
       return;
     }
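Note on the HBaseScanRange change: the original two-argument setup() now delegates to a new three-argument overload, and passing filterOnly = true applies only the registered filters while leaving the Scan's start/stop rows untouched. A minimal caller sketch (the class name and row keys below are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hive.hbase.HBaseScanRange;

    public class FilterOnlySetupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Scan scan = new Scan();
        // Key range chosen elsewhere, e.g. during split planning.
        scan.setStartRow(Bytes.toBytes("row-000"));
        scan.setStopRow(Bytes.toBytes("row-100"));

        HBaseScanRange range = new HBaseScanRange();
        // filterOnly = true: apply only the serialized filters; the start and
        // stop rows set above are preserved rather than overwritten.
        range.setup(scan, conf, true);
      }
    }

This matches how HiveHBaseInputFormatUtil.getScan() below calls setupScanRange(..., true), presumably so that per-split scan boundaries are not clobbered by the pushed-down range.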
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java
index 6054d53..cfe228c 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java
@@ -24,7 +24,10 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FilterList;
@@ -32,9 +35,29 @@
 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
+import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 
 /**
@@ -42,6 +65,8 @@
  */
 class HiveHBaseInputFormatUtil {
 
+  private static final Log LOG = LogFactory.getLog(HiveHBaseInputFormatUtil.class);
+
   /**
    * Parse {@code jobConf} to create the target {@link HTable} instance.
    */
@@ -134,6 +159,13 @@ public static Scan getScan(JobConf jobConf) throws IOException {
     if (scanBatch != null) {
       scan.setBatch(Integer.parseInt(scanBatch));
     }
+
+    String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR);
+
+    if (filterObjectSerialized != null) {
+      setupScanRange(scan, filterObjectSerialized, jobConf, true);
+    }
+
     return scan;
   }
 
@@ -175,4 +207,157 @@ public static boolean getStorageFormatOfKey(String spec, String defaultFormat) t
     }
     return result;
   }
+
+  static void setupScanRange(Scan scan, String filterObjectSerialized, JobConf jobConf,
+      boolean filterOnly) throws IOException {
+    HBaseScanRange range =
+        SerializationUtilities.deserializeObject(filterObjectSerialized, HBaseScanRange.class);
+    try {
+      range.setup(scan, jobConf, filterOnly);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  static void setupKeyRange(Scan scan, List<IndexSearchCondition> conditions, boolean isBinary)
+      throws IOException {
+    // Convert the search condition into a restriction on the HBase scan
+    byte[] startRow = HConstants.EMPTY_START_ROW, stopRow = HConstants.EMPTY_END_ROW;
+    for (IndexSearchCondition sc : conditions) {
+
+      ExprNodeConstantEvaluator eval = new ExprNodeConstantEvaluator(sc.getConstantDesc());
+      PrimitiveObjectInspector objInspector;
+      Object writable;
+
+      try {
+        objInspector = (PrimitiveObjectInspector) eval.initialize(null);
+        writable = eval.evaluate(null);
+      } catch (ClassCastException cce) {
+        throw new IOException("Currently only primitive types are supported. Found: " +
+            sc.getConstantDesc().getTypeString());
+      } catch (HiveException e) {
+        throw new IOException(e);
+      }
+
+      byte[] constantVal = getConstantVal(writable, objInspector, isBinary);
+      String comparisonOp = sc.getComparisonOp();
+
+      if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual".equals(comparisonOp)) {
+        startRow = constantVal;
+        stopRow = getNextBA(constantVal);
+      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan".equals(comparisonOp)) {
+        stopRow = constantVal;
+      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan"
+          .equals(comparisonOp)) {
+        startRow = constantVal;
+      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan"
+          .equals(comparisonOp)) {
+        startRow = getNextBA(constantVal);
+      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan"
+          .equals(comparisonOp)) {
+        stopRow = getNextBA(constantVal);
+      } else {
+        throw new IOException(comparisonOp + " is not a supported comparison operator");
+      }
+    }
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(Bytes.toStringBinary(startRow) + " ~ " + Bytes.toStringBinary(stopRow));
+    }
+  }
+
+  static void setupTimeRange(Scan scan, List<IndexSearchCondition> conditions) throws IOException {
+    long start = 0;
+    long end = Long.MAX_VALUE;
+    for (IndexSearchCondition sc : conditions) {
+      long timestamp = getTimestampVal(sc);
+      String comparisonOp = sc.getComparisonOp();
+      if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual".equals(comparisonOp)) {
+        start = timestamp;
+        end = timestamp + 1;
+      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan".equals(comparisonOp)) {
+        end = timestamp;
+      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan"
+          .equals(comparisonOp)) {
+        start = timestamp;
+      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan"
+          .equals(comparisonOp)) {
+        start = timestamp + 1;
+      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan"
+          .equals(comparisonOp)) {
+        end = timestamp + 1;
+      } else {
+        throw new IOException(comparisonOp + " is not a supported comparison operator");
+      }
+    }
+    scan.setTimeRange(start, end);
+  }
+
+  static long getTimestampVal(IndexSearchCondition sc) throws IOException {
+    long timestamp;
+    try {
+      ExprNodeConstantEvaluator eval = new ExprNodeConstantEvaluator(sc.getConstantDesc());
+      ObjectInspector inspector = eval.initialize(null);
+      Object value = eval.evaluate(null);
+      if (inspector instanceof LongObjectInspector) {
+        timestamp = ((LongObjectInspector) inspector).get(value);
+      } else {
+        PrimitiveObjectInspector primitive = (PrimitiveObjectInspector) inspector;
+        timestamp = PrimitiveObjectInspectorUtils.getTimestamp(value, primitive).getTime();
+      }
+    } catch (HiveException e) {
+      throw new IOException(e);
+    }
+    return timestamp;
+  }
+
+  static byte[] getConstantVal(Object writable, PrimitiveObjectInspector poi, boolean isKeyBinary)
+      throws IOException {
+
+    if (!isKeyBinary) {
+      // Key is stored in text format. Get the text-format byte representation
+      // of the constant as well.
+      byte[] startRow;
+      ByteStream.Output serializeStream = new ByteStream.Output();
+      LazyUtils.writePrimitiveUTF8(serializeStream, writable, poi, false, (byte) 0, null);
+      startRow = new byte[serializeStream.getLength()];
+      System.arraycopy(serializeStream.getData(), 0, startRow, 0, serializeStream.getLength());
+      return startRow;
+    }
+
+    PrimitiveCategory pc = poi.getPrimitiveCategory();
+    switch (pc) {
+    case INT:
+      return Bytes.toBytes(((IntWritable) writable).get());
+    case BOOLEAN:
+      return Bytes.toBytes(((BooleanWritable) writable).get());
+    case LONG:
+      return Bytes.toBytes(((LongWritable) writable).get());
+    case FLOAT:
+      return Bytes.toBytes(((FloatWritable) writable).get());
+    case DOUBLE:
+      return Bytes.toBytes(((DoubleWritable) writable).get());
+    case SHORT:
+      return Bytes.toBytes(((ShortWritable) writable).get());
+    case STRING:
+      return Bytes.toBytes(((Text) writable).toString());
+    case BYTE:
+      return Bytes.toBytes(((ByteWritable) writable).get());
+
+    default:
+      throw new IOException("Type not supported " + pc);
+    }
+  }
+
+  static byte[] getNextBA(byte[] current) {
+    // startRow is inclusive while stopRow is exclusive; this utility method
+    // returns the next byte array that occurs immediately after the current
+    // one, obtained by padding it with a trailing 0 byte.
+    byte[] next = new byte[current.length + 1];
+    System.arraycopy(current, 0, next, 0, current.length);
+    return next;
+  }
 }
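For context on the key-range logic above: HBase scans treat startRow as inclusive and stopRow as exclusive, so getNextBA() converts an inclusive upper bound (<= or =) into an exclusive stopRow by appending a single 0x00 byte, the smallest suffix that sorts strictly after the original key. A self-contained sketch of the trick (the row keys are made up):

    import java.util.Arrays;

    public class NextByteArraySketch {
      // Same padding trick as getNextBA() in the patch above.
      static byte[] getNextBA(byte[] current) {
        byte[] next = new byte[current.length + 1];
        System.arraycopy(current, 0, next, 0, current.length);
        return next;
      }

      public static void main(String[] args) {
        byte[] key = new byte[] {'a', 'b', 'c'};
        // key <= 'abc' becomes the scan range [EMPTY_START_ROW, 'abc\0'):
        // 'abc' itself is still scanned, 'abd' is not.
        System.out.println(Arrays.toString(getNextBA(key))); // [97, 98, 99, 0]
      }
    }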
diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index 1ef4545..5534377 100644
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -204,13 +204,7 @@ private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean
     Scan scan = new Scan();
     String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR);
     if (filterObjectSerialized != null) {
-      HBaseScanRange range = SerializationUtilities.deserializeObject(filterObjectSerialized,
-          HBaseScanRange.class);
-      try {
-        range.setup(scan, jobConf);
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
+      HiveHBaseInputFormatUtil.setupScanRange(scan, filterObjectSerialized, jobConf, false);
       return scan;
     }
 
@@ -252,158 +246,15 @@ private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean
     Map<String, List<IndexSearchCondition>> split = HiveHBaseInputFormatUtil.decompose(conditions);
     List<IndexSearchCondition> keyConditions = split.get(keyColName);
     if (keyConditions != null && !keyConditions.isEmpty()) {
-      setupKeyRange(scan, keyConditions, isKeyBinary);
+      HiveHBaseInputFormatUtil.setupKeyRange(scan, keyConditions, isKeyBinary);
     }
     List<IndexSearchCondition> tsConditions = split.get(tsColName);
     if (tsConditions != null && !tsConditions.isEmpty()) {
-      setupTimeRange(scan, tsConditions);
+      HiveHBaseInputFormatUtil.setupTimeRange(scan, tsConditions);
     }
     return scan;
   }
 
-  private void setupKeyRange(Scan scan, List<IndexSearchCondition> conditions, boolean isBinary)
-      throws IOException {
-    // Convert the search condition into a restriction on the HBase scan
-    byte [] startRow = HConstants.EMPTY_START_ROW, stopRow = HConstants.EMPTY_END_ROW;
-    for (IndexSearchCondition sc : conditions) {
-
-      ExprNodeConstantEvaluator eval = new ExprNodeConstantEvaluator(sc.getConstantDesc());
-      PrimitiveObjectInspector objInspector;
-      Object writable;
-
-      try {
-        objInspector = (PrimitiveObjectInspector)eval.initialize(null);
-        writable = eval.evaluate(null);
-      } catch (ClassCastException cce) {
-        throw new IOException("Currently only primitve types are supported. Found: " +
-            sc.getConstantDesc().getTypeString());
-      } catch (HiveException e) {
-        throw new IOException(e);
-      }
-
-      byte[] constantVal = getConstantVal(writable, objInspector, isBinary);
-      String comparisonOp = sc.getComparisonOp();
-
-      if("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual".equals(comparisonOp)){
-        startRow = constantVal;
-        stopRow = getNextBA(constantVal);
-      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan".equals(comparisonOp)){
-        stopRow = constantVal;
-      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan"
-          .equals(comparisonOp)) {
-        startRow = constantVal;
-      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan"
-          .equals(comparisonOp)){
-        startRow = getNextBA(constantVal);
-      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan"
-          .equals(comparisonOp)){
-        stopRow = getNextBA(constantVal);
-      } else {
-        throw new IOException(comparisonOp + " is not a supported comparison operator");
-      }
-    }
-    scan.setStartRow(startRow);
-    scan.setStopRow(stopRow);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(Bytes.toStringBinary(startRow) + " ~ " + Bytes.toStringBinary(stopRow));
-    }
-  }
-
-  private void setupTimeRange(Scan scan, List<IndexSearchCondition> conditions)
-      throws IOException {
-    long start = 0;
-    long end = Long.MAX_VALUE;
-    for (IndexSearchCondition sc : conditions) {
-      long timestamp = getTimestampVal(sc);
-      String comparisonOp = sc.getComparisonOp();
-      if("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual".equals(comparisonOp)){
-        start = timestamp;
-        end = timestamp + 1;
-      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan".equals(comparisonOp)){
-        end = timestamp;
-      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan"
-          .equals(comparisonOp)) {
-        start = timestamp;
-      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan"
-          .equals(comparisonOp)){
-        start = timestamp + 1;
-      } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan"
-          .equals(comparisonOp)){
-        end = timestamp + 1;
-      } else {
-        throw new IOException(comparisonOp + " is not a supported comparison operator");
-      }
-    }
-    scan.setTimeRange(start, end);
-  }
-
-  private long getTimestampVal(IndexSearchCondition sc) throws IOException {
-    long timestamp;
-    try {
-      ExprNodeConstantEvaluator eval = new ExprNodeConstantEvaluator(sc.getConstantDesc());
-      ObjectInspector inspector = eval.initialize(null);
-      Object value = eval.evaluate(null);
-      if (inspector instanceof LongObjectInspector) {
-        timestamp = ((LongObjectInspector)inspector).get(value);
-      } else {
-        PrimitiveObjectInspector primitive = (PrimitiveObjectInspector) inspector;
-        timestamp = PrimitiveObjectInspectorUtils.getTimestamp(value, primitive).getTime();
-      }
-    } catch (HiveException e) {
-      throw new IOException(e);
-    }
-    return timestamp;
-  }
-
-  private byte[] getConstantVal(Object writable, PrimitiveObjectInspector poi,
-      boolean isKeyBinary) throws IOException{
-
-    if (!isKeyBinary){
-      // Key is stored in text format. Get bytes representation of constant also of
-      // text format.
-      byte[] startRow;
-      ByteStream.Output serializeStream = new ByteStream.Output();
-      LazyUtils.writePrimitiveUTF8(serializeStream, writable, poi, false, (byte) 0, null);
-      startRow = new byte[serializeStream.getLength()];
-      System.arraycopy(serializeStream.getData(), 0, startRow, 0, serializeStream.getLength());
-      return startRow;
-    }
-
-    PrimitiveCategory pc = poi.getPrimitiveCategory();
-    switch (poi.getPrimitiveCategory()) {
-    case INT:
-      return Bytes.toBytes(((IntWritable)writable).get());
-    case BOOLEAN:
-      return Bytes.toBytes(((BooleanWritable)writable).get());
-    case LONG:
-      return Bytes.toBytes(((LongWritable)writable).get());
-    case FLOAT:
-      return Bytes.toBytes(((FloatWritable)writable).get());
-    case DOUBLE:
-      return Bytes.toBytes(((DoubleWritable)writable).get());
-    case SHORT:
-      return Bytes.toBytes(((ShortWritable)writable).get());
-    case STRING:
-      return Bytes.toBytes(((Text)writable).toString());
-    case BYTE:
-      return Bytes.toBytes(((ByteWritable)writable).get());
-
-    default:
-      throw new IOException("Type not supported " + pc);
-    }
-  }
-
-
-  private byte[] getNextBA(byte[] current){
-    // startRow is inclusive while stopRow is exclusive,
-    // this util method returns very next bytearray which will occur after the current one
-    // by padding current one with a trailing 0 byte.
-    byte[] next = new byte[current.length + 1];
-    System.arraycopy(current, 0, next, 0, current.length);
-    return next;
-  }
-
   /**
    * Instantiates a new predicate analyzer suitable for
    * determining how to push a filter down into the HBase scan,
diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleHBaseKeyFactory3.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleHBaseKeyFactory3.java
index 712725f..ef465e5 100644
--- hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleHBaseKeyFactory3.java
+++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleHBaseKeyFactory3.java
@@ -28,6 +28,8 @@
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
 import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
@@ -73,7 +75,7 @@ public HBaseScanRange getScanRange(List<IndexSearchCondition> searchConditions)
       }
       fieldCond.add(condition);
     }
-    Filter filter = null;
+    List<Filter> filters = new ArrayList<Filter>();
     HBaseScanRange range = new HBaseScanRange();
 
     StructTypeInfo type = (StructTypeInfo) keyMapping.columnType;
@@ -92,22 +94,22 @@ public HBaseScanRange getScanRange(List<IndexSearchCondition> searchConditions)
         byte[] valueAsBytes = toBinary(constantVal, FIXED_LENGTH, false, false);
 
         if (comparisonOp.endsWith("UDFOPEqual")) {
-          filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator(valueAsBytes));
+          filters.add(new RowFilter(CompareOp.EQUAL, new BinaryComparator(valueAsBytes)));
         } else if (comparisonOp.endsWith("UDFOPEqualOrGreaterThan")) {
-          filter = new RowFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator(valueAsBytes));
+          filters.add(new RowFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator(valueAsBytes)));
         } else if (comparisonOp.endsWith("UDFOPGreaterThan")) {
-          filter = new RowFilter(CompareOp.GREATER, new BinaryComparator(valueAsBytes));
+          filters.add(new RowFilter(CompareOp.GREATER, new BinaryComparator(valueAsBytes)));
         } else if (comparisonOp.endsWith("UDFOPEqualOrLessThan")) {
-          filter = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(valueAsBytes));
+          filters.add(new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(valueAsBytes)));
         } else if (comparisonOp.endsWith("UDFOPLessThan")) {
-          filter = new RowFilter(CompareOp.LESS, new BinaryComparator(valueAsBytes));
+          filters.add(new RowFilter(CompareOp.LESS, new BinaryComparator(valueAsBytes)));
         } else {
           throw new IOException(comparisonOp + " is not a supported comparison operator");
        }
      }
    }
-    if (filter != null) {
-      range.addFilter(filter);
+    if (!filters.isEmpty()) {
+      range.addFilter(new FilterList(Operator.MUST_PASS_ONE, filters));
    }
    return range;
  }
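The SampleHBaseKeyFactory3 change above stops discarding all but the last RowFilter: the collected filters are now combined with FilterList(Operator.MUST_PASS_ONE, ...), meaning a row passes if any one of them matches. A standalone sketch of that composition (the comparison values are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.filter.BinaryComparator;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.FilterList.Operator;
    import org.apache.hadoop.hbase.filter.RowFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FilterListSketch {
      public static void main(String[] args) {
        List<Filter> filters = new ArrayList<Filter>();
        filters.add(new RowFilter(CompareOp.LESS, new BinaryComparator(Bytes.toBytes("bbb"))));
        filters.add(new RowFilter(CompareOp.GREATER, new BinaryComparator(Bytes.toBytes("yyy"))));

        // MUST_PASS_ONE is a logical OR over the wrapped filters;
        // MUST_PASS_ALL would be the logical AND.
        Filter combined = new FilterList(Operator.MUST_PASS_ONE, filters);
        System.out.println(combined);
      }
    }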
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 3daa83f..07a919c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -70,7 +70,7 @@
   private String tmpStatsDir;
 
   private ExprNodeGenericFuncDesc filterExpr;
-  private transient Serializable filterObject;
+  private Serializable filterObject;
   private String serializedFilterExpr;
   private String serializedFilterObject;
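Finally, on the TableScanDesc change: dropping the transient modifier means filterObject is no longer skipped when the descriptor is serialized (standard Java serialization omits transient fields, and Kryo's default field serializer skips them as well), which is presumably what allows the HBaseScanRange filter object to survive plan serialization and reach the input format. A toy illustration of the transient semantics, unrelated to the Hive classes:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class TransientSketch {
      static class Desc implements Serializable {
        Serializable kept = "kept";
        transient Serializable dropped = "dropped";
      }

      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(new Desc());
        oos.flush();
        Desc copy = (Desc) new ObjectInputStream(
            new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(copy.kept);    // "kept"
        System.out.println(copy.dropped); // null: the transient field was not written
      }
    }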