diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java index 18fb5ea..d8a9961 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java @@ -47,7 +47,7 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) throws IOExce } @Override - public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { + public HBaseDecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { return HBaseStorageHandler.decomposePredicate(jobConf, (HBaseSerDe) deserializer, predicate); } } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyPredicateDecomposer.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyPredicateDecomposer.java index 0cc21fa..1b18ac5 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyPredicateDecomposer.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyPredicateDecomposer.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; @@ -37,36 +38,38 @@ public static final Log LOG = LogFactory.getLog(AbstractHBaseKeyPredicateDecomposer.class); - public DecomposedPredicate decomposePredicate(String keyColName, ExprNodeDesc predicate) { + public HBaseDecomposedPredicate decomposePredicate(String keyColName, ExprNodeDesc predicate) { IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(true); analyzer.allowColumnName(keyColName); analyzer.setAcceptsFields(true); analyzer.setFieldValidator(getFieldValidator()); - DecomposedPredicate decomposed = new DecomposedPredicate(); - List conditions = new ArrayList(); - decomposed.residualPredicate = + ExprNodeGenericFuncDesc residualPredicate = (ExprNodeGenericFuncDesc) analyzer.analyzePredicate(predicate, conditions); + ExprNodeGenericFuncDesc pushedPredicate = null; + List pushedPredicateObject = null; + + if (!conditions.isEmpty()) { - decomposed.pushedPredicate = analyzer.translateSearchConditions(conditions); + pushedPredicate = analyzer.translateSearchConditions(conditions); try { - decomposed.pushedPredicateObject = getScanRange(conditions); + pushedPredicateObject = getScanRanges(conditions); } catch (Exception e) { LOG.warn("Failed to decompose predicates", e); return null; } } - - return decomposed; + return new HBaseDecomposedPredicate(pushedPredicate, pushedPredicateObject, residualPredicate); } /** - * Get the scan range that specifies the start/stop keys and/or filters to be applied onto the - * hbase scan - * */ - protected abstract HBaseScanRange getScanRange(List searchConditions) - throws Exception; + * Get a list of scan ranges specifying start/stop keys and/or filters for one or more HBase scans. + * + * @param searchConditions + * @return + */ + protected abstract List getScanRanges(List searchConditions) throws Exception; /** * Get an optional {@link IndexPredicateAnalyzer.FieldValidator validator}. 
A validator can be diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseDecomposedPredicate.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseDecomposedPredicate.java new file mode 100644 index 0000000..8fba47e --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseDecomposedPredicate.java @@ -0,0 +1,41 @@ +package org.apache.hadoop.hive.hbase; + +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; + +import java.io.Serializable; +import java.util.List; + +/** + * A DecomposedPredicate for the HBase handler. The primary customization is that the pushedPredicateObject must be + * a List. You *could* hack around this, since the fields are public, but {@link org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat} + * won't understand any other types. + */ +public class HBaseDecomposedPredicate extends HiveStoragePredicateHandler.DecomposedPredicate { + + public HBaseDecomposedPredicate(ExprNodeGenericFuncDesc pushedPredicate, List pushedPredicateObject, ExprNodeGenericFuncDesc residualPredicate) { + super(pushedPredicate, getListAsSerializable(pushedPredicateObject), residualPredicate); + } + + /** + * Ensure that pushedPredicateObject is serializable by checking its runtime type. If it's not an instance of Serializable, + * copy it to an ArrayList (which is Serializable) + * @param pushedPredicateObject + * @return + */ + private static Serializable getListAsSerializable(List pushedPredicateObject) { + + if (pushedPredicateObject == null) { + return null; + } + + Serializable serializablePushedPred; + if (! (pushedPredicateObject instanceof Serializable)) { + serializablePushedPred = Lists.newArrayList(pushedPredicateObject); + } else { + serializablePushedPred = (Serializable) pushedPredicateObject; + } + return serializablePushedPred; + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java index 251d22e..4693297 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hive.hbase; import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -39,6 +41,29 @@ public interface HBaseKeyFactory extends HiveStoragePredicateHandler { /** + * Gives the storage handler a chance to decompose a predicate. The storage + * handler should analyze the predicate and return the portion of it which + * cannot be evaluated during table access. For example, if the original + * predicate is x = 2 AND upper(y)='YUM', the storage handler + * might be able to handle x = 2 but leave the "residual" + * upper(y)='YUM' for Hive to deal with. The breakdown + * need not be non-overlapping; for example, given the + * predicate x LIKE 'a%b', the storage handler might + * be able to evaluate the prefix search x LIKE 'a%', leaving + * x LIKE '%b' as the residual. 
+ * + * @param jobConf contains a job configuration matching the one that + * will later be passed to getRecordReader and getSplits + * @param deserializer deserializer which will be used when + * fetching rows + * @param predicate predicate to be decomposed + * @return decomposed form of predicate, or null if no pushdown is + * possible at all + */ + @Override + HBaseDecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate); + + /** * initialize factory with properties */ void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException; diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseShimRecordReader.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseShimRecordReader.java new file mode 100644 index 0000000..8db18e6 --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseShimRecordReader.java @@ -0,0 +1,71 @@ +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.RecordReader; + +import java.io.IOException; + +/** + * mapred.RecordReader wrapper around mapreduce.RecordReader + */ +public class HBaseShimRecordReader implements RecordReader { + + private final org.apache.hadoop.mapreduce.RecordReader recordReader; + + public HBaseShimRecordReader(org.apache.hadoop.mapreduce.RecordReader recordReader) { + this.recordReader = recordReader; + } + + @Override + public void close() throws IOException { + recordReader.close(); + } + + @Override + public ImmutableBytesWritable createKey() { + return new ImmutableBytesWritable(); + } + + @Override + public ResultWritable createValue() { + return new ResultWritable(new Result()); + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public float getProgress() throws IOException { + float progress = 0.0F; + + try { + progress = recordReader.getProgress(); + } catch (InterruptedException e) { + throw new IOException(e); + } + + return progress; + } + + @Override + public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws IOException { + + boolean next = false; + + try { + next = recordReader.nextKeyValue(); + + if (next) { + rowKey.set(recordReader.getCurrentValue().getRow()); + value.setResult(recordReader.getCurrentValue()); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + + return next; + } +} diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java index feb3cd1..31aa024 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java @@ -490,7 +490,7 @@ public DecomposedPredicate decomposePredicate( return keyFactory.decomposePredicate(jobConf, deserializer, predicate); } - public static DecomposedPredicate decomposePredicate( + public static HBaseDecomposedPredicate decomposePredicate( JobConf jobConf, HBaseSerDe hBaseSerDe, ExprNodeDesc predicate) { @@ -520,10 +520,6 @@ public static DecomposedPredicate decomposePredicate( return null; } - DecomposedPredicate decomposedPredicate = new DecomposedPredicate(); - decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions( - searchConditions); - decomposedPredicate.residualPredicate = residualPredicate; - return decomposedPredicate; + return new 
HBaseDecomposedPredicate(analyzer.translateSearchConditions(searchConditions), null, residualPredicate); } } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java index 4ac0803..f523fba 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java @@ -20,20 +20,16 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapred.TableMapReduceUtil; -import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase; -import org.apache.hadoop.hbase.mapreduce.TableSplit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator; @@ -46,6 +42,7 @@ import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -54,7 +51,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; @@ -63,142 +60,264 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; /** * HiveHBaseTableInputFormat implements InputFormat for HBase storage handler * tables, decorating an underlying HBase TableInputFormat with extra Hive logic * such as column pruning and filter pushdown. 
*/ -public class HiveHBaseTableInputFormat extends TableInputFormatBase +public class HiveHBaseTableInputFormat extends HiveMultiTableInputFormatBase implements InputFormat { static final Log LOG = LogFactory.getLog(HiveHBaseTableInputFormat.class); @Override - public RecordReader getRecordReader( - InputSplit split, - JobConf jobConf, - final Reporter reporter) throws IOException { + public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { + // Take filter pushdown into account while calculating splits; this + // allows us to prune off regions immediately. Note that although + // the Javadoc for the superclass getSplits says that it returns one + // split per region, the implementation actually takes the scan + // definition into account and excludes regions which don't satisfy + // the start/stop row conditions (HBASE-1829). + List scans = createFilterScans(jobConf); + // REVIEW: are we supposed to be applying the getReadColumnIDs + // same as in getRecordReader? - HBaseSplit hbaseSplit = (HBaseSplit) split; - TableSplit tableSplit = hbaseSplit.getTableSplit(); + for (Scan scan : scans) { + pushScanColumns(scan, jobConf); + configureScan(scan, jobConf); + } - setHTable(HiveHBaseInputFormatUtil.getTable(jobConf)); - setScan(HiveHBaseInputFormatUtil.getScan(jobConf)); + setScans(scans); + return super.getSplits(jobConf, numSplits); + } - Job job = new Job(jobConf); - TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext( - job.getConfiguration(), reporter); + private void configureScan( + Scan scan, + JobConf jobConf) throws IOException { - final org.apache.hadoop.mapreduce.RecordReader - recordReader = createRecordReader(tableSplit, tac); - try { - recordReader.initialize(tableSplit, tac); - } catch (InterruptedException e) { - throw new IOException("Failed to initialize RecordReader", e); + String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); + + if (hbaseTableName == null) { + throw new IOException("HBase table must be specified in the JobConf"); + } else { + scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(hbaseTableName)); } - return new RecordReader() { + String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE); + if (scanCache != null) { + scan.setCaching(Integer.valueOf(scanCache)); + } + String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS); + if (scanCacheBlocks != null) { + scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks)); + } + String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH); + if (scanBatch != null) { + scan.setBatch(Integer.valueOf(scanBatch)); + } + } - @Override - public void close() throws IOException { - recordReader.close(); - } + /** + * Push the configured columns to be read into scan, reading all necessary values from jobConf and delegating + * to {@link #pushScanColumns(org.apache.hadoop.hbase.client.Scan, ColumnMappings, java.util.List, boolean)} + * @param scan + * @param jobConf + * @throws IOException + */ + private void pushScanColumns(Scan scan, JobConf jobConf) throws IOException { + String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); + boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); + ColumnMappings columnMappings; - @Override - public ImmutableBytesWritable createKey() { - return new ImmutableBytesWritable(); - } + try { + columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); + } catch (SerDeException e) { + throw new IOException(e); 
+ } - @Override - public ResultWritable createValue() { - return new ResultWritable(new Result()); - } + List readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); - @Override - public long getPos() throws IOException { - return 0; - } + boolean readAllColumns = ColumnProjectionUtils.isReadAllColumns(jobConf); - @Override - public float getProgress() throws IOException { - float progress = 0.0F; + pushScanColumns(scan, columnMappings, readColIDs, readAllColumns); + } - try { - progress = recordReader.getProgress(); - } catch (InterruptedException e) { - throw new IOException(e); - } + private void pushScanColumns(Scan scan, ColumnMappings columnMappings, List readColIDs, boolean readAllColumns) throws IOException { - return progress; - } - @Override - public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws IOException { + if (columnMappings.size() < readColIDs.size()) { + throw new IOException("Cannot read more columns than the given table contains."); + } - boolean next = false; + boolean empty = true; - try { - next = recordReader.nextKeyValue(); + // The list of families that have been added to the scan + List addedFamilies = new ArrayList(); - if (next) { - rowKey.set(recordReader.getCurrentValue().getRow()); - value.setResult(recordReader.getCurrentValue()); + if (!readAllColumns) { + ColumnMapping[] columnsMapping = columnMappings.getColumnsMapping(); + for (int i : readColIDs) { + ColumnMapping colMap = columnsMapping[i]; + if (colMap.hbaseRowKey) { + continue; + } + + if (colMap.qualifierName == null) { + scan.addFamily(colMap.familyNameBytes); + addedFamilies.add(colMap.familyName); + } else { + if(!addedFamilies.contains(colMap.familyName)){ + // add only if the corresponding family has not already been added + scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); } - } catch (InterruptedException e) { - throw new IOException(e); } - return next; + empty = false; + } + } + + // The HBase table's row key maps to a Hive table column. In the corner case when only the + // row key column is selected in Hive, the HBase Scan will be empty i.e. no column family/ + // column qualifier will have been added to the scan. We arbitrarily add at least one column + // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive + // tables column projection. + if (empty) { + for (ColumnMapping colMap: columnMappings) { + if (colMap.hbaseRowKey) { + continue; + } + + if (colMap.qualifierName == null) { + scan.addFamily(colMap.familyNameBytes); + } else { + scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); + } + + if (!readAllColumns) { + break; + } } - }; + } } + /** * Converts a filter (which has been pushed down from Hive's optimizer) - * into corresponding restrictions on the HBase scan. The - * filter should already be in a form which can be fully converted. + * into corresponding restrictions on the HBase scan. * - * @param jobConf configuration for the scan - * - * @param iKey 0-based offset of key column within Hive table - * - * @return converted table split if any + * If a list of HBaseScanRanges has been pushed as TableScanDesc.FILTER_OBJECT_CONF_STR, use that; otherwise use TableScanDesc.FILTER_EXPR_CONF_STR. If nothing has been + * pushed, return a single element list containing an unbounded scan. 
+ * @return converted scan */ - private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) - throws IOException { + private List createFilterScans(JobConf jobConf) throws IOException { + String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR); + String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR); - // TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL + // If nothing was pushed, exit early--some other things might not be present as well. + if (filterObjectSerialized == null && filterExprSerialized == null) { + return ImmutableList.of(new Scan()); + } - Scan scan = new Scan(); - String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR); + String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); + boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); + + if (hbaseColumnsMapping == null) { + throw new IOException("hbase.columns.mapping required for HBase Table."); + } + + + String listColumnsString = jobConf.get(serdeConstants.LIST_COLUMNS); + String listColumnTypeString = jobConf.get(serdeConstants.LIST_COLUMN_TYPES); + + List columns = Arrays.asList(listColumnsString.split(",")); + List columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(listColumnTypeString); + + String defaultStorageType = jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string"); + + ColumnMappings columnMappings; + try { + columnMappings = getColumnMappings(hbaseColumnsMapping, doColumnRegexMatching, columns, columnTypes, defaultStorageType); + } catch (SerDeException e) { + throw new IOException(e); + } + + // TODO: do we always have a keyMapping? + ColumnMapping keyMapping = columnMappings.getKeyMapping(); + + + List ranges = null; if (filterObjectSerialized != null) { - HBaseScanRange range = Utilities.deserializeObject(filterObjectSerialized, - HBaseScanRange.class); - try { - range.setup(scan, jobConf); - } catch (Exception e) { - throw new IOException(e); - } - return scan; + ranges = (List) Utilities.deserializeObject(filterObjectSerialized, + ArrayList.class); } - String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR); - if (filterExprSerialized == null) { - return scan; + ExprNodeGenericFuncDesc filterExpr = filterExprSerialized != null ? + Utilities.deserializeExpression(filterExprSerialized) : null; + + boolean isKeyBinary = getStorageFormatOfKey(keyMapping.mappingSpec, + defaultStorageType); + + return createFilterScans(ranges, filterExpr, keyMapping, isKeyBinary, jobConf); + } + + private ColumnMappings getColumnMappings(String hbaseColumnMappings, + boolean doColumnRegexMatching, + List columnsList, + List columnTypeList, + String defaultStorageType + ) throws SerDeException { + ColumnMappings columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnMappings, doColumnRegexMatching); + + // Only non virtual columns are mapped, and all of the virtual columns ought to be at the end. Therefore, + // take only as many columns as there are in the mapping. 
+ List nonVirtualColumns = columnsList.subList(0, columnMappings.size()); + List nonVirtualColumnTypes = columnTypeList.subList(0, columnMappings.size()); + columnMappings.setHiveColumnDescription(HBaseSerDe.class.getName(), nonVirtualColumns, nonVirtualColumnTypes); + columnMappings.parseColumnStorageTypes(defaultStorageType); + + return columnMappings; + } + + /** + * Perform the actual conversion from pushed objects (deserialized from the JobConf) + * to a List + */ + private List createFilterScans(List ranges, ExprNodeGenericFuncDesc filterExpr, ColumnMapping keyMapping, boolean isKeyBinary, JobConf jobConf) throws IOException { + + // TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL + + if (ranges != null) { + List rtn = Lists.newArrayListWithCapacity(ranges.size()); + + for (HBaseScanRange range : ranges) { + Scan scan = new Scan(); + try { + range.setup(scan, jobConf); + } catch (Exception e) { + throw new IOException(e); + } + rtn.add(scan); + } + return rtn; + } else if (filterExpr != null) { + return ImmutableList.of(createScanFromFilterExpr(filterExpr, keyMapping, isKeyBinary)); + } else { + // nothing pushed; just return an unbounded scan. + return ImmutableList.of(new Scan()); } - ExprNodeGenericFuncDesc filterExpr = - Utilities.deserializeExpression(filterExprSerialized); + } - String colName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey]; - String colType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey]; - IndexPredicateAnalyzer analyzer = newIndexPredicateAnalyzer(colName,colType, isKeyBinary); + /** + * Create a scan with the filters represented by filterExpr. + * @param filterExpr + * @param isKeyBinary + * @return + * @throws IOException + */ + private Scan createScanFromFilterExpr(ExprNodeGenericFuncDesc filterExpr, ColumnMapping keyMapping, boolean isKeyBinary) throws IOException { + Scan scan = new Scan(); + IndexPredicateAnalyzer analyzer = newIndexPredicateAnalyzer(keyMapping.getColumnName(), keyMapping.getColumnType(), isKeyBinary); List searchConditions = new ArrayList(); @@ -264,7 +383,7 @@ private Scan createFilterScan(JobConf jobConf, int iKey, boolean isKeyBinary) return scan; } - private byte[] getConstantVal(Object writable, PrimitiveObjectInspector poi, + private byte[] getConstantVal(Object writable, PrimitiveObjectInspector poi, boolean isKeyBinary) throws IOException{ if (!isKeyBinary){ @@ -351,75 +470,27 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( return analyzer; } - @Override - public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { - - //obtain delegation tokens for the job - TableMapReduceUtil.initCredentials(jobConf); - - String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); - setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName))); - String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); - boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); + private boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException{ - if (hbaseColumnsMapping == null) { - throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table."); - } - - ColumnMappings columnMappings = null; - try { - columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); - } catch (SerDeException e) { - throw new IOException(e); - } + String[] mapInfo = spec.split("#"); + boolean tblLevelDefault = 
"binary".equalsIgnoreCase(defaultFormat) ? true : false; - int iKey = columnMappings.getKeyIndex(); - ColumnMapping keyMapping = columnMappings.getKeyMapping(); + switch (mapInfo.length) { + case 1: + return tblLevelDefault; - // Take filter pushdown into account while calculating splits; this - // allows us to prune off regions immediately. Note that although - // the Javadoc for the superclass getSplits says that it returns one - // split per region, the implementation actually takes the scan - // definition into account and excludes regions which don't satisfy - // the start/stop row conditions (HBASE-1829). - Scan scan = createFilterScan(jobConf, iKey, - HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec, - jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string"))); - - // The list of families that have been added to the scan - List addedFamilies = new ArrayList(); - - // REVIEW: are we supposed to be applying the getReadColumnIDs - // same as in getRecordReader? - for (ColumnMapping colMap : columnMappings) { - if (colMap.hbaseRowKey) { - continue; + case 2: + String storageType = mapInfo[1]; + if(storageType.equals("-")) { + return tblLevelDefault; + } else if ("string".startsWith(storageType)){ + return false; + } else if ("binary".startsWith(storageType)){ + return true; } - if (colMap.qualifierName == null) { - scan.addFamily(colMap.familyNameBytes); - addedFamilies.add(colMap.familyName); - } else { - if(!addedFamilies.contains(colMap.familyName)){ - // add the column only if the family has not already been added - scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); - } - } - } - setScan(scan); - - Job job = new Job(jobConf); - JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job); - Path [] tablePaths = FileInputFormat.getInputPaths(jobContext); - - List splits = - super.getSplits(jobContext); - InputSplit [] results = new InputSplit[splits.size()]; - - for (int i = 0; i < splits.size(); i++) { - results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]); + default: + throw new IOException("Malformed string: " + spec); } - - return results; } } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveMultiTableInputFormatBase.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveMultiTableInputFormatBase.java new file mode 100644 index 0000000..163209b --- /dev/null +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveMultiTableInputFormatBase.java @@ -0,0 +1,62 @@ +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableMapReduceUtil; +import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase; +import org.apache.hadoop.hbase.mapreduce.TableSplit; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +/** + * mapred port of {@link org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase} + */ +public class HiveMultiTableInputFormatBase extends MultiTableInputFormatBase implements InputFormat { + + @Override + public InputSplit[] getSplits(JobConf jobConf, int splitNum) throws IOException { + //obtain delegation tokens for the job + TableMapReduceUtil.initCredentials(jobConf); + Job job = new Job(jobConf); + 
org.apache.hadoop.mapreduce.JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job); + + + List splits = super.getSplits(jobContext); + + // Convert mapreduce splits to mapred splits + InputSplit[] rtn = new InputSplit[splits.size()]; + + for (int i = 0; i < splits.size(); i++) { + rtn[i] = new HBaseSplit((TableSplit) splits.get(i), FileInputFormat.getInputPaths(jobConf)[0]); + } + + return rtn; + } + + @Override + public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { + HBaseSplit hbaseSplit = (HBaseSplit) inputSplit; + TableSplit tableSplit = hbaseSplit.getTableSplit(); + Job job = new Job(jobConf); + HadoopShims hadoopShims = ShimLoader.getHadoopShims(); + TaskAttemptContext tac = hadoopShims.newTaskAttemptContext( + job.getConfiguration(), reporter); + + final org.apache.hadoop.mapreduce.RecordReader recordReader; + try { + recordReader = createRecordReader(tableSplit, tac); + recordReader.initialize(tableSplit, tac); + } catch (InterruptedException e) { + throw new IOException("Failed to initialize RecordReader", e); + } + + return new HBaseShimRecordReader(recordReader); + } + +} diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java index ecd5061..4a09517 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory2.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.hbase; +import com.google.common.collect.Lists; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -73,11 +74,11 @@ public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException return output.getLength() > 0 ?
output.toByteArray() : null; } - private byte[] toBinary(String value, int max, boolean end, boolean nextBA) { + private static byte[] toBinary(String value, int max, boolean end, boolean nextBA) { return toBinary(value.getBytes(), max, end, nextBA); } - private byte[] toBinary(byte[] value, int max, boolean end, boolean nextBA) { + private static byte[] toBinary(byte[] value, int max, boolean end, boolean nextBA) { byte[] bytes = new byte[max + 1]; System.arraycopy(value, 0, bytes, 0, Math.min(value.length, max)); if (end) { @@ -90,7 +91,7 @@ public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException } @Override - public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, + public HBaseDecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { String keyColName = keyMapping.columnName; @@ -98,101 +99,124 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese analyzer.allowColumnName(keyColName); analyzer.setAcceptsFields(true); - DecomposedPredicate decomposed = new DecomposedPredicate(); List searchConditions = new ArrayList(); - decomposed.residualPredicate = - (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, searchConditions); + + ExprNodeGenericFuncDesc pushedPredicate = null; + List pushedPredicateObject = null; + ExprNodeGenericFuncDesc residualPredicate = (ExprNodeGenericFuncDesc) analyzer.analyzePredicate(predicate, searchConditions); if (!searchConditions.isEmpty()) { - decomposed.pushedPredicate = analyzer.translateSearchConditions(searchConditions); + pushedPredicate = analyzer.translateSearchConditions(searchConditions); try { - decomposed.pushedPredicateObject = setupFilter(keyColName, searchConditions); + pushedPredicateObject = new FixedLengthPredicateDecomposer(keyMapping).getScanRanges(searchConditions); } catch (IOException e) { throw new RuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); } } - return decomposed; + + return new HBaseDecomposedPredicate(pushedPredicate, pushedPredicateObject, residualPredicate); } - private HBaseScanRange setupFilter(String keyColName, List conditions) - throws IOException { - Map> fieldConds = - new HashMap>(); - for (IndexSearchCondition condition : conditions) { - assert keyColName.equals(condition.getColumnDesc().getColumn()); - String fieldName = condition.getFields()[0]; - List fieldCond = fieldConds.get(fieldName); - if (fieldCond == null) { - fieldConds.put(fieldName, fieldCond = new ArrayList()); - } - fieldCond.add(condition); - } - HBaseScanRange range = new HBaseScanRange(); - ByteArrayOutputStream startRow = new ByteArrayOutputStream(); - ByteArrayOutputStream stopRow = new ByteArrayOutputStream(); + public static class FixedLengthPredicateDecomposer extends AbstractHBaseKeyPredicateDecomposer { - StructTypeInfo type = (StructTypeInfo) keyMapping.columnType; - for (String name : type.getAllStructFieldNames()) { - List fieldCond = fieldConds.get(name); - if (fieldCond == null || fieldCond.size() > 2) { - continue; + private final ColumnMappings.ColumnMapping keyMapping; + + public FixedLengthPredicateDecomposer(ColumnMappings.ColumnMapping keyMapping) { + this.keyMapping = keyMapping; + } + + public HBaseScanRange getScanRange(List conditions) throws IOException { + Map> fieldConds = + new HashMap>(); + for (IndexSearchCondition condition : conditions) { + assert keyMapping.getColumnName().equals(condition.getColumnDesc().getColumn()); + String fieldName = 
condition.getFields()[0]; + List fieldCond = fieldConds.get(fieldName); + if (fieldCond == null) { + fieldConds.put(fieldName, fieldCond = new ArrayList()); + } + fieldCond.add(condition); } - byte[] startElement = null; - byte[] stopElement = null; - for (IndexSearchCondition condition : fieldCond) { - if (condition.getConstantDesc().getValue() == null) { + HBaseScanRange range = new HBaseScanRange(); + + ByteArrayOutputStream startRow = new ByteArrayOutputStream(); + ByteArrayOutputStream stopRow = new ByteArrayOutputStream(); + + StructTypeInfo type = (StructTypeInfo) keyMapping.columnType; + for (String name : type.getAllStructFieldNames()) { + List fieldCond = fieldConds.get(name); + if (fieldCond == null || fieldCond.size() > 2) { continue; } - String comparisonOp = condition.getComparisonOp(); - String constantVal = String.valueOf(condition.getConstantDesc().getValue()); - - if (comparisonOp.endsWith("UDFOPEqual")) { - startElement = toBinary(constantVal, FIXED_LENGTH, false, false); - stopElement = toBinary(constantVal, FIXED_LENGTH, true, true); - } else if (comparisonOp.endsWith("UDFOPEqualOrGreaterThan")) { - startElement = toBinary(constantVal, FIXED_LENGTH, false, false); - } else if (comparisonOp.endsWith("UDFOPGreaterThan")) { - startElement = toBinary(constantVal, FIXED_LENGTH, false, true); - } else if (comparisonOp.endsWith("UDFOPEqualOrLessThan")) { - stopElement = toBinary(constantVal, FIXED_LENGTH, true, false); - } else if (comparisonOp.endsWith("UDFOPLessThan")) { - stopElement = toBinary(constantVal, FIXED_LENGTH, true, true); - } else { - throw new IOException(comparisonOp + " is not a supported comparison operator"); + byte[] startElement = null; + byte[] stopElement = null; + for (IndexSearchCondition condition : fieldCond) { + if (condition.getConstantDesc().getValue() == null) { + continue; + } + String comparisonOp = condition.getComparisonOp(); + String constantVal = String.valueOf(condition.getConstantDesc().getValue()); + + if (comparisonOp.endsWith("UDFOPEqual")) { + startElement = toBinary(constantVal, FIXED_LENGTH, false, false); + stopElement = toBinary(constantVal, FIXED_LENGTH, true, true); + } else if (comparisonOp.endsWith("UDFOPEqualOrGreaterThan")) { + startElement = toBinary(constantVal, FIXED_LENGTH, false, false); + } else if (comparisonOp.endsWith("UDFOPGreaterThan")) { + startElement = toBinary(constantVal, FIXED_LENGTH, false, true); + } else if (comparisonOp.endsWith("UDFOPEqualOrLessThan")) { + stopElement = toBinary(constantVal, FIXED_LENGTH, true, false); + } else if (comparisonOp.endsWith("UDFOPLessThan")) { + stopElement = toBinary(constantVal, FIXED_LENGTH, true, true); + } else { + throw new IOException(comparisonOp + " is not a supported comparison operator"); + } } - } - if (startRow != null) { - if (startElement != null) { - startRow.write(startElement); - } else { - if (startRow.size() > 0) { - range.setStartRow(startRow.toByteArray()); + if (startRow != null) { + if (startElement != null) { + startRow.write(startElement); + } else { + if (startRow.size() > 0) { + range.setStartRow(startRow.toByteArray()); + } + startRow = null; } - startRow = null; } - } - if (stopRow != null) { - if (stopElement != null) { - stopRow.write(stopElement); - } else { - if (stopRow.size() > 0) { - range.setStopRow(stopRow.toByteArray()); + if (stopRow != null) { + if (stopElement != null) { + stopRow.write(stopElement); + } else { + if (stopRow.size() > 0) { + range.setStopRow(stopRow.toByteArray()); + } + stopRow = null; } - stopRow = null; + } + 
if (startElement == null && stopElement == null) { + break; } } - if (startElement == null && stopElement == null) { - break; + if (startRow != null && startRow.size() > 0) { + range.setStartRow(startRow.toByteArray()); } + if (stopRow != null && stopRow.size() > 0) { + range.setStopRow(stopRow.toByteArray()); + } + return range; } - if (startRow != null && startRow.size() > 0) { - range.setStartRow(startRow.toByteArray()); - } - if (stopRow != null && stopRow.size() > 0) { - range.setStopRow(stopRow.toByteArray()); + /** + * Get a list of scan ranges specifying start/stop keys and/or filters for one or more HBase scans. + * + * @param conditions + * @return + */ + @Override + protected List getScanRanges(List conditions) throws Exception { + return Lists.newArrayList(getScanRange(conditions)); } - return range; } private static class FixedLengthed implements LazyObjectBase { diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory3.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory3.java index dfcbaf5..a180910 100644 --- hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory3.java +++ hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseKeyFactory3.java @@ -1,5 +1,6 @@ package org.apache.hadoop.hive.hbase; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -7,9 +8,13 @@ import java.util.List; import java.util.Map; +import com.google.common.collect.Lists; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; @@ -25,88 +30,55 @@ public class TestHBaseKeyFactory3 extends TestHBaseKeyFactory2 { @Override - public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, + public HBaseDecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { TestHBasePredicateDecomposer decomposedPredicate = new TestHBasePredicateDecomposer(keyMapping); return decomposedPredicate.decomposePredicate(keyMapping.columnName, predicate); } } -class TestHBasePredicateDecomposer extends AbstractHBaseKeyPredicateDecomposer { +class TestHBasePredicateDecomposer extends TestHBaseKeyFactory2.FixedLengthPredicateDecomposer { - private static final int FIXED_LENGTH = 10; - - private ColumnMapping keyMapping; TestHBasePredicateDecomposer(ColumnMapping keyMapping) { - this.keyMapping = keyMapping; + super(keyMapping); } @Override - public HBaseScanRange getScanRange(List searchConditions) + public List getScanRanges(List searchConditions) throws Exception { - Map> fieldConds = - new HashMap>(); - for (IndexSearchCondition condition : searchConditions) { - String fieldName = condition.getFields()[0]; - List fieldCond = fieldConds.get(fieldName); - if (fieldCond == null) { - fieldConds.put(fieldName, fieldCond = new ArrayList()); - } - fieldCond.add(condition); - } - Filter filter = null; - HBaseScanRange range = new HBaseScanRange(); - StructTypeInfo type = (StructTypeInfo) keyMapping.columnType; - for (String name : type.getAllStructFieldNames()) { - List fieldCond = 
fieldConds.get(name); - if (fieldCond == null || fieldCond.size() > 2) { - continue; - } - for (IndexSearchCondition condition : fieldCond) { - if (condition.getConstantDesc().getValue() == null) { - continue; - } - String comparisonOp = condition.getComparisonOp(); - String constantVal = String.valueOf(condition.getConstantDesc().getValue()); + List rtn = Lists.newArrayList(); - byte[] valueAsBytes = toBinary(constantVal, FIXED_LENGTH, false, false); + // Translate start stop bounds on the scan range into filter objects + // Note: the previous implementation of this method was incorrect: + // it simply took the last condition in searchConditions as the value for the key, which + // made it fail on conditions such as: key.col1 = 128 AND key.col2 = 1128 (since the filter would look for + // "1128\x00\x00..." , but the key would be: "128\x00\x00..." . + // This failure was masked by the fact that HiveHBaseTableInputFormat previously didn't + // pass the filter objects along to the record reader, causing the erroneous filters to be dropped. + for (HBaseScanRange scanRange : super.getScanRanges(searchConditions)) { + Filter filter; - if (comparisonOp.endsWith("UDFOPEqual")) { - filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator(valueAsBytes)); - } else if (comparisonOp.endsWith("UDFOPEqualOrGreaterThan")) { - filter = new RowFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator(valueAsBytes)); - } else if (comparisonOp.endsWith("UDFOPGreaterThan")) { - filter = new RowFilter(CompareOp.GREATER, new BinaryComparator(valueAsBytes)); - } else if (comparisonOp.endsWith("UDFOPEqualOrLessThan")) { - filter = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(valueAsBytes)); - } else if (comparisonOp.endsWith("UDFOPLessThan")) { - filter = new RowFilter(CompareOp.LESS, new BinaryComparator(valueAsBytes)); - } else { - throw new IOException(comparisonOp + " is not a supported comparison operator"); + if (Arrays.equals(scanRange.getStartRow(), scanRange.getStopRow())) { + filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator(scanRange.getStartRow())); + } else { + FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); + filter = filterList; + if (scanRange.getStartRow() != null) { + filterList.addFilter(new RowFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator(scanRange.getStartRow()))); } - } - } - if (filter != null) { - range.addFilter(filter); - } - return range; - } - private byte[] toBinary(String value, int max, boolean end, boolean nextBA) { - return toBinary(value.getBytes(), max, end, nextBA); - } + if (scanRange.getStopRow() != null) { + filterList.addFilter(new RowFilter(CompareOp.LESS, new BinaryComparator(scanRange.getStopRow()))); + } + } + HBaseScanRange scanRangeWithFilters = new HBaseScanRange(); - private byte[] toBinary(byte[] value, int max, boolean end, boolean nextBA) { - byte[] bytes = new byte[max + 1]; - System.arraycopy(value, 0, bytes, 0, Math.min(value.length, max)); - if (end) { - Arrays.fill(bytes, value.length, max, (byte) 0xff); - } - if (nextBA) { - bytes[max] = 0x01; + scanRangeWithFilters.addFilter(filter); + rtn.add(scanRangeWithFilters); } - return bytes; + + return rtn; } } \ No newline at end of file diff --git hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestMultiScanHBaseKeyFactory.java hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestMultiScanHBaseKeyFactory.java new file mode 100644 index 0000000..afe8e64 --- /dev/null +++ 
hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestMultiScanHBaseKeyFactory.java @@ -0,0 +1,180 @@ +package org.apache.hadoop.hive.hbase; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; +import org.apache.hadoop.hive.ql.index.IndexSearchCondition; +import org.apache.hadoop.hive.ql.plan.*; +import org.apache.hadoop.hive.ql.udf.generic.*; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Key factory supporting fixed-length composite row keys, where the first field in the row key is a "bucket" value intended + * to distribute data. Predicate pushdown in this class works by pushing down predicates on the rest of the fields + * to all buckets if a predicate on the bucket isn't provided. + */ +public class TestMultiScanHBaseKeyFactory extends TestHBaseKeyFactory2 { + + public static final String HIVE_HBASE_MULTISCAN_NUM_BUCKETS = "hive.hbase.multiscan.num_buckets"; + private int numBuckets; + + public static final List> ALLOWED_COMPARISONS = ImmutableList.of( + GenericUDFOPGreaterThan.class, + GenericUDFOPEqualOrGreaterThan.class, + GenericUDFOPEqual.class, + GenericUDFOPLessThan.class, + GenericUDFOPEqualOrLessThan.class + ); + + @Override + public void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException { + super.init(hbaseParam, properties); + + String numBucketsProp = properties.getProperty(HIVE_HBASE_MULTISCAN_NUM_BUCKETS); + + try { + this.numBuckets = Integer.parseInt(numBucketsProp); + } catch (NullPointerException e) { + throw new SerDeException( + "Expected table property " + HIVE_HBASE_MULTISCAN_NUM_BUCKETS + " to be a valid integer; was: " + numBucketsProp, e); + } catch (NumberFormatException e) { + throw new SerDeException( + "Expected table property " + HIVE_HBASE_MULTISCAN_NUM_BUCKETS + " to be a valid integer; was: " + numBucketsProp, e); + } + } + + /** + * Decompose predicates on fields of the row key into one or more HBaseScanRanges. This implementation supports + * pushing down conditions into each bucket, creating one HBaseScanRange per bucket, if a condition on the bucket isn't + * otherwise provided. For instance, given the schema: + * + * (bucket int, age int) + * + * and the condition: + * + * WHERE age >= 20 AND age < 25 + * + * the method will return an HBaseScanRange for each bucket, with the age predicate pushed down: + * + * (1/20, 1/25)...
(numBuckets/20, numBuckets/25) + * + * Given a condition on the bucket, it will return a single range: + * + * WHERE bucket = 1 AND age >= 20 AND age < 25 =\> + * (1/20, 1/25) + * + */ + @Override + public HBaseDecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { + IndexPredicateAnalyzer analyzer = getIndexPredicateAnalyzer(); + + List searchConds = Lists.newArrayList(); + + ExprNodeDesc residual = analyzer.analyzePredicate(predicate, searchConds); + + StructTypeInfo keyColumnType = (StructTypeInfo) keyMapping.columnType; + String bucketCol = keyColumnType.getAllStructFieldNames().get(0); + TypeInfo bucketTypeInfo = keyColumnType.getStructFieldTypeInfo(bucketCol); + + if (! (bucketTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE && + ((PrimitiveTypeInfo) bucketTypeInfo).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.INT)) { + throw new IllegalArgumentException("Bucket field " + bucketCol + " must be of type int"); + } + + List scanRanges; + + try { + // search for a condition already on the bucket + boolean conditionOnBucket = false; + for (IndexSearchCondition sc : searchConds) { + if (sc.getFields()[0].equals(bucketCol)) { + conditionOnBucket = true; + break; + } + } + + FixedLengthPredicateDecomposer predicateDecomposer = new FixedLengthPredicateDecomposer(keyMapping); + if (conditionOnBucket) { + scanRanges = predicateDecomposer.getScanRanges(searchConds); + } else { + scanRanges = Lists.newArrayList(); + List searchCondsWithBucket = cons(null, searchConds); + // no condition on the bucket column; create an artificial one for each bucket value. + for (int i = 0; i < this.numBuckets; i++) { + ExprNodeColumnDesc keyColumnDesc = searchConds.get(0).getColumnDesc(); + searchCondsWithBucket.set(0, searchConditionForBucketValue(bucketCol, i, keyColumnDesc)); + scanRanges.add(predicateDecomposer.getScanRange(searchCondsWithBucket)); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return new HBaseDecomposedPredicate(analyzer.translateSearchConditions(searchConds), scanRanges, (ExprNodeGenericFuncDesc) residual); + } + + /** + * Create an artificial search condition on the bucket column. + * @param bucketCol + * @param value + * @param keyColumnDesc + * @return + */ + private IndexSearchCondition searchConditionForBucketValue(String bucketCol, int value, ExprNodeColumnDesc keyColumnDesc) { + return new IndexSearchCondition( + new ExprNodeColumnDesc( + keyMapping.getColumnType(), + keyColumnDesc.getColumn(), + keyColumnDesc.getTabAlias(), + keyColumnDesc.getIsPartitionColOrVirtualCol() + ), + GenericUDFOPEqual.class.getCanonicalName(), + + new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, value), + null, + new String[]{bucketCol} + ); + } + + /** + * Create a new list formed by inserting ele at the front of lst. 
+ * @param ele + * @param lst + * @param + * @return + */ + private List cons(T ele, List lst) { + List rtn = Lists.newArrayListWithCapacity(lst.size() + 1); + rtn.add(ele); + rtn.addAll(lst); + return rtn; + } + + private IndexPredicateAnalyzer getIndexPredicateAnalyzer() { + IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(true); + + for (Class compareOp : ALLOWED_COMPARISONS) { + analyzer.addComparisonOp(compareOp.getCanonicalName()); + } + analyzer.allowColumnName(keyMapping.getColumnName()); + analyzer.setAcceptsFields(true); + return analyzer; + } +} diff --git hbase-handler/src/test/queries/positive/hbase_multiscan_pushdown.q hbase-handler/src/test/queries/positive/hbase_multiscan_pushdown.q new file mode 100644 index 0000000..47fb185 --- /dev/null +++ hbase-handler/src/test/queries/positive/hbase_multiscan_pushdown.q @@ -0,0 +1,34 @@ +CREATE TABLE hbase_multiscan( + key struct, + value string) + ROW FORMAT SERDE + 'org.apache.hadoop.hive.hbase.HBaseSerDe' + STORED BY + 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ( + 'serialization.format'='1', + 'hbase.columns.mapping'=':key,cf:string', + 'hbase.composite.key.factory'='org.apache.hadoop.hive.hbase.TestMultiScanHBaseKeyFactory' + ) + TBLPROPERTIES ( + 'hbase.table.name'='mscan2', 'hive.hbase.multiscan.num_buckets'='3'); + +FROM src +INSERT INTO TABLE hbase_multiscan SELECT +named_struct( + "bucket", cast(hash(cast(key as int)) % 3 as string), + "other_val", cast(key as string)), +cast(value as string); + + +-- {"bucket":"1","other_val":"10"} val_10 +-- {"bucket":"1","other_val":"100"} val_100 +-- {"bucket":"1","other_val":"103"} val_103 +-- {"bucket":"1","other_val":"118"} val_118 +SELECT * FROM hbase_multiscan WHERE key.bucket = 1 AND key.other_val >= "0" AND key.other_val < "12" ORDER BY value; + +-- {"bucket":"2","other_val":"128"} val_128 +SELECT * FROM hbase_multiscan WHERE key.other_val = "128"; + +DROP TABLE hbase_multiscan; diff --git hbase-handler/src/test/results/positive/hbase_multiscan_pushdown.q.out hbase-handler/src/test/results/positive/hbase_multiscan_pushdown.q.out new file mode 100644 index 0000000..7ba7cc9 --- /dev/null +++ hbase-handler/src/test/results/positive/hbase_multiscan_pushdown.q.out @@ -0,0 +1,93 @@ +PREHOOK: query: CREATE TABLE hbase_multiscan( + key struct, + value string) + ROW FORMAT SERDE + 'org.apache.hadoop.hive.hbase.HBaseSerDe' + STORED BY + 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ( + 'serialization.format'='1', + 'hbase.columns.mapping'=':key,cf:string', + 'hbase.composite.key.factory'='org.apache.hadoop.hive.hbase.TestMultiScanHBaseKeyFactory' + ) + TBLPROPERTIES ( + 'hbase.table.name'='mscan2', 'hive.hbase.multiscan.num_buckets'='3') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@hbase_multiscan +POSTHOOK: query: CREATE TABLE hbase_multiscan( + key struct, + value string) + ROW FORMAT SERDE + 'org.apache.hadoop.hive.hbase.HBaseSerDe' + STORED BY + 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ( + 'serialization.format'='1', + 'hbase.columns.mapping'=':key,cf:string', + 'hbase.composite.key.factory'='org.apache.hadoop.hive.hbase.TestMultiScanHBaseKeyFactory' + ) + TBLPROPERTIES ( + 'hbase.table.name'='mscan2', 'hive.hbase.multiscan.num_buckets'='3') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_multiscan +PREHOOK: query: FROM src +INSERT INTO TABLE hbase_multiscan SELECT +named_struct( + 
"bucket", cast(hash(cast(key as int)) % 3 as string), + "other_val", cast(key as string)), +cast(value as string) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@hbase_multiscan +POSTHOOK: query: FROM src +INSERT INTO TABLE hbase_multiscan SELECT +named_struct( + "bucket", cast(hash(cast(key as int)) % 3 as string), + "other_val", cast(key as string)), +cast(value as string) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@hbase_multiscan +PREHOOK: query: -- {"bucket":"1","other_val":"10"} val_10 +-- {"bucket":"1","other_val":"100"} val_100 +-- {"bucket":"1","other_val":"103"} val_103 +-- {"bucket":"1","other_val":"118"} val_118 +SELECT * FROM hbase_multiscan WHERE key.bucket = 1 AND key.other_val >= "0" AND key.other_val < "12" ORDER BY value +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_multiscan +#### A masked pattern was here #### +POSTHOOK: query: -- {"bucket":"1","other_val":"10"} val_10 +-- {"bucket":"1","other_val":"100"} val_100 +-- {"bucket":"1","other_val":"103"} val_103 +-- {"bucket":"1","other_val":"118"} val_118 +SELECT * FROM hbase_multiscan WHERE key.bucket = 1 AND key.other_val >= "0" AND key.other_val < "12" ORDER BY value +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_multiscan +#### A masked pattern was here #### +{"bucket":"1","other_val":"10"} val_10 +{"bucket":"1","other_val":"100"} val_100 +{"bucket":"1","other_val":"103"} val_103 +{"bucket":"1","other_val":"118"} val_118 +PREHOOK: query: -- {"bucket":"2","other_val":"128"} val_128 +SELECT * FROM hbase_multiscan WHERE key.other_val = "128" +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_multiscan +#### A masked pattern was here #### +POSTHOOK: query: -- {"bucket":"2","other_val":"128"} val_128 +SELECT * FROM hbase_multiscan WHERE key.other_val = "128" +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_multiscan +#### A masked pattern was here #### +{"bucket":"2","other_val":"128"} val_128 +PREHOOK: query: DROP TABLE hbase_multiscan +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@hbase_multiscan +PREHOOK: Output: default@hbase_multiscan +POSTHOOK: query: DROP TABLE hbase_multiscan +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@hbase_multiscan +POSTHOOK: Output: default@hbase_multiscan diff --git ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java index e7434a3..2b2da14 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java @@ -340,11 +340,7 @@ private DecomposedPredicate decomposePredicate(ExprNodeDesc predicate, Index ind useSorted = false; } - DecomposedPredicate decomposedPredicate = new DecomposedPredicate(); - decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(searchConditions); - decomposedPredicate.residualPredicate = residualPredicate; - - return decomposedPredicate; + return new DecomposedPredicate(analyzer.translateSearchConditions(searchConditions), null, residualPredicate); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java index 7d7c764..b021658 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java @@ -64,6 +64,15 @@ public 
DecomposedPredicate decomposePredicate( * Struct class for returning multiple values from decomposePredicate. */ public static class DecomposedPredicate { + + public DecomposedPredicate() {} + + public DecomposedPredicate(ExprNodeGenericFuncDesc pushedPredicate, Serializable pushedPredicateObject, ExprNodeGenericFuncDesc residualPredicate) { + this.pushedPredicate = pushedPredicate; + this.pushedPredicateObject = pushedPredicateObject; + this.residualPredicate = residualPredicate; + } + /** * Portion of predicate to be evaluated by storage handler. Hive * will pass this into the storage handler's input format. diff --git serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java index e403ad9..9b06ef5 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java +++ serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java @@ -115,14 +115,13 @@ public static void appendReadColumns(StringBuilder readColumnsBuffer, List 0) { readColumnsBuffer.append(StringUtils.COMMA_STR).append(newConfStr); - } - if (readColumnsBuffer.length() == 0) { - readColumnsBuffer.append(READ_COLUMN_IDS_CONF_STR_DEFAULT); + } else if (readColumnsBuffer.length() == 0) { + readColumnsBuffer.append(newConfStr); } } - private static void appendReadColumnNames(StringBuilder readColumnNamesBuffer, List cols) { - boolean first = readColumnNamesBuffer.length() > 0; + public static void appendReadColumnNames(StringBuilder readColumnNamesBuffer, List cols) { + boolean first = readColumnNamesBuffer.length() == 0; for(String col: cols) { if (first) { first = false; @@ -159,6 +158,8 @@ private static void setReadColumnIDConf(Configuration conf, String id) { } } + // TODO: all of these append methods are joining strings separated by a comma; use + // a common string join method? 
private static void appendReadColumnNames(Configuration conf, List cols) { String old = conf.get(READ_COLUMN_NAMES_CONF_STR, ""); StringBuilder result = new StringBuilder(old); diff --git serde/src/test/org/apache/hadoop/hive/serde2/TestColumnProjectionUtils.java serde/src/test/org/apache/hadoop/hive/serde2/TestColumnProjectionUtils.java index 2b81b54..08af6a0 100644 --- serde/src/test/org/apache/hadoop/hive/serde2/TestColumnProjectionUtils.java +++ serde/src/test/org/apache/hadoop/hive/serde2/TestColumnProjectionUtils.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.junit.Before; import org.junit.Test; @@ -114,4 +115,38 @@ public void testDeprecatedMethods() { ColumnProjectionUtils.setFullyReadColumns(conf); assertTrue(ColumnProjectionUtils.isReadAllColumns(conf)); } + + @Test + public void testAppendReadColumnsWithEmptyBuffer() throws Exception { + StringBuilder readColumnsBuffer = new StringBuilder(); + ColumnProjectionUtils.appendReadColumns(readColumnsBuffer, Lists.newArrayList(1, 2)); + + assertEquals("1,2", readColumnsBuffer.toString()); + } + + @Test + public void testAppendReadColumnsWithNonEmptyBuffer() throws Exception { + StringBuilder readColumnsBuffer = new StringBuilder("0"); + ColumnProjectionUtils.appendReadColumns(readColumnsBuffer, Lists.newArrayList(1, 2)); + + assertEquals("0,1,2", readColumnsBuffer.toString()); + } + + @Test + public void testAppendReadColumnNamesWithEmptyBuffer() throws Exception { + StringBuilder buffer = new StringBuilder(); + ColumnProjectionUtils.appendReadColumnNames(buffer, Lists.newArrayList("col1", "col2")); + + assertEquals("col1,col2", buffer.toString()); + } + + @Test + public void testAppendReadColumnNamesWithNonEmptyBuffer() throws Exception { + StringBuilder buffer = new StringBuilder("col0"); + ColumnProjectionUtils.appendReadColumnNames(buffer, Lists.newArrayList("col1", "col2")); + + assertEquals("col0,col1,col2", buffer.toString()); + } + + }
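
Reviewer note (not part of the patch): below is a minimal sketch of how a downstream key factory might plug into the multi-scan contract introduced above, i.e. AbstractHBaseKeyPredicateDecomposer.getScanRanges(...) returning a list of HBaseScanRange objects that the base class packages into an HBaseDecomposedPredicate and that HiveHBaseTableInputFormat.createFilterScans(...) later turns into one HBase Scan per range. The BucketedKeyDecomposer class, its two-digit salt prefix, and the generic type parameters (which the flattened diff text above has lost along with its angle brackets) are assumptions for illustration only, not code from this patch.

package org.apache.hadoop.hive.hbase;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;

/**
 * Hypothetical decomposer illustrating the new getScanRanges() contract:
 * instead of a single HBaseScanRange, an implementation may return one
 * range per "bucket" (salt prefix), and the input format will compute
 * splits from one Scan per range.
 */
public class BucketedKeyDecomposer extends AbstractHBaseKeyPredicateDecomposer {

  private final int numBuckets;

  public BucketedKeyDecomposer(int numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Override
  protected List<HBaseScanRange> getScanRanges(List<IndexSearchCondition> searchConditions)
      throws Exception {
    List<HBaseScanRange> ranges = new ArrayList<HBaseScanRange>(numBuckets);
    // One scan range per bucket; the pushed search conditions would normally
    // be used to tighten the start/stop keys within each bucket (omitted here).
    for (int bucket = 0; bucket < numBuckets; bucket++) {
      HBaseScanRange range = new HBaseScanRange();
      range.setStartRow(Bytes.toBytes(String.format("%02d", bucket)));
      range.setStopRow(Bytes.toBytes(String.format("%02d~", bucket)));
      ranges.add(range);
    }
    return ranges;
  }
}

With a decomposer along these lines, a salted row-key table can be pruned to one bounded scan per bucket at split-computation time rather than falling back to a single unbounded full-table scan.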