diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseShimRecordReader.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseShimRecordReader.java new file mode 100644 index 0000000..8db18e6 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseShimRecordReader.java @@ -0,0 +1,71 @@ +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.RecordReader; + +import java.io.IOException; + +/** + * mapred.RecordReader wrapper around mapreduce.RecordReader + */ +public class HBaseShimRecordReader implements RecordReader { + + private final org.apache.hadoop.mapreduce.RecordReader recordReader; + + public HBaseShimRecordReader(org.apache.hadoop.mapreduce.RecordReader recordReader) { + this.recordReader = recordReader; + } + + @Override + public void close() throws IOException { + recordReader.close(); + } + + @Override + public ImmutableBytesWritable createKey() { + return new ImmutableBytesWritable(); + } + + @Override + public ResultWritable createValue() { + return new ResultWritable(new Result()); + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public float getProgress() throws IOException { + float progress = 0.0F; + + try { + progress = recordReader.getProgress(); + } catch (InterruptedException e) { + throw new IOException(e); + } + + return progress; + } + + @Override + public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws IOException { + + boolean next = false; + + try { + next = recordReader.nextKeyValue(); + + if (next) { + rowKey.set(recordReader.getCurrentValue().getRow()); + value.setResult(recordReader.getCurrentValue()); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + + return next; + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java index 4ac0803..65582a5 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java @@ -22,18 +22,13 @@ import java.util.ArrayList; import java.util.List; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapred.TableMapReduceUtil; -import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase; -import org.apache.hadoop.hbase.mapreduce.TableSplit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator; @@ -46,6 +41,7 @@ import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.io.ByteWritable; import 
org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -54,7 +50,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; @@ -63,142 +58,236 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; /** * HiveHBaseTableInputFormat implements InputFormat for HBase storage handler * tables, decorating an underlying HBase TableInputFormat with extra Hive logic * such as column pruning and filter pushdown. */ -public class HiveHBaseTableInputFormat extends TableInputFormatBase +public class HiveHBaseTableInputFormat extends HiveMultiTableInputFormatBase implements InputFormat { static final Log LOG = LogFactory.getLog(HiveHBaseTableInputFormat.class); @Override - public RecordReader getRecordReader( - InputSplit split, - JobConf jobConf, - final Reporter reporter) throws IOException { + public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { + // Take filter pushdown into account while calculating splits; this + // allows us to prune off regions immediately. Note that although + // the Javadoc for the superclass getSplits says that it returns one + // split per region, the implementation actually takes the scan + // definition into account and excludes regions which don't satisfy + // the start/stop row conditions (HBASE-1829). + List scans = createFilterScans(jobConf); + // REVIEW: are we supposed to be applying the getReadColumnIDs + // same as in getRecordReader? 
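To make the region pruning described in the comment above concrete, here is a minimal illustrative sketch (not part of the patch; the table name "my_hbase_table" and the row bounds are made up) of the kind of Scan that ends up in setScans(): the superclass getSplits() only emits splits for regions that intersect the scan's start/stop row range.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

class RegionPruningSketch {
  static Scan boundedScan() {
    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes("row100"));  // inclusive lower bound from the pushed predicate
    scan.setStopRow(Bytes.toBytes("row200"));   // exclusive upper bound from the pushed predicate
    // configureScan() below also tags each Scan with its table, since
    // MultiTableInputFormatBase supports scans over several tables:
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("my_hbase_table"));
    return scan;
  }
}

Handing a list of such scans to setScans() before delegating to super.getSplits() is exactly what the new getSplits() shown here does.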
- HBaseSplit hbaseSplit = (HBaseSplit) split; - TableSplit tableSplit = hbaseSplit.getTableSplit(); + for (Scan scan : scans) { + pushScanColumns(scan, jobConf); + configureScan(scan, jobConf); + } - setHTable(HiveHBaseInputFormatUtil.getTable(jobConf)); - setScan(HiveHBaseInputFormatUtil.getScan(jobConf)); + setScans(scans); + return super.getSplits(jobConf, numSplits); + } - Job job = new Job(jobConf); - TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext( - job.getConfiguration(), reporter); + private void configureScan( + Scan scan, + JobConf jobConf) throws IOException { - final org.apache.hadoop.mapreduce.RecordReader - recordReader = createRecordReader(tableSplit, tac); - try { - recordReader.initialize(tableSplit, tac); - } catch (InterruptedException e) { - throw new IOException("Failed to initialize RecordReader", e); + String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); + + if (hbaseTableName == null) { + throw new IOException("HBase table must be specified in the JobConf"); + } else { + scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(hbaseTableName)); } - return new RecordReader() { + String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE); + if (scanCache != null) { + scan.setCaching(Integer.valueOf(scanCache)); + } + String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS); + if (scanCacheBlocks != null) { + scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks)); + } + String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH); + if (scanBatch != null) { + scan.setBatch(Integer.valueOf(scanBatch)); + } + } - @Override - public void close() throws IOException { - recordReader.close(); - } + /** + * Push the configured columns to be read into scan, reading all necessary values from jobConf and delegating + * to {@link #pushScanColumns(org.apache.hadoop.hbase.client.Scan, ColumnMappings, java.util.List, boolean)} + * @param scan + * @param jobConf + * @throws IOException + */ + private void pushScanColumns(Scan scan, JobConf jobConf) throws IOException { + String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); + boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); + ColumnMappings columnMappings; - @Override - public ImmutableBytesWritable createKey() { - return new ImmutableBytesWritable(); - } + try { + columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); + } catch (SerDeException e) { + throw new IOException(e); + } - @Override - public ResultWritable createValue() { - return new ResultWritable(new Result()); - } + List readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); - @Override - public long getPos() throws IOException { - return 0; - } + boolean readAllColumns = ColumnProjectionUtils.isReadAllColumns(jobConf); - @Override - public float getProgress() throws IOException { - float progress = 0.0F; + pushScanColumns(scan, columnMappings, readColIDs, readAllColumns); + } - try { - progress = recordReader.getProgress(); - } catch (InterruptedException e) { - throw new IOException(e); - } + private void pushScanColumns(Scan scan, ColumnMappings columnMappings, List readColIDs, boolean readAllColumns) throws IOException { - return progress; - } - @Override - public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws IOException { + if (columnMappings.size() < readColIDs.size()) { + throw new IOException("Cannot read more columns than the given table contains."); + } - 
boolean next = false; + boolean empty = true; - try { - next = recordReader.nextKeyValue(); + // The list of families that have been added to the scan + List addedFamilies = new ArrayList(); + + if (!readAllColumns) { + ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping(); + for (int i : readColIDs) { + ColumnMapping colMap = columnsMapping[i]; + if (colMap.hbaseRowKey) { + continue; + } - if (next) { - rowKey.set(recordReader.getCurrentValue().getRow()); - value.setResult(recordReader.getCurrentValue()); + if (colMap.qualifierName == null) { + scan.addFamily(colMap.familyNameBytes); + addedFamilies.add(colMap.familyName); + } else { + if(!addedFamilies.contains(colMap.familyName)){ + // add only if the corresponding family has not already been added + scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); } - } catch (InterruptedException e) { - throw new IOException(e); } - return next; + empty = false; + } + } + + // The HBase table's row key maps to a Hive table column. In the corner case when only the + // row key column is selected in Hive, the HBase Scan will be empty i.e. no column family/ + // column qualifier will have been added to the scan. We arbitrarily add at least one column + // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive + // tables column projection. + if (empty) { + for (ColumnMapping colMap: columnMappings) { + if (colMap.hbaseRowKey) { + continue; + } + + if (colMap.qualifierName == null) { + scan.addFamily(colMap.familyNameBytes); + } else { + scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); + } + + if (!readAllColumns) { + break; + } } - }; + } } + /** * Converts a filter (which has been pushed down from Hive's optimizer) - * into corresponding restrictions on the HBase scan. The - * filter should already be in a form which can be fully converted. - * - * @param jobConf configuration for the scan - * - * @param iKey 0-based offset of key column within Hive table + * into corresponding restrictions on the HBase scan. * - * @return converted table split if any + * If a list of HBaseScanRanges has been pushed as TableScanDesc.FILTER_OBJECT_CONF_STR, use that; otherwise use TableScanDesc.FILTER_EXPR_CONF_STR. If nothing has been + * pushed, return a single element list containing an unbounded scan. 
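+ * + * For example, when a key factory has pushed a List of HBaseScanRanges (as TestMultiScanHBaseKeyFactory below does), each range becomes its own Scan; when only a filter expression has been pushed, a single Scan is derived from it; and when neither is present the result is ImmutableList.of(new Scan()), i.e. one unbounded scan of the table.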
+ * @return converted scan */ - private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) - throws IOException { + private List createFilterScans(JobConf jobConf) throws IOException { - // TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL + String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); + boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); + + if (hbaseColumnsMapping == null) { + throw new IOException("hbase.columns.mapping required for HBase Table."); + } + + ColumnMappings columnMappings; + try { + columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping,doColumnRegexMatching); + } catch (SerDeException e) { + throw new IOException(e); + } + + int iKey = columnMappings.getKeyIndex(); + ColumnMapping keyMapping = columnMappings.getKeyMapping(); - Scan scan = new Scan(); String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR); + + List ranges = null; if (filterObjectSerialized != null) { - HBaseScanRange range = Utilities.deserializeObject(filterObjectSerialized, - HBaseScanRange.class); - try { - range.setup(scan, jobConf); - } catch (Exception e) { - throw new IOException(e); - } - return scan; + ranges = (List) Utilities.deserializeObject(filterObjectSerialized, + ArrayList.class); } - String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR); - if (filterExprSerialized == null) { - return scan; - } - ExprNodeGenericFuncDesc filterExpr = - Utilities.deserializeExpression(filterExprSerialized); + + ExprNodeGenericFuncDesc filterExpr = filterExprSerialized != null ? + Utilities.deserializeExpression(filterExprSerialized) : null; String colName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey]; String colType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey]; - IndexPredicateAnalyzer analyzer = newIndexPredicateAnalyzer(colName,colType, isKeyBinary); + + boolean isKeyBinary = getStorageFormatOfKey(keyMapping.mappingSpec, + jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")); + + return createFilterScans(ranges, filterExpr, colName, colType, isKeyBinary, jobConf); + } + + /** + * Perform the actual conversion from pushed objects (deserialized from the JobConf) + * to a List + */ + private List createFilterScans(List ranges, ExprNodeGenericFuncDesc filterExpr, String keyColName, String keyColType, boolean isKeyBinary, JobConf jobConf) throws IOException { + + // TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL + + if (ranges != null) { + List rtn = Lists.newArrayListWithCapacity(ranges.size()); + + for (HBaseScanRange range : ranges) { + Scan scan = new Scan(); + try { + range.setup(scan, jobConf); + } catch (Exception e) { + throw new IOException(e); + } + rtn.add(scan); + } + return rtn; + } else if (filterExpr != null) { + return ImmutableList.of(createScanFromFilterExpr(filterExpr, keyColName, keyColType, isKeyBinary)); + } else { + return ImmutableList.of(new Scan()); + } + } + + /** + * Create a scan with the filters represented by filterExpr. 
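+ * + * For example, a pushed predicate such as key >= "abc" AND key < "abd" on a string-formatted key yields, roughly, a Scan whose start row is "abc" and whose stop row is "abd"; the exact bytes depend on getConstantVal and on whether the key is stored in binary format.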
+ * @param filterExpr + * @param keyColName + * @param keyColType + * @param isKeyBinary + * @return + * @throws IOException + */ + private Scan createScanFromFilterExpr(ExprNodeGenericFuncDesc filterExpr, String keyColName, String keyColType, boolean isKeyBinary) throws IOException { + Scan scan = new Scan(); + IndexPredicateAnalyzer analyzer = newIndexPredicateAnalyzer(keyColName, keyColType, isKeyBinary); List searchConditions = new ArrayList(); @@ -264,7 +353,7 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) return scan; } - private byte[] getConstantVal(Object writable, PrimitiveObjectInspector poi, + private byte[] getConstantVal(Object writable, PrimitiveObjectInspector poi, boolean isKeyBinary) throws IOException{ if (!isKeyBinary){ @@ -351,75 +440,27 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( return analyzer; } - @Override - public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { - - //obtain delegation tokens for the job - TableMapReduceUtil.initCredentials(jobConf); - - String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); - setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName))); - String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); - boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); - - if (hbaseColumnsMapping == null) { - throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table."); - } - - ColumnMappings columnMappings = null; - try { - columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); - } catch (SerDeException e) { - throw new IOException(e); - } - - int iKey = columnMappings.getKeyIndex(); - ColumnMapping keyMapping = columnMappings.getKeyMapping(); - - // Take filter pushdown into account while calculating splits; this - // allows us to prune off regions immediately. Note that although - // the Javadoc for the superclass getSplits says that it returns one - // split per region, the implementation actually takes the scan - // definition into account and excludes regions which don't satisfy - // the start/stop row conditions (HBASE-1829). - Scan scan = createFilterScan(jobConf, iKey, - HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec, - jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string"))); + private boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException{ - // The list of families that have been added to the scan - List addedFamilies = new ArrayList(); + String[] mapInfo = spec.split("#"); + boolean tblLevelDefault = "binary".equalsIgnoreCase(defaultFormat) ? true : false; - // REVIEW: are we supposed to be applying the getReadColumnIDs - // same as in getRecordReader? 
- for (ColumnMapping colMap : columnMappings) { - if (colMap.hbaseRowKey) { - continue; - } + switch (mapInfo.length) { + case 1: + return tblLevelDefault; - if (colMap.qualifierName == null) { - scan.addFamily(colMap.familyNameBytes); - addedFamilies.add(colMap.familyName); - } else { - if(!addedFamilies.contains(colMap.familyName)){ - // add the column only if the family has not already been added - scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); - } + case 2: + String storageType = mapInfo[1]; + if(storageType.equals("-")) { + return tblLevelDefault; + } else if ("string".startsWith(storageType)){ + return false; + } else if ("binary".startsWith(storageType)){ + return true; } - } - setScan(scan); - - Job job = new Job(jobConf); - JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job); - Path [] tablePaths = FileInputFormat.getInputPaths(jobContext); - List splits = - super.getSplits(jobContext); - InputSplit [] results = new InputSplit[splits.size()]; - - for (int i = 0; i < splits.size(); i++) { - results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]); + default: + throw new IOException("Malformed string: " + spec); } - - return results; } } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveMultiTableInputFormatBase.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveMultiTableInputFormatBase.java new file mode 100644 index 0000000..163209b --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveMultiTableInputFormatBase.java @@ -0,0 +1,62 @@ +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableMapReduceUtil; +import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase; +import org.apache.hadoop.hbase.mapreduce.TableSplit; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +/** + * mapred port of {@link org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase} + */ +public class HiveMultiTableInputFormatBase extends MultiTableInputFormatBase implements InputFormat { + + @Override + public InputSplit[] getSplits(JobConf jobConf, int splitNum) throws IOException { + //obtain delegation tokens for the job + TableMapReduceUtil.initCredentials(jobConf); + Job job = new Job(jobConf); + org.apache.hadoop.mapreduce.JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job); + + + List splits = super.getSplits(jobContext); + + //Convert mapreduce splits to mapred splits + InputSplit[] rtn = new InputSplit[splits.size()]; + + for (int i = 0; i < splits.size(); i++) { + rtn[i] = new HBaseSplit((TableSplit) splits.get(i), FileInputFormat.getInputPaths(jobConf)[0]); + } + + return rtn; + } + + @Override + public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { + HBaseSplit hbaseSplit = (HBaseSplit) inputSplit; + TableSplit tableSplit = hbaseSplit.getTableSplit(); + Job job = new Job(jobConf); + HadoopShims hadoopShims = ShimLoader.getHadoopShims(); + TaskAttemptContext tac = hadoopShims.newTaskAttemptContext( + job.getConfiguration(), reporter); + + final org.apache.hadoop.mapreduce.RecordReader recordReader; + try { + recordReader = 
createRecordReader(tableSplit, tac); + recordReader.initialize(tableSplit, tac); + } catch (InterruptedException e) { + throw new IOException("Failed to initialize RecordReader", e); + } + + return new HBaseShimRecordReader(recordReader); + } + +} diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java index ecd5061..5ab86b6 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java @@ -114,7 +114,7 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese return decomposed; } - private HBaseScanRange setupFilter(String keyColName, List conditions) + protected HBaseScanRange setupFilter(String keyColName, List conditions) throws IOException { Map> fieldConds = new HashMap>(); diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestMultiScanHBaseKeyFactory.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestMultiScanHBaseKeyFactory.java new file mode 100644 index 0000000..c52cbc3 --- /dev/null +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestMultiScanHBaseKeyFactory.java @@ -0,0 +1,176 @@ +package org.apache.hadoop.hive.hbase; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; +import org.apache.hadoop.hive.ql.index.IndexSearchCondition; +import org.apache.hadoop.hive.ql.plan.*; +import org.apache.hadoop.hive.ql.udf.generic.*; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Key factory supporting fixed-length composite row keys, where the first field in the row key is a "bucket" value intended + * to distribute data. Predicate pushdown in this class works by pushing down predicates on the rest of the fields + * to all buckets if a predicate on the bucket isn't provided.
+ */ +public class TestMultiScanHBaseKeyFactory extends TestHBaseKeyFactory2 { + + public static final String HIVE_HBASE_MULTISCAN_NUM_BUCKETS = "hive.hbase.multiscan.num_buckets"; + private int numBuckets; + + public static final List> ALLOWED_COMPARISONS = ImmutableList.of( + GenericUDFOPGreaterThan.class, + GenericUDFOPEqualOrGreaterThan.class, + GenericUDFOPEqual.class, + GenericUDFOPLessThan.class, + GenericUDFOPEqualOrLessThan.class + ); + + @Override + public void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException { + super.init(hbaseParam, properties); + + String numBucketsProp = properties.getProperty(HIVE_HBASE_MULTISCAN_NUM_BUCKETS); + + try { + this.numBuckets = Integer.parseInt(numBucketsProp); + } catch (NullPointerException e) { + throw new SerDeException( + "Expected table property " + HIVE_HBASE_MULTISCAN_NUM_BUCKETS + " to be a valid integer; was: " + numBucketsProp, e); + } catch (NumberFormatException e) { + throw new SerDeException( + "Expected table property " + HIVE_HBASE_MULTISCAN_NUM_BUCKETS + " to be a valid integer; was: " + numBucketsProp, e); + } + } + + /** + * Decompose predicates on fields of the row key into one or more HBaseScanRanges. This implementation supports + * pushing down conditions into each bucket, creating one HBaseScanRange per bucket, if a condition on the bucket isn't + * otherwise provided. For instance, given the schema: + * + * (bucket int, age int) + * + * and the condition: + * + * WHERE age >= 20 AND age < 25 + * + * the method will return an HBaseScanRange for each bucket, with the age predicate pushed down: + * + * (1/20, 1/25)... (numBuckets/20, numBuckets/25) + * + * Given a condition on the bucket, it will return a single range: + * + * WHERE bucket = 1 AND age >= 20 AND age < 25 => + * (1/20, 1/25) + * + */ + @Override + public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { + IndexPredicateAnalyzer analyzer = getIndexPredicateAnalyzer(); + + List searchConds = Lists.newArrayList(); + + ExprNodeDesc residual = analyzer.analyzePredicate(predicate, searchConds); + + StructTypeInfo keyColumnType = (StructTypeInfo) keyMapping.columnType; + String bucketCol = keyColumnType.getAllStructFieldNames().get(0); + TypeInfo bucketTypeInfo = keyColumnType.getStructFieldTypeInfo(bucketCol); + + if (! (bucketTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE && + ((PrimitiveTypeInfo) bucketTypeInfo).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.INT)) { + throw new IllegalArgumentException("Bucket field " + bucketCol + " must be of type int"); + } + + ArrayList scanRanges; + + try { + // search for a condition already on the bucket + boolean conditionOnBucket = false; + for (IndexSearchCondition sc : searchConds) { + if (sc.getFields()[0].equals(bucketCol)) { + conditionOnBucket = true; + break; + } + } + + if (conditionOnBucket) { + scanRanges = Lists.newArrayList(super.setupFilter(keyMapping.getColumnName(), searchConds)); + } else { + scanRanges = Lists.newArrayList(); + List searchCondsWithBucket = cons(null, searchConds); + // no condition on the bucket column; create an artificial one for each bucket value.
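+ // For example, with numBuckets = 3 and a pushed condition other_val >= 20, the loop below builds three HBaseScanRanges, one per bucket value 0, 1 and 2, each carrying the other_val condition.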
+ for (int i = 0; i < this.numBuckets; i++) { + ExprNodeColumnDesc keyColumnDesc = searchConds.get(0).getColumnDesc(); + searchCondsWithBucket.set(0, searchConditionForBucketValue(bucketCol, i, keyColumnDesc)); + scanRanges.add(super.setupFilter(keyMapping.getColumnName(), searchCondsWithBucket)); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new DecomposedPredicate(analyzer.translateSearchConditions(searchConds), scanRanges, (ExprNodeGenericFuncDesc) residual); + } + + /** + * Create an artificial search condition on the bucket column. + * @param bucketCol + * @param value + * @param keyColumnDesc + * @return + */ + private IndexSearchCondition searchConditionForBucketValue(String bucketCol, int value, ExprNodeColumnDesc keyColumnDesc) { + return new IndexSearchCondition( + new ExprNodeColumnDesc( + keyMapping.getColumnType(), + keyColumnDesc.getColumn(), + keyColumnDesc.getTabAlias(), + keyColumnDesc.getIsPartitionColOrVirtualCol() + ), + GenericUDFOPEqual.class.getCanonicalName(), + + new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, value), + null, + new String[]{bucketCol} + ); + } + + /** + * Create a new list formed by inserting ele at the front of lst. + * @param ele + * @param lst + * @param + * @return + */ + private List cons(T ele, List lst) { + List rtn = Lists.newArrayListWithCapacity(lst.size() + 1); + rtn.add(ele); + rtn.addAll(lst); + return rtn; + } + + private IndexPredicateAnalyzer getIndexPredicateAnalyzer() { + IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(true); + + for (Class compareOp : ALLOWED_COMPARISONS) { + analyzer.addComparisonOp(compareOp.getCanonicalName()); + } + analyzer.allowColumnName(keyMapping.getColumnName()); + analyzer.setAcceptsFields(true); + return analyzer; + } +} diff --git hbase-handler/src/test/queries/positive/hbase_multiscan_pushdown.q hbase-handler/src/test/queries/positive/hbase_multiscan_pushdown.q new file mode 100644 index 0000000..289fa2d --- /dev/null +++ hbase-handler/src/test/queries/positive/hbase_multiscan_pushdown.q @@ -0,0 +1,34 @@ +CREATE TABLE hbase_multiscan( + key struct, + value string) + ROW FORMAT SERDE + 'org.apache.hadoop.hive.hbase.HBaseSerDe' + STORED BY + 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ( + 'serialization.format'='1', + 'hbase.columns.mapping'=':key,cf:string', + 'hbase.composite.key.factory'='org.apache.hadoop.hive.hbase.TestMultiScanHBaseKeyFactory' + ) + TBLPROPERTIES ( + 'hbase.table.name'='mscan2', 'hive.hbase.multiscan.num_buckets'='3'); + +FROM src +INSERT INTO TABLE hbase_multiscan SELECT +named_struct( + "bucket", cast(hash(cast(key as int)) % 3 as string), + "other_val", cast(key as string)), +cast(value as string); + + +-- {"bucket":"1","other_val":"10"} val_10 +-- {"bucket":"1","other_val":"100"} val_100 +-- {"bucket":"1","other_val":"103"} val_103 +-- {"bucket":"1","other_val":"118"} val_118 +SELECT * FROM hbase_multiscan WHERE key.bucket = 1 AND key.other_val >= "0" AND key.other_val < "12" ORDER BY value; + +-- {"bucket":"2","other_val":"128"} val_128 +SELECT * FROM hbase_multiscan WHERE key.other_val = "128"; + +DROP TABLE hbase_multiscan; diff --git hbase-handler/src/test/results/positive/hbase_multiscan_pushdown.q.out hbase-handler/src/test/results/positive/hbase_multiscan_pushdown.q.out new file mode 100644 index 0000000..30613f8 --- /dev/null +++ hbase-handler/src/test/results/positive/hbase_multiscan_pushdown.q.out @@ -0,0 +1,92 @@ +PREHOOK: query: CREATE TABLE hbase_multiscan( + key 
struct, + value string) + ROW FORMAT SERDE + 'org.apache.hadoop.hive.hbase.HBaseSerDe' + STORED BY + 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ( + 'serialization.format'='1', + 'hbase.columns.mapping'=':key,cf:string', + 'hbase.composite.key.factory'='org.apache.hadoop.hive.hbase.TestMultiScanHBaseKeyFactory' + ) + TBLPROPERTIES ( + 'hbase.table.name'='mscan2', 'hive.hbase.multiscan.num_buckets'='3') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: CREATE TABLE hbase_multiscan( + key struct, + value string) + ROW FORMAT SERDE + 'org.apache.hadoop.hive.hbase.HBaseSerDe' + STORED BY + 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ( + 'serialization.format'='1', + 'hbase.columns.mapping'=':key,cf:string', + 'hbase.composite.key.factory'='org.apache.hadoop.hive.hbase.TestMultiScanHBaseKeyFactory' + ) + TBLPROPERTIES ( + 'hbase.table.name'='mscan2', 'hive.hbase.multiscan.num_buckets'='3') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_multiscan +PREHOOK: query: FROM src +INSERT INTO TABLE hbase_multiscan SELECT +named_struct( + "bucket", cast(hash(cast(key as int)) % 3 as string), + "other_val", cast(key as string)), +cast(value as string) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@hbase_multiscan +POSTHOOK: query: FROM src +INSERT INTO TABLE hbase_multiscan SELECT +named_struct( + "bucket", cast(hash(cast(key as int)) % 3 as string), + "other_val", cast(key as string)), +cast(value as string) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@hbase_multiscan +PREHOOK: query: -- {"bucket":"1","other_val":"10"} val_10 +-- {"bucket":"1","other_val":"100"} val_100 +-- {"bucket":"1","other_val":"103"} val_103 +-- {"bucket":"1","other_val":"118"} val_118 +SELECT * FROM hbase_multiscan WHERE key.bucket = 1 AND key.other_val >= "0" AND key.other_val < "12" ORDER BY value +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_multiscan +#### A masked pattern was here #### +POSTHOOK: query: -- {"bucket":"1","other_val":"10"} val_10 +-- {"bucket":"1","other_val":"100"} val_100 +-- {"bucket":"1","other_val":"103"} val_103 +-- {"bucket":"1","other_val":"118"} val_118 +SELECT * FROM hbase_multiscan WHERE key.bucket = 1 AND key.other_val >= "0" AND key.other_val < "12" ORDER BY value +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_multiscan +#### A masked pattern was here #### +{"bucket":"1","other_val":"10"} val_10 +{"bucket":"1","other_val":"100"} val_100 +{"bucket":"1","other_val":"103"} val_103 +{"bucket":"1","other_val":"118"} val_118 +PREHOOK: query: -- {"bucket":"2","other_val":"128"} val_128 +SELECT * FROM hbase_multiscan WHERE key.other_val = "128" +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_multiscan +#### A masked pattern was here #### +POSTHOOK: query: -- {"bucket":"2","other_val":"128"} val_128 +SELECT * FROM hbase_multiscan WHERE key.other_val = "128" +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_multiscan +#### A masked pattern was here #### +{"bucket":"2","other_val":"128"} val_128 +PREHOOK: query: DROP TABLE hbase_multiscan +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@hbase_multiscan +PREHOOK: Output: default@hbase_multiscan +POSTHOOK: query: DROP TABLE hbase_multiscan +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@hbase_multiscan +POSTHOOK: Output: default@hbase_multiscan diff --git 
ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java index 7d7c764..699f247 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java @@ -64,6 +64,17 @@ public DecomposedPredicate decomposePredicate( * Struct class for returning multiple values from decomposePredicate. */ public static class DecomposedPredicate { + + public DecomposedPredicate() { + + } + + public DecomposedPredicate(ExprNodeGenericFuncDesc pushedPredicate, Serializable pushedPredicateObject, ExprNodeGenericFuncDesc residualPredicate) { + this.pushedPredicate = pushedPredicate; + this.pushedPredicateObject = pushedPredicateObject; + this.residualPredicate = residualPredicate; + } + /** * Portion of predicate to be evaluated by storage handler. Hive * will pass this into the storage handler's input format.
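The new three-argument constructor lets a key factory assemble the decomposed predicate in a single expression; TestMultiScanHBaseKeyFactory above uses it to hand its per-bucket scan ranges to Hive:

return new DecomposedPredicate(
    analyzer.translateSearchConditions(searchConds),  // pushedPredicate
    scanRanges,                                       // pushedPredicateObject (Serializable)
    (ExprNodeGenericFuncDesc) residual);              // residualPredicate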