diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java index 18fb5eaebd87f2f194152241b3444adc3e24c5c3..d8a99617b5dedcdc472a863009a15956888ad31c 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyFactory.java @@ -47,7 +47,7 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) throws IOExce } @Override - public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { + public HBaseDecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { return HBaseStorageHandler.decomposePredicate(jobConf, (HBaseSerDe) deserializer, predicate); } } diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyPredicateDecomposer.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyPredicateDecomposer.java index 0cc21fa1812fbb08b49c7ec2efcb2ec98b1a0113..a46b6d9ea5c1b424da4b840721ca96b64cf19d92 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyPredicateDecomposer.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/AbstractHBaseKeyPredicateDecomposer.java @@ -20,6 +20,8 @@ import java.util.ArrayList; import java.util.List; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; @@ -30,43 +32,45 @@ /** * Simple abstract class to help with creation of a {@link DecomposedPredicate}. 
In order to create - * one, consumers should extend this class and override the "getScanRange" method to define the + * one, consumers should extend this class and override the "getScanRanges" method to define the * start/stop keys and/or filters on their hbase scans * */ public abstract class AbstractHBaseKeyPredicateDecomposer { public static final Log LOG = LogFactory.getLog(AbstractHBaseKeyPredicateDecomposer.class); - public DecomposedPredicate decomposePredicate(String keyColName, ExprNodeDesc predicate) { + public HBaseDecomposedPredicate decomposePredicate(String keyColName, ExprNodeDesc predicate) { IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(true); analyzer.allowColumnName(keyColName); analyzer.setAcceptsFields(true); analyzer.setFieldValidator(getFieldValidator()); - DecomposedPredicate decomposed = new DecomposedPredicate(); List conditions = new ArrayList(); - decomposed.residualPredicate = + ExprNodeGenericFuncDesc residualPredicate = (ExprNodeGenericFuncDesc) analyzer.analyzePredicate(predicate, conditions); + ExprNodeGenericFuncDesc pushedPredicate = null; + List pushedPredicateObject = null; + + if (!conditions.isEmpty()) { - decomposed.pushedPredicate = analyzer.translateSearchConditions(conditions); + pushedPredicate = analyzer.translateSearchConditions(conditions); try { - decomposed.pushedPredicateObject = getScanRange(conditions); + pushedPredicateObject = getScanRanges(conditions); } catch (Exception e) { LOG.warn("Failed to decompose predicates", e); return null; } } - - return decomposed; + return new HBaseDecomposedPredicate(pushedPredicate, pushedPredicateObject, residualPredicate); } /** - * Get the scan range that specifies the start/stop keys and/or filters to be applied onto the - * hbase scan - * */ - protected abstract HBaseScanRange getScanRange(List searchConditions) - throws Exception; + * Get a list of scan ranges specifying start/stop keys and/or filters for one or more HBase scans. + * + * @param searchConditions pushed-down search conditions on the key column + * @return one or more scan ranges covering the pushed-down conditions + */ + protected abstract List getScanRanges(List searchConditions) throws Exception; /** * Get an optional {@link IndexPredicateAnalyzer.FieldValidator validator}.
A validator can be diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/CompositeHBaseKeyFactory.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/CompositeHBaseKeyFactory.java index 480b31fc4e6cc5b40fd53471db219fd6ea91bf0a..59d2462d0cbd2e5fb48a542a92fa661993c68bf2 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/CompositeHBaseKeyFactory.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/CompositeHBaseKeyFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.hbase; +import com.google.common.collect.Lists; import java.io.IOException; import java.lang.reflect.Constructor; import java.util.Properties; diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseDecomposedPredicate.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseDecomposedPredicate.java new file mode 100644 index 0000000000000000000000000000000000000000..b74410f4451328b35a56f75c3437f14098968b5a --- /dev/null +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseDecomposedPredicate.java @@ -0,0 +1,42 @@ +package org.apache.hadoop.hive.hbase; + +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * A DecomposedPredicate for the HBase handler. The primary customization is that the pushedPredicateObject must be + * a List. You *could* hack around this, since the fields are public, but {@link org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat} + * won't understand any other types. + */ +public class HBaseDecomposedPredicate extends HiveStoragePredicateHandler.DecomposedPredicate { + + public HBaseDecomposedPredicate(ExprNodeGenericFuncDesc pushedPredicate, List pushedPredicateObject, ExprNodeGenericFuncDesc residualPredicate) { + super(pushedPredicate, getListAsSerializable(pushedPredicateObject), residualPredicate); + } + + /** + * Ensure that pushedPredicateObject is serializable by checking its runtime type. If it's not an instance of Serializable, + * copy it to an ArrayList (which is Serializable) + * @param pushedPredicateObject + * @return + */ + private static Serializable getListAsSerializable(List pushedPredicateObject) { + + if (pushedPredicateObject == null) { + return null; + } + + Serializable serializablePushedPred; + if (! 
(pushedPredicateObject instanceof Serializable)) { + serializablePushedPred = Lists.newArrayList(pushedPredicateObject); + } else { + serializablePushedPred = (Serializable) pushedPredicateObject; + } + return serializablePushedPred; + } +} diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java index 4817b43140c9caf06909cd529b6c5a08270ec0ee..800dfdb660f9db647cf467c28116544e597e853c 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseKeyFactory.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hive.hbase; import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -39,6 +41,29 @@ public interface HBaseKeyFactory extends HiveStoragePredicateHandler { /** + * Gives the storage handler a chance to decompose a predicate. The storage + * handler should analyze the predicate and return the portion of it which + * cannot be evaluated during table access. For example, if the original + * predicate is x = 2 AND upper(y)='YUM', the storage handler + * might be able to handle x = 2 but leave the "residual" + * upper(y)='YUM' for Hive to deal with. The breakdown + * need not be non-overlapping; for example, given the + * predicate x LIKE 'a%b', the storage handler might + * be able to evaluate the prefix search x LIKE 'a%', leaving + * x LIKE '%b' as the residual. 
+ * + * @param jobConf contains a job configuration matching the one that + * will later be passed to getRecordReader and getSplits + * @param deserializer deserializer which will be used when + * fetching rows + * @param predicate predicate to be decomposed + * @return decomposed form of predicate, or null if no pushdown is + * possible at all + */ + @Override + HBaseDecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate); + + /** * initialize factory with properties */ void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException; diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseShimRecordReader.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseShimRecordReader.java new file mode 100644 index 0000000000000000000000000000000000000000..8db18e65b56da3fc396bbc24c1f1710ca17e727b --- /dev/null +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseShimRecordReader.java @@ -0,0 +1,71 @@ +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.RecordReader; + +import java.io.IOException; + +/** + * mapred.RecordReader wrapper around mapreduce.RecordReader + */ +public class HBaseShimRecordReader implements RecordReader { + + private final org.apache.hadoop.mapreduce.RecordReader recordReader; + + public HBaseShimRecordReader(org.apache.hadoop.mapreduce.RecordReader recordReader) { + this.recordReader = recordReader; + } + + @Override + public void close() throws IOException { + recordReader.close(); + } + + @Override + public ImmutableBytesWritable createKey() { + return new ImmutableBytesWritable(); + } + + @Override + public ResultWritable createValue() { + return new ResultWritable(new Result()); + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public float getProgress() throws IOException { + float progress = 0.0F; + + try { + progress = recordReader.getProgress(); + } catch (InterruptedException e) { + throw new IOException(e); + } + + return progress; + } + + @Override + public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws IOException { + + boolean next = false; + + try { + next = recordReader.nextKeyValue(); + + if (next) { + rowKey.set(recordReader.getCurrentValue().getRow()); + value.setResult(recordReader.getCurrentValue()); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + + return next; + } +} diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java index 53d913435cd43c96f044cf8668461fc686817ef4..b0e32d84b5bb45b416ebdffcd9c9fbf980d8ac76 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; import org.apache.hadoop.hbase.util.Bytes; @@ -519,15 +520,19 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { // Get 
credentials using the configuration instance which has HBase properties JobConf hbaseJobConf = new JobConf(getConf()); - org.apache.hadoop.hbase.mapred.TableMapReduceUtil.initCredentials(hbaseJobConf); - ShimLoader.getHadoopShims().mergeCredentials(jobConf, hbaseJobConf); + + UserProvider userProvider = UserProvider.instantiate(getConf()); + if (userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled()) { + org.apache.hadoop.hbase.mapred.TableMapReduceUtil.initCredentials(hbaseJobConf); + ShimLoader.getHadoopShims().mergeCredentials(jobConf, hbaseJobConf); + } } catch (Exception e) { throw new RuntimeException(e); } } @Override - public DecomposedPredicate decomposePredicate( + public HBaseDecomposedPredicate decomposePredicate( JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) @@ -536,7 +541,7 @@ public DecomposedPredicate decomposePredicate( return keyFactory.decomposePredicate(jobConf, deserializer, predicate); } - public static DecomposedPredicate decomposePredicate( + public static HBaseDecomposedPredicate decomposePredicate( JobConf jobConf, HBaseSerDe hBaseSerDe, ExprNodeDesc predicate) { @@ -571,9 +576,6 @@ public static DecomposedPredicate decomposePredicate( } } - DecomposedPredicate decomposedPredicate = new DecomposedPredicate(); - decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(conditions); - decomposedPredicate.residualPredicate = residualPredicate; - return decomposedPredicate; + return new HBaseDecomposedPredicate(analyzer.translateSearchConditions(conditions), null, residualPredicate); } } diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java index 05245728ba7ad8579554d44ff6abad61db89ed16..b720f79b82046a0c9fdcd5e46829a6e511dc7c8f 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseInputFormatUtil.java @@ -51,6 +51,29 @@ public static HTable getTable(JobConf jobConf) throws IOException { * Parse {@code jobConf} to create a {@link Scan} instance. 
*/ public static Scan getScan(JobConf jobConf) throws IOException { + Scan scan = new Scan(); + configureScan(jobConf, scan); + return scan; + } + + public static void configureScan(JobConf jobConf, Scan scan) throws IOException { + pushScanColumns(jobConf, scan); + + String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE); + if (scanCache != null) { + scan.setCaching(Integer.valueOf(scanCache)); + } + String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS); + if (scanCacheBlocks != null) { + scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks)); + } + String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH); + if (scanBatch != null) { + scan.setBatch(Integer.valueOf(scanBatch)); + } + } + + public static void pushScanColumns(JobConf jobConf, Scan scan) throws IOException { String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); List readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); @@ -67,7 +90,11 @@ public static Scan getScan(JobConf jobConf) throws IOException { } boolean readAllColumns = ColumnProjectionUtils.isReadAllColumns(jobConf); - Scan scan = new Scan(); + + pushScanColumns(scan, columnMappings, readAllColumns, readColIDs); + } + + public static void pushScanColumns(Scan scan, ColumnMappings columnMappings, boolean readAllColumns, List readColIDs) { boolean empty = true; // The list of families that have been added to the scan @@ -117,20 +144,6 @@ public static Scan getScan(JobConf jobConf) throws IOException { } } } - - String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE); - if (scanCache != null) { - scan.setCaching(Integer.valueOf(scanCache)); - } - String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS); - if (scanCacheBlocks != null) { - scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks)); - } - String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH); - if (scanBatch != null) { - scan.setBatch(Integer.valueOf(scanBatch)); - } - return scan; } public static boolean getStorageFormatOfKey(String spec, String defaultFormat) throws IOException{ diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java index 8e72759acd939e8650e4763e55240f49de578770..b00e7e2c1a425a0fca6ee6d1801ad2e54f5e6346 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java @@ -20,22 +20,18 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Arrays; import java.util.List; import java.util.Map; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapred.TableMapReduceUtil; -import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase; -import org.apache.hadoop.hbase.mapreduce.TableSplit; import org.apache.hadoop.hbase.util.Bytes; import 
org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator; @@ -56,9 +52,10 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; @@ -67,12 +64,6 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.security.UserGroupInformation; /** @@ -80,90 +71,122 @@ * tables, decorating an underlying HBase TableInputFormat with extra Hive logic * such as column pruning and filter pushdown. */ -public class HiveHBaseTableInputFormat extends TableInputFormatBase +public class HiveHBaseTableInputFormat extends HiveMultiTableInputFormatBase implements InputFormat { static final Log LOG = LogFactory.getLog(HiveHBaseTableInputFormat.class); private static final Object hbaseTableMonitor = new Object(); - @Override - public RecordReader getRecordReader( - InputSplit split, - JobConf jobConf, - final Reporter reporter) throws IOException { + private void configureScan( + Scan scan, + JobConf jobConf) throws IOException { - HBaseSplit hbaseSplit = (HBaseSplit) split; - TableSplit tableSplit = hbaseSplit.getTableSplit(); + String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); - setHTable(HiveHBaseInputFormatUtil.getTable(jobConf)); - setScan(HiveHBaseInputFormatUtil.getScan(jobConf)); + if (hbaseTableName == null) { + throw new IOException("HBase table must be specified in the JobConf"); + } else { + scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(hbaseTableName)); + } - Job job = new Job(jobConf); - TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext( - job.getConfiguration(), reporter); + String scanCache = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHE); + if (scanCache != null) { + scan.setCaching(Integer.valueOf(scanCache)); + } + String scanCacheBlocks = jobConf.get(HBaseSerDe.HBASE_SCAN_CACHEBLOCKS); + if (scanCacheBlocks != null) { + scan.setCacheBlocks(Boolean.valueOf(scanCacheBlocks)); + } + String scanBatch = jobConf.get(HBaseSerDe.HBASE_SCAN_BATCH); + if (scanBatch != null) { + scan.setBatch(Integer.valueOf(scanBatch)); + } + } - final org.apache.hadoop.mapreduce.RecordReader - recordReader = createRecordReader(tableSplit, tac); - try { - recordReader.initialize(tableSplit, tac); - } catch (InterruptedException e) { - throw new IOException("Failed to initialize RecordReader", e); + /** + * Converts a filter (which has been pushed down from Hive's optimizer) + * into corresponding restrictions on the HBase scan. 
The + * filter should already be in a form which can be fully converted. + * + * @return converted scan + */ + private List createFilterScans(JobConf jobConf) throws IOException { + String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR); + String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR); + + // If nothing was pushed, exit early--some other things might not be present as well. + if (filterObjectSerialized == null && filterExprSerialized == null) { + return ImmutableList.of(new Scan()); } - return new RecordReader() { + String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); + String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); + boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); - @Override - public void close() throws IOException { - recordReader.close(); - } + if (hbaseColumnsMapping == null) { + throw new IOException("hbase.columns.mapping required for HBase Table."); + } - @Override - public ImmutableBytesWritable createKey() { - return new ImmutableBytesWritable(); - } - @Override - public ResultWritable createValue() { - return new ResultWritable(new Result()); - } + String listColumnsString = jobConf.get(serdeConstants.LIST_COLUMNS); + String listColumnTypeString = jobConf.get(serdeConstants.LIST_COLUMN_TYPES); - @Override - public long getPos() throws IOException { - return 0; - } + List columns = Arrays.asList(listColumnsString.split(",")); + List columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(listColumnTypeString); - @Override - public float getProgress() throws IOException { - float progress = 0.0F; + String defaultStorageType = jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string"); - try { - progress = recordReader.getProgress(); - } catch (InterruptedException e) { - throw new IOException(e); - } + ColumnMappings columnMappings; + try { + columnMappings = getColumnMappings(hbaseColumnsMapping, doColumnRegexMatching, columns, columnTypes, defaultStorageType); + } catch (SerDeException e) { + throw new IOException(e); + } - return progress; - } + int iKey = columnMappings.getKeyIndex(); + ColumnMapping keyMapping = columnMappings.getKeyMapping(); - @Override - public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws IOException { + int iTimestamp = columnMappings.getTimestampIndex(); + + List ranges; + if (filterObjectSerialized != null) { + ranges = (List) Utilities.deserializeObject(filterObjectSerialized, + ArrayList.class); + } else { + ranges = null; + } - boolean next = false; + ExprNodeGenericFuncDesc filterExpr = filterExprSerialized != null ? 
+ Utilities.deserializeExpression(filterExprSerialized) : null; - try { - next = recordReader.nextKeyValue(); + String colName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey]; + String keyColType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey]; + boolean isKeyBinary = HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec, + jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")); - if (next) { - rowKey.set(recordReader.getCurrentValue().getRow()); - value.setResult(recordReader.getCurrentValue()); - } - } catch (InterruptedException e) { - throw new IOException(e); - } + String tsColName = null; + if (iTimestamp >= 0) { + tsColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iTimestamp]; + } + return createFilterScans(ranges, filterExpr, colName, keyColType, isKeyBinary, tsColName, jobConf); + } - return next; - } - }; + private ColumnMappings getColumnMappings(String hbaseColumnMappings, + boolean doColumnRegexMatching, + List columnsList, + List columnTypeList, + String defaultStorageType + ) throws SerDeException { + ColumnMappings columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnMappings, doColumnRegexMatching); + + // Only non virtual columns are mapped, and all of the virtual columns ought to be at the end. Therefore, + // take only as many columns as there are in the mapping. + List nonVirtualColumns = columnsList.subList(0, columnMappings.size()); + List nonVirtualColumnTypes = columnTypeList.subList(0, columnMappings.size()); + columnMappings.setHiveColumnDescription(HBaseSerDe.class.getName(), nonVirtualColumns, nonVirtualColumnTypes); + columnMappings.parseColumnStorageTypes(defaultStorageType); + + return columnMappings; } /** @@ -171,47 +194,47 @@ public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws * into corresponding restrictions on the HBase scan. The * filter should already be in a form which can be fully converted. 
* - * @param jobConf configuration for the scan - * - * @param iKey 0-based offset of key column within Hive table - * - * @return converted table split if any + * @return converted scan */ - private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean isKeyBinary) - throws IOException { + private List createFilterScans(List ranges, ExprNodeGenericFuncDesc filterExpr, String keyColName, String keyColType, boolean isKeyBinary, String tsColName, JobConf jobConf) throws IOException { // TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL - Scan scan = new Scan(); - String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR); - if (filterObjectSerialized != null) { - HBaseScanRange range = Utilities.deserializeObject(filterObjectSerialized, - HBaseScanRange.class); - try { - range.setup(scan, jobConf); - } catch (Exception e) { - throw new IOException(e); - } - return scan; - } - - String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR); - if (filterExprSerialized == null) { - return scan; - } - - ExprNodeGenericFuncDesc filterExpr = - Utilities.deserializeExpression(filterExprSerialized); + if (ranges != null) { - String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey]; - String colType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey]; - boolean isKeyComparable = isKeyBinary || colType.equalsIgnoreCase("string"); + List rtn = Lists.newArrayListWithCapacity(ranges.size()); - String tsColName = null; - if (iTimestamp >= 0) { - tsColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iTimestamp]; + for (HBaseScanRange range : ranges) { + Scan scan = new Scan(); + try { + range.setup(scan, jobConf); + } catch (Exception e) { + throw new IOException(e); + } + rtn.add(scan); + } + return rtn; + } else if (filterExpr == null) { + return ImmutableList.of(new Scan()); + } else { + return ImmutableList.of(createScanFromFilterExpr(filterExpr, keyColName, keyColType, isKeyBinary, tsColName)); } + } + + /** + * Create a scan with the filters represented by filterExpr. TODO: this code path may not be hittable anymore; excise + * it if so. 
+ * @param filterExpr + * @param keyColName + * @param keyColType + * @param isKeyBinary + * @return + * @throws IOException + */ + private Scan createScanFromFilterExpr(ExprNodeGenericFuncDesc filterExpr, String keyColName, String keyColType, boolean isKeyBinary, String tsColName) throws IOException { + Scan scan = new Scan(); + boolean isKeyComparable = isKeyBinary || keyColType.equals("string"); IndexPredicateAnalyzer analyzer = newIndexPredicateAnalyzer(keyColName, isKeyComparable, tsColName); @@ -336,7 +359,7 @@ private long getTimestampVal(IndexSearchCondition sc) throws IOException { return timestamp; } - private byte[] getConstantVal(Object writable, PrimitiveObjectInspector poi, + private byte[] getConstantVal(Object writable, PrimitiveObjectInspector poi, boolean isKeyBinary) throws IOException{ if (!isKeyBinary){ @@ -434,76 +457,27 @@ static IndexPredicateAnalyzer newIndexPredicateAnalyzer( } private InputSplit[] getSplitsInternal(JobConf jobConf, int numSplits) throws IOException { - //obtain delegation tokens for the job if (UserGroupInformation.getCurrentUser().hasKerberosCredentials()) { TableMapReduceUtil.initCredentials(jobConf); } - String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); - setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName))); - String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); - boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); - - if (hbaseColumnsMapping == null) { - throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table."); - } - - ColumnMappings columnMappings = null; - try { - columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); - } catch (SerDeException e) { - throw new IOException(e); - } - - int iKey = columnMappings.getKeyIndex(); - int iTimestamp = columnMappings.getTimestampIndex(); - ColumnMapping keyMapping = columnMappings.getKeyMapping(); - // Take filter pushdown into account while calculating splits; this // allows us to prune off regions immediately. Note that although // the Javadoc for the superclass getSplits says that it returns one // split per region, the implementation actually takes the scan // definition into account and excludes regions which don't satisfy // the start/stop row conditions (HBASE-1829). - Scan scan = createFilterScan(jobConf, iKey, iTimestamp, - HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec, - jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string"))); - - // The list of families that have been added to the scan - List addedFamilies = new ArrayList(); - + List scans = createFilterScans(jobConf); // REVIEW: are we supposed to be applying the getReadColumnIDs // same as in getRecordReader? 
- for (ColumnMapping colMap : columnMappings) { - if (colMap.hbaseRowKey || colMap.hbaseTimestamp) { - continue; - } - - if (colMap.qualifierName == null) { - scan.addFamily(colMap.familyNameBytes); - addedFamilies.add(colMap.familyName); - } else { - if(!addedFamilies.contains(colMap.familyName)){ - // add the column only if the family has not already been added - scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes); - } - } - } - setScan(scan); - - Job job = new Job(jobConf); - JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job); - Path [] tablePaths = FileInputFormat.getInputPaths(jobContext); - - List splits = - super.getSplits(jobContext); - InputSplit [] results = new InputSplit[splits.size()]; - for (int i = 0; i < splits.size(); i++) { - results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]); + for (Scan scan : scans) { + HiveHBaseInputFormatUtil.pushScanColumns(jobConf, scan); + configureScan(scan, jobConf); } - return results; + setScans(scans); + return super.getSplits(jobConf, numSplits); } } diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveMultiTableInputFormatBase.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveMultiTableInputFormatBase.java new file mode 100644 index 0000000000000000000000000000000000000000..163209b3cae3e6f73d97bd575baa2e6bc97f43d6 --- /dev/null +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveMultiTableInputFormatBase.java @@ -0,0 +1,62 @@ +package org.apache.hadoop.hive.hbase; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableMapReduceUtil; +import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase; +import org.apache.hadoop.hbase.mapreduce.TableSplit; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +/** + * mapred port of {@link org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase} + */ +public class HiveMultiTableInputFormatBase extends MultiTableInputFormatBase implements InputFormat { + + @Override + public InputSplit[] getSplits(JobConf jobConf, int splitNum) throws IOException { + //obtain delegation tokens for the job + TableMapReduceUtil.initCredentials(jobConf); + Job job = new Job(jobConf); + org.apache.hadoop.mapreduce.JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job); + + + List splits = super.getSplits(jobContext); + + //Convert mapreduce splits to mapred splits + InputSplit[] rtn = new InputSplit[splits.size()]; + + for (int i = 0; i < splits.size(); i++) { + rtn[i] = new HBaseSplit((TableSplit) splits.get(i), FileInputFormat.getInputPaths(jobConf)[0]); + } + + return rtn; + } + + @Override + public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { + HBaseSplit hbaseSplit = (HBaseSplit) inputSplit; + TableSplit tableSplit = hbaseSplit.getTableSplit(); + Job job = new Job(jobConf); + HadoopShims hadoopShims = ShimLoader.getHadoopShims(); + TaskAttemptContext tac = hadoopShims.newTaskAttemptContext( + job.getConfiguration(), reporter); + + final org.apache.hadoop.mapreduce.RecordReader recordReader; + try { + recordReader = createRecordReader(tableSplit, tac); + recordReader.initialize(tableSplit, tac); + } catch 
(InterruptedException e) { + throw new IOException("Failed to initialize RecordReader", e); + } + + return new HBaseShimRecordReader(recordReader); + } + +} diff --git a/hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleHBaseKeyFactory2.java b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleHBaseKeyFactory2.java index 24fa203cca71b1d01ce454380de079d24a58a94f..04831bd977ecf1ec1957ea9ffc8bb811fc96812c 100644 --- a/hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleHBaseKeyFactory2.java +++ b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleHBaseKeyFactory2.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.hbase; +import com.google.common.collect.Lists; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -44,6 +45,9 @@ import java.util.List; import java.util.Map; +/** + * HBaseKeyFactory for fixed-length keys. + */ public class SampleHBaseKeyFactory2 extends AbstractHBaseKeyFactory { private static final int FIXED_LENGTH = 10; @@ -90,7 +94,7 @@ public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException } @Override - public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, + public HBaseDecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { String keyColName = keyMapping.columnName; @@ -98,35 +102,26 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese analyzer.allowColumnName(keyColName); analyzer.setAcceptsFields(true); - DecomposedPredicate decomposed = new DecomposedPredicate(); + ExprNodeGenericFuncDesc pushedPredicate = null; + List pushedPredicateObject = null; List searchConditions = new ArrayList(); - decomposed.residualPredicate = - (ExprNodeGenericFuncDesc)analyzer.analyzePredicate(predicate, searchConditions); + + ExprNodeGenericFuncDesc residualPredicate = (ExprNodeGenericFuncDesc) analyzer.analyzePredicate(predicate, searchConditions); if (!searchConditions.isEmpty()) { - decomposed.pushedPredicate = analyzer.translateSearchConditions(searchConditions); + pushedPredicate = analyzer.translateSearchConditions(searchConditions); try { - decomposed.pushedPredicateObject = setupFilter(keyColName, searchConditions); + pushedPredicateObject = Lists.newArrayList(setupFilter(keyColName, searchConditions)); } catch (IOException e) { throw new RuntimeException(e); } } - return decomposed; + return new HBaseDecomposedPredicate(pushedPredicate, pushedPredicateObject, residualPredicate); } - private HBaseScanRange setupFilter(String keyColName, List conditions) + protected HBaseScanRange setupFilter(String keyColName, List conditions) throws IOException { - Map> fieldConds = - new HashMap>(); - for (IndexSearchCondition condition : conditions) { - assert keyColName.equals(condition.getColumnDesc().getColumn()); - String fieldName = condition.getFields()[0]; - List fieldCond = fieldConds.get(fieldName); - if (fieldCond == null) { - fieldConds.put(fieldName, fieldCond = new ArrayList()); - } - fieldCond.add(condition); - } + Map> fieldConds = groupCondsByFieldName(keyColName, conditions); HBaseScanRange range = new HBaseScanRange(); ByteArrayOutputStream startRow = new ByteArrayOutputStream(); @@ -195,12 +190,26 @@ private HBaseScanRange setupFilter(String keyColName, List return range; } + protected Map> groupCondsByFieldName(String keyColName, List conditions) { + Map> fieldConds = + new
HashMap>(); + for (IndexSearchCondition condition : conditions) { + assert keyMapping.getColumnName().equals(keyColName); + String fieldName = condition.getFields()[0]; + List fieldCond = fieldConds.get(fieldName); + if (fieldCond == null) { + fieldConds.put(fieldName, fieldCond = new ArrayList()); + } + fieldCond.add(condition); + } + return fieldConds; + } + private static class FixedLengthed implements LazyObjectBase { private final int fixedLength; private final List fields = new ArrayList(); - - private transient boolean isNull; + private boolean isNull; public FixedLengthed(int fixedLength) { this.fixedLength = fixedLength; @@ -215,6 +224,7 @@ public void init(ByteArrayRef bytes, int start, int length) { for (; rowStart < length; rowStart = rowStop + 1, rowStop = rowStart + fixedLength) { fields.add(new String(data, rowStart, rowStop - rowStart).trim()); } + isNull = false; } @@ -252,4 +262,5 @@ public Object getStructFieldData(Object data, StructField fieldRef) { return ((FixedLengthed)data).fields; } } + } diff --git a/hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleHBaseKeyFactory3.java b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleHBaseKeyFactory3.java index 712725f208f4b2c9cc166750509c25859518ec3e..4eeb0a0604a5b2636c146fd91adfbedecf38a276 100644 --- a/hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleHBaseKeyFactory3.java +++ b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleHBaseKeyFactory3.java @@ -18,22 +18,17 @@ package org.apache.hadoop.hive.hbase; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.hadoop.hbase.filter.BinaryComparator; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.RowFilter; +import com.google.common.collect.ImmutableList; import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.serde2.Deserializer; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.mapred.JobConf; /** @@ -43,7 +38,7 @@ public class SampleHBaseKeyFactory3 extends SampleHBaseKeyFactory2 { @Override - public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, + public HBaseDecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { SampleHBasePredicateDecomposer decomposedPredicate = new SampleHBasePredicateDecomposer(keyMapping); return decomposedPredicate.decomposePredicate(keyMapping.columnName, predicate); @@ -61,7 +56,7 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese } @Override - public HBaseScanRange getScanRange(List searchConditions) + public List getScanRanges(List searchConditions) throws Exception { Map> fieldConds = new HashMap>(); @@ -73,43 +68,12 @@ public HBaseScanRange getScanRange(List searchConditions) } fieldCond.add(condition); } - Filter filter = null; HBaseScanRange range = new HBaseScanRange(); - StructTypeInfo type = (StructTypeInfo) keyMapping.columnType; - for (String name : type.getAllStructFieldNames()) { - List fieldCond = fieldConds.get(name); - if (fieldCond == null || fieldCond.size() > 2) { - continue; - } - for (IndexSearchCondition condition : fieldCond) { - if 
(condition.getConstantDesc().getValue() == null) { - continue; - } - String comparisonOp = condition.getComparisonOp(); - String constantVal = String.valueOf(condition.getConstantDesc().getValue()); - - byte[] valueAsBytes = toBinary(constantVal, FIXED_LENGTH, false, false); + // xxx TODO: bring back addition of a filter here. The previous implementation was broken; it doesn't consider the possibility of conditions on multiple + // fields in the struct, and simply ends up using the last one. - if (comparisonOp.endsWith("UDFOPEqual")) { - filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator(valueAsBytes)); - } else if (comparisonOp.endsWith("UDFOPEqualOrGreaterThan")) { - filter = new RowFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator(valueAsBytes)); - } else if (comparisonOp.endsWith("UDFOPGreaterThan")) { - filter = new RowFilter(CompareOp.GREATER, new BinaryComparator(valueAsBytes)); - } else if (comparisonOp.endsWith("UDFOPEqualOrLessThan")) { - filter = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(valueAsBytes)); - } else if (comparisonOp.endsWith("UDFOPLessThan")) { - filter = new RowFilter(CompareOp.LESS, new BinaryComparator(valueAsBytes)); - } else { - throw new IOException(comparisonOp + " is not a supported comparison operator"); - } - } - } - if (filter != null) { - range.addFilter(filter); - } - return range; + return ImmutableList.of(range); } private byte[] toBinary(String value, int max, boolean end, boolean nextBA) { diff --git a/hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleMultiScanHBaseKeyFactory.java b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleMultiScanHBaseKeyFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..a13a8b0cea03fcaf6d47d291b42e40b8e73e4028 --- /dev/null +++ b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/SampleMultiScanHBaseKeyFactory.java @@ -0,0 +1,170 @@ +package org.apache.hadoop.hive.hbase; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; +import org.apache.hadoop.hive.ql.index.IndexSearchCondition; +import org.apache.hadoop.hive.ql.plan.*; +import org.apache.hadoop.hive.ql.udf.generic.*; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Key factory supporting fixed length composite row keys, where the first field in the row key is a "bucket" value intended + * to distribute data. Predicate pushdown in this class works by pushing down predicates on the rest of the fields + * to all buckets if a predicate on the bucket isn't provided. 
+ */ +public class SampleMultiScanHBaseKeyFactory extends SampleHBaseKeyFactory2 { + + public static final String HIVE_HBASE_MULTISCAN_NUM_BUCKETS = "hive.hbase.multiscan.num_buckets"; + private int numBuckets; + + public static final List> ALLOWED_COMPARISONS = ImmutableList.of( + GenericUDFOPGreaterThan.class, + GenericUDFOPEqualOrGreaterThan.class, + GenericUDFOPEqual.class, + GenericUDFOPLessThan.class, + GenericUDFOPEqualOrLessThan.class + ); + + @Override + public void init(HBaseSerDeParameters hbaseParam, Properties properties) throws SerDeException { + super.init(hbaseParam, properties); + + String numBucketsProp = properties.getProperty(HIVE_HBASE_MULTISCAN_NUM_BUCKETS); + + try { + this.numBuckets = Integer.parseInt(numBucketsProp); + } catch (NullPointerException e) { + throw new SerDeException( + "Expected table property " + HIVE_HBASE_MULTISCAN_NUM_BUCKETS + " to be a valid integer; was: " + numBucketsProp, e); + } catch (NumberFormatException e) { + throw new SerDeException( + "Expected table property " + HIVE_HBASE_MULTISCAN_NUM_BUCKETS + " to be a valid integer; was: " + numBucketsProp, e); + } + } + + /** + * Decompose predicates on fields of the row key into one or more HBaseScanRanges. This implementation supports + * pushing down conditions into each bucket, creating one HBaseScanRange per bucket if a condition on the bucket isn't + * otherwise provided. For instance, given the schema: + * + * (bucket int, age int) + * + * and the condition: + * + * WHERE age >= 20 AND age < 25 + * + * the method will return an HBaseScanRange for each bucket, with the age predicate pushed down: + * + * (1/20, 1/25)... (numBuckets/20, numBuckets/25) + * + * Given a condition on the bucket, it will return a single range: + * + * WHERE bucket = 1 AND age >= 20 AND age < 25 => + * (1/20, 1/25) + * + */ + @Override + public HBaseDecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { + IndexPredicateAnalyzer analyzer = getIndexPredicateAnalyzer(); + + List searchConds = Lists.newArrayList(); + + ExprNodeDesc residual = analyzer.analyzePredicate(predicate, searchConds); + + StructTypeInfo keyColumnType = (StructTypeInfo) keyMapping.columnType; + String bucketCol = keyColumnType.getAllStructFieldNames().get(0); + + ArrayList scanRanges; + + try { + // search for a condition already on the bucket + boolean conditionOnBucket = false; + for (IndexSearchCondition sc : searchConds) { + if (sc.getFields()[0].equals(bucketCol)) { + conditionOnBucket = true; + break; + } + } + + if (conditionOnBucket) { + scanRanges = Lists.newArrayList(super.setupFilter(keyMapping.columnName, searchConds)); + } else { + scanRanges = Lists.newArrayList(); + List searchCondsWithBucket = cons(null, searchConds); + // no condition on the bucket column; create an artificial one for each bucket value. + for (int i = 0; i < this.numBuckets; i++) { + ExprNodeColumnDesc keyColumnDesc = searchConds.get(0).getColumnDesc(); + searchCondsWithBucket.set(0, searchConditionForBucketValue(bucketCol, i, keyColumnDesc)); + scanRanges.add(super.setupFilter(keyMapping.columnName, searchCondsWithBucket)); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new HBaseDecomposedPredicate(analyzer.translateSearchConditions(searchConds), scanRanges, (ExprNodeGenericFuncDesc) residual); + } + + /** + * Create an artificial search condition on the bucket column.
+ * @param bucketCol + * @param value + * @param keyColumnDesc + * @return + */ + private IndexSearchCondition searchConditionForBucketValue(String bucketCol, int value, ExprNodeColumnDesc keyColumnDesc) { + return new IndexSearchCondition( + new ExprNodeColumnDesc( + keyMapping.getColumnType(), + keyColumnDesc.getColumn(), + keyColumnDesc.getTabAlias(), + keyColumnDesc.getIsPartitionColOrVirtualCol() + ), + GenericUDFOPEqual.class.getCanonicalName(), + + new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, value), + null, + new String[]{bucketCol} + ); + } + + /** + * Create a new list formed by inserting ele at the front of lst. + * @param ele + * @param lst + * @param + * @return + */ + private List cons(T ele, List lst) { + List rtn = Lists.newArrayListWithCapacity(lst.size() + 1); + rtn.add(ele); + rtn.addAll(lst); + return rtn; + } + + private IndexPredicateAnalyzer getIndexPredicateAnalyzer() { + IndexPredicateAnalyzer analyzer = IndexPredicateAnalyzer.createAnalyzer(true); + + for (Class compareOp : ALLOWED_COMPARISONS) { + analyzer.addComparisonOp(compareOp.getCanonicalName()); + } + analyzer.allowColumnName(keyMapping.getColumnName()); + analyzer.setAcceptsFields(true); + return analyzer; + } +} diff --git a/hbase-handler/src/test/queries/positive/hbase_multiscan_pushdown.q b/hbase-handler/src/test/queries/positive/hbase_multiscan_pushdown.q new file mode 100644 index 0000000000000000000000000000000000000000..ccecc5661878ac2e1d52213845bb555909b74683 --- /dev/null +++ b/hbase-handler/src/test/queries/positive/hbase_multiscan_pushdown.q @@ -0,0 +1,34 @@ +CREATE TABLE hbase_multiscan( + key struct, + value string) + ROW FORMAT SERDE + 'org.apache.hadoop.hive.hbase.HBaseSerDe' + STORED BY + 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ( + 'serialization.format'='1', + 'hbase.columns.mapping'=':key,cf:string', + 'hbase.composite.key.factory'='org.apache.hadoop.hive.hbase.SampleMultiScanHBaseKeyFactory' + ) + TBLPROPERTIES ( + 'hbase.table.name'='mscan2', 'hive.hbase.multiscan.num_buckets'='3'); + +FROM src +INSERT INTO TABLE hbase_multiscan SELECT +named_struct( + "bucket", cast(hash(cast(key as int)) % 3 as string), + "other_val", cast(key as string)), +cast(value as string); + + +-- {"bucket":"1","other_val":"10"} val_10 +-- {"bucket":"1","other_val":"100"} val_100 +-- {"bucket":"1","other_val":"103"} val_103 +-- {"bucket":"1","other_val":"118"} val_118 +SELECT * FROM hbase_multiscan WHERE key.bucket = 1 AND key.other_val >= "0" AND key.other_val < "12" ORDER BY value; + +-- {"bucket":"2","other_val":"128"} val_128 +SELECT * FROM hbase_multiscan WHERE key.other_val = "128"; + +DROP TABLE hbase_multiscan; diff --git a/hbase-handler/src/test/results/positive/hbase_multiscan_pushdown.q.out b/hbase-handler/src/test/results/positive/hbase_multiscan_pushdown.q.out new file mode 100644 index 0000000000000000000000000000000000000000..c5b1a14c993869d16cc422b11a44bf9a5012fe3f --- /dev/null +++ b/hbase-handler/src/test/results/positive/hbase_multiscan_pushdown.q.out @@ -0,0 +1,93 @@ +PREHOOK: query: CREATE TABLE hbase_multiscan( + key struct, + value string) + ROW FORMAT SERDE + 'org.apache.hadoop.hive.hbase.HBaseSerDe' + STORED BY + 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ( + 'serialization.format'='1', + 'hbase.columns.mapping'=':key,cf:string', + 'hbase.composite.key.factory'='org.apache.hadoop.hive.hbase.SampleMultiScanHBaseKeyFactory' + ) + TBLPROPERTIES ( + 'hbase.table.name'='mscan2', 'hive.hbase.multiscan.num_buckets'='3') 
+PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@hbase_multiscan +POSTHOOK: query: CREATE TABLE hbase_multiscan( + key struct, + value string) + ROW FORMAT SERDE + 'org.apache.hadoop.hive.hbase.HBaseSerDe' + STORED BY + 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' + WITH SERDEPROPERTIES ( + 'serialization.format'='1', + 'hbase.columns.mapping'=':key,cf:string', + 'hbase.composite.key.factory'='org.apache.hadoop.hive.hbase.SampleMultiScanHBaseKeyFactory' + ) + TBLPROPERTIES ( + 'hbase.table.name'='mscan2', 'hive.hbase.multiscan.num_buckets'='3') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hbase_multiscan +PREHOOK: query: FROM src +INSERT INTO TABLE hbase_multiscan SELECT +named_struct( + "bucket", cast(hash(cast(key as int)) % 3 as string), + "other_val", cast(key as string)), +cast(value as string) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@hbase_multiscan +POSTHOOK: query: FROM src +INSERT INTO TABLE hbase_multiscan SELECT +named_struct( + "bucket", cast(hash(cast(key as int)) % 3 as string), + "other_val", cast(key as string)), +cast(value as string) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@hbase_multiscan +PREHOOK: query: -- {"bucket":"1","other_val":"10"} val_10 +-- {"bucket":"1","other_val":"100"} val_100 +-- {"bucket":"1","other_val":"103"} val_103 +-- {"bucket":"1","other_val":"118"} val_118 +SELECT * FROM hbase_multiscan WHERE key.bucket = 1 AND key.other_val >= "0" AND key.other_val < "12" ORDER BY value +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_multiscan +#### A masked pattern was here #### +POSTHOOK: query: -- {"bucket":"1","other_val":"10"} val_10 +-- {"bucket":"1","other_val":"100"} val_100 +-- {"bucket":"1","other_val":"103"} val_103 +-- {"bucket":"1","other_val":"118"} val_118 +SELECT * FROM hbase_multiscan WHERE key.bucket = 1 AND key.other_val >= "0" AND key.other_val < "12" ORDER BY value +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_multiscan +#### A masked pattern was here #### +{"bucket":"1","other_val":"10"} val_10 +{"bucket":"1","other_val":"100"} val_100 +{"bucket":"1","other_val":"103"} val_103 +{"bucket":"1","other_val":"118"} val_118 +PREHOOK: query: -- {"bucket":"2","other_val":"128"} val_128 +SELECT * FROM hbase_multiscan WHERE key.other_val = "128" +PREHOOK: type: QUERY +PREHOOK: Input: default@hbase_multiscan +#### A masked pattern was here #### +POSTHOOK: query: -- {"bucket":"2","other_val":"128"} val_128 +SELECT * FROM hbase_multiscan WHERE key.other_val = "128" +POSTHOOK: type: QUERY +POSTHOOK: Input: default@hbase_multiscan +#### A masked pattern was here #### +{"bucket":"2","other_val":"128"} val_128 +PREHOOK: query: DROP TABLE hbase_multiscan +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@hbase_multiscan +PREHOOK: Output: default@hbase_multiscan +POSTHOOK: query: DROP TABLE hbase_multiscan +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@hbase_multiscan +POSTHOOK: Output: default@hbase_multiscan diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml index 1a2f1318a9021ad12eba895c9579e761a9de5ed0..ac3f648c16c3b2efcca267b2e9fe1113ad79de04 100644 --- a/itests/hive-unit/pom.xml +++ b/itests/hive-unit/pom.xml @@ -256,58 +256,75 @@ + + + download-spark + + true + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + download-spark + generate-sources + + run + + + + + + + + + + + + + + - - - - org.apache.maven.plugins - maven-antrun-plugin - - - 
download-spark - generate-sources - - run - - - - - - - - - - - setup-metastore-scripts - process-test-resources - - run - - - - - - - - - - - - - - org.apache.maven.plugins - maven-jar-plugin - ${maven.jar.plugin.version} - - - - test-jar - - - - - - + + + + org.apache.maven.plugins + maven-antrun-plugin + + + setup-metastore-scripts + process-test-resources + + run + + + + + + + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + ${maven.jar.plugin.version} + + + + test-jar + + + + + + diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml index 1c3f74c113df4c1d45664f68098ec428666e1d3e..c2fb3a8460eeaa4ea81858063e0ad1c026bfe434 100644 --- a/itests/qtest/pom.xml +++ b/itests/qtest/pom.xml @@ -41,11 +41,11 @@ - - org.apache.accumulo - accumulo-minicluster - test - + + + + + org.apache.hive hive-ant @@ -88,6 +88,12 @@ hive-it-util ${project.version} test + + + org.apache.accumulo + accumulo-minicluster + + org.apache.hive diff --git a/pom.xml b/pom.xml index 64d1d3724ab5499abbb242f33f5ff04e8f2fa76f..9b04ab9a0d35feb5591072a554a8ebd86ae59190 100644 --- a/pom.xml +++ b/pom.xml @@ -71,6 +71,7 @@ file://${test.tmp.dir} ${project.build.directory}/warehouse pfile:// + true @@ -823,7 +824,7 @@ **/TestHiveMetaStore.java ${test.excludes.additional} - true + ${test.redirectTestOutputToFile} false false -Xmx2048m -XX:MaxPermSize=512m diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java index 1dbe230917564d5c17c198a898d4de7b52adab3b..1540136ba0df2f375422d0714cc3b06fbb48de54 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java @@ -342,11 +342,7 @@ private DecomposedPredicate decomposePredicate(ExprNodeDesc predicate, Index ind useSorted = false; } - DecomposedPredicate decomposedPredicate = new DecomposedPredicate(); - decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(searchConditions); - decomposedPredicate.residualPredicate = residualPredicate; - - return decomposedPredicate; + return new DecomposedPredicate(analyzer.translateSearchConditions(searchConditions), null, residualPredicate); } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java index 7d7c764f8d800d90c7b23ed61f7deb589e51627d..699f247ef4349933a5bb83c1bb58266b13a1a6ea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStoragePredicateHandler.java @@ -64,6 +64,17 @@ public DecomposedPredicate decomposePredicate( * Struct class for returning multiple values from decomposePredicate. */ public static class DecomposedPredicate { + + public DecomposedPredicate() { + + } + + public DecomposedPredicate(ExprNodeGenericFuncDesc pushedPredicate, Serializable pushedPredicateObject, ExprNodeGenericFuncDesc residualPredicate) { + this.pushedPredicate = pushedPredicate; + this.pushedPredicateObject = pushedPredicateObject; + this.residualPredicate = residualPredicate; + } + /** * Portion of predicate to be evaluated by storage handler. Hive * will pass this into the storage handler's input format. 
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java index 08a4e991ed3850cab9342df960cd9d3cd4c888a7..e054b00ddc9311582bc532a3d190f9b0fc75ecd7 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java @@ -114,14 +114,17 @@ public static void appendReadColumns( } public static void appendReadColumns(StringBuilder readColumnsBuffer, List ids) { - String id = toReadColumnIDString(ids); - String newConfStr = id; - if (readColumnsBuffer.length() > 0) { - readColumnsBuffer.append(StringUtils.COMMA_STR).append(newConfStr); - } - if (readColumnsBuffer.length() == 0) { + if (ids.isEmpty()) { readColumnsBuffer.append(READ_COLUMN_IDS_CONF_STR_DEFAULT); + return; + } + + // at least one column to append + String newConfStr = toReadColumnIDString(ids); + if (readColumnsBuffer.length() > 0) { + readColumnsBuffer.append(StringUtils.COMMA_STR); } + readColumnsBuffer.append(newConfStr); } private static void appendReadColumnNames(StringBuilder readColumnNamesBuffer, List cols) { diff --git a/shims/0.23/pom.xml b/shims/0.23/pom.xml index 422bc2839046dd328b0900a309ef41dc6a73d786..ac4db6b116090a00e08e5aba75ee9be5ab0fb8ed 100644 --- a/shims/0.23/pom.xml +++ b/shims/0.23/pom.xml @@ -123,6 +123,7 @@ org.apache.hadoop hadoop-yarn-server-resourcemanager ${hadoop-23.version} + true javax.servlet