Index: hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java (revision 1fc9320f07b066e4850a04958a2c73643b5ad5b1)
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java (revision )
@@ -29,7 +29,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
@@ -84,6 +87,30 @@
     throws IOException {
     job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA,
       HCatUtil.serialize(hcatSchema));
+  }
+
+  /**
+   * Set expression for predicate pushdown.
+   * @param job the job object
+   * @param serializedPredicateExpression Serialized ExprNodeGenericFuncDesc object representing the pushdown-predicate.
+   * @throws IOException
+   */
+  public static void setPushdownPredicate(Job job, String serializedPredicateExpression)
+    throws IOException {
+    // TODO: Consider new HCatConstants constant for the conf-label.
+    job.getConfiguration().set(TableScanDesc.FILTER_EXPR_CONF_STR,
+      serializedPredicateExpression);
+  }
+
+  /**
+   * Set expression for predicate pushdown.
+   * @param job the job object
+   * @param predicateExpression ExprNodeGenericFuncDesc object representing the pushdown-predicate.
+   * @throws IOException
+   */
+  public static void setPushdownPredicate(Job job, ExprNodeGenericFuncDesc predicateExpression)
+    throws IOException {
+    setPushdownPredicate(job, Utilities.serializeExpression(predicateExpression));
   }
 
   protected static org.apache.hadoop.mapred.InputFormat
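
The two setPushdownPredicate() overloads above are the public hook for registering a pushdown predicate on an HCatalog read. A minimal driver sketch follows (an illustration, not part of the patch); the database "default", table "my_table", and int column "age" are assumed names, while HCatInputFormat.setInput() and the Hive ExprNode*/GenericUDF classes are existing APIs that the patch itself already relies on.

    // Sketch only: builds the Hive expression "age >= 30" and registers it on the job.
    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
    import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
    import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;

    public class PushdownPredicateExample {
      public static void configure(Job job) throws IOException, UDFArgumentException {
        // Read default.my_table through HCatalog (names are illustrative).
        HCatInputFormat.setInput(job, "default", "my_table");

        // Build "age >= 30": a column reference, a constant, and the >= comparison UDF.
        ExprNodeDesc ageColumn =
            new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, "age", null, false);
        ExprNodeDesc thirty = new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, 30);
        ExprNodeGenericFuncDesc predicate = ExprNodeGenericFuncDesc.newInstance(
            new GenericUDFOPEqualOrGreaterThan(), Arrays.asList(ageColumn, thirty));

        // Serializes the expression into the job conf under TableScanDesc.FILTER_EXPR_CONF_STR,
        // where predicate-aware storage handlers can pick it up.
        HCatInputFormat.setPushdownPredicate(job, predicate);
      }
    }

Storage formats that never inspect TableScanDesc.FILTER_EXPR_CONF_STR should simply ignore the setting, so registering the predicate is expected to be harmless even when the underlying format cannot evaluate it.
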
Index: hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java (revision 1fc9320f07b066e4850a04958a2c73643b5ad5b1)
+++ hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java (revision )
@@ -19,19 +19,57 @@
 package org.apache.hive.hcatalog.pig;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
 import java.util.Enumeration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.Set;
 
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
 import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hive.hcatalog.common.HCatConstants;
@@ -39,15 +77,20 @@
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hive.hcatalog.data.Pair;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
 import org.apache.hive.hcatalog.mapreduce.SpecialCases;
 import org.apache.pig.Expression;
 import org.apache.pig.Expression.BinaryExpression;
+import org.apache.pig.Expression.UnaryExpression;
+import org.apache.pig.LoadPredicatePushdown;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.util.UDFContext;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +99,7 @@
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class HCatLoader extends HCatBaseLoader {
+public class HCatLoader extends HCatBaseLoader implements LoadPredicatePushdown {
 
   private static final Logger LOG = LoggerFactory.getLogger(HCatLoader.class);
 
   private static final String PARTITION_FILTER = "partition.filter"; // for future use
@@ -67,10 +110,12 @@
   private String hcatServerUri;
   private String partitionFilterString;
   private final PigHCatUtil phutil = new PigHCatUtil();
+  private Map<String, TypeInfo> typeInfoMap = null;
 
   // Signature for wrapped loader, see comments in LoadFuncBasedInputDriver.initialize
   final public static String INNER_SIGNATURE = "hcatloader.inner.signature";
   final public static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature";
+  final public static String PREDICATE_FOR_PUSHDOWN_SUFFIX = TableScanDesc.FILTER_EXPR_CONF_STR;
   // A hash map which stores job credentials. The key is a signature passed by Pig, which is
   //unique to the load func and input file name (table, in our case).
   private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
@@ -135,7 +180,7 @@
     //be called many times.
     for (Entry<String, String> keyValue : job.getConfiguration()) {
       String oldValue = clone.getConfiguration().getRaw(keyValue.getKey());
-      if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
+      if ((oldValue == null) || !(keyValue.getValue().equals(oldValue))) {
         udfProps.put(keyValue.getKey(), keyValue.getValue());
       }
     }
@@ -161,6 +206,13 @@
     if (requiredFieldsInfo != null) {
       // convert to hcatschema and pass to HCatInputFormat
       try {
+        ArrayList<Integer> columnIds = Lists.newArrayListWithExpectedSize(requiredFieldsInfo.getFields().size());
+        ArrayList<String> columnNames = Lists.newArrayListWithExpectedSize(requiredFieldsInfo.getFields().size());
+        for (RequiredField rf : requiredFieldsInfo.getFields()) {
+          columnIds.add(rf.getIndex());
+          columnNames.add(rf.getAlias());
+        }
+        ColumnProjectionUtils.appendReadColumns(job.getConfiguration(), columnIds, columnNames);
         outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(), signature, this.getClass());
         HCatInputFormat.setOutputSchema(job, outputSchema);
       } catch (Exception e) {
@@ -172,8 +224,7 @@
       // setOutputSchema on HCatInputFormat
       if (HCatUtil.checkJobContextIfRunningFromBackend(job)) {
         try {
-          HCatSchema hcatTableSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA);
-          outputSchema = hcatTableSchema;
+          outputSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA);
           HCatInputFormat.setOutputSchema(job, outputSchema);
         } catch (Exception e) {
           throw new IOException(e);
@@ -184,7 +235,16 @@
       LOG.debug("outputSchema=" + outputSchema);
     }
 
+    // Handle pushdown predicate.
+    String pushdownPredicate = udfProps.getProperty(signature + PREDICATE_FOR_PUSHDOWN_SUFFIX);
+    if (StringUtils.isNotBlank(pushdownPredicate)) {
+      LOG.info("Pushing down predicate.");
+      HCatInputFormat.setPushdownPredicate(job, pushdownPredicate);
-  }
+    }
+    else {
+      LOG.info("Predicate is empty/blank. Skipping pushdown-predicate.");
+    }
+  }
 
   @Override
   public String[] getPartitionKeys(String location, Job job)
@@ -290,4 +350,205 @@
     }
   }
 
+  private static final Set<Byte> SUPPORTED_PREDICATE_PUSHDOWN_DATA_TYPES = Sets.newHashSet(
+      DataType.BOOLEAN,
+      DataType.INTEGER,
+      DataType.LONG,
+      DataType.FLOAT,
+      DataType.DOUBLE,
+      DataType.DATETIME,
+      DataType.CHARARRAY,
+      DataType.BIGINTEGER,
+      DataType.BIGDECIMAL
+  );
+
+  @Override
+  public List<String> getPredicateFields(String location, Job job) throws IOException {
+    List<ResourceSchema.ResourceFieldSchema> allFields = Arrays.asList(getSchema(location, job).getFields());
+    Iterable<ResourceSchema.ResourceFieldSchema> filteredPredicateFields
+        = Iterables.filter(allFields, new Predicate<ResourceSchema.ResourceFieldSchema>() {
+            @Override
+            public boolean apply(ResourceSchema.ResourceFieldSchema input) {
+              return SUPPORTED_PREDICATE_PUSHDOWN_DATA_TYPES.contains(input.getType());
+            }
+          });
+
+    // Return the names of the filtered predicate-fields.
+    return Lists.newArrayList(
+        Iterables.transform(filteredPredicateFields, new Function<ResourceSchema.ResourceFieldSchema, String>() {
+            @Override
+            public String apply(ResourceSchema.ResourceFieldSchema input) {
+              return input.getName();
+            }
+          }));
+
+  }
+
+  @Override
+  public List<Expression.OpType> getSupportedExpressionTypes() {
+    return Arrays.asList(
+        Expression.OpType.OP_EQ,
+        Expression.OpType.OP_NE,
+        Expression.OpType.OP_GT,
+        Expression.OpType.OP_GE,
+        Expression.OpType.OP_LT,
+        Expression.OpType.OP_LE,
+        Expression.OpType.OP_IN,
+        Expression.OpType.OP_BETWEEN,
+        Expression.OpType.OP_NULL,
+        Expression.OpType.OP_NOT,
+        Expression.OpType.OP_AND,
+        Expression.OpType.OP_OR
+    );
+  }
+
+  @Override
+  public void setPushdownPredicate(Expression predicate) throws IOException {
+    LOG.info("HCatLoader::setPushdownPredicate(). Predicate == " + predicate);
+    ExprNodeDesc hiveExpression = getHiveExpressionFor(predicate);
+    try {
+      LOG.debug("HiveExpression: " + hiveExpression);
+      ExprNodeGenericFuncDesc genericFuncDesc = (ExprNodeGenericFuncDesc)hiveExpression;
+      UDFContext.getUDFContext()
+          .getUDFProperties(getClass(), new String[]{signature})
+          .setProperty(signature + PREDICATE_FOR_PUSHDOWN_SUFFIX,
+              Utilities.serializeExpression(genericFuncDesc));
+    }
+    catch (Exception exception) {
+      throw new IOException("Invalid pushdown-predicate received: "
+          + "PigExpr == (" + predicate + "). "
+          + "HiveExpr == (" + hiveExpression + ")", exception);
+    }
+  }
+
+  private ExprNodeDesc getHiveExpressionFor(Expression expression) throws IOException {
+
+    if (expression instanceof BinaryExpression) {
+      return getHiveExpressionFor((BinaryExpression)expression);
+    }
+    else
+    if (expression instanceof UnaryExpression) {
+      return getHiveExpressionFor((UnaryExpression)expression);
+    }
+    else
+    if (expression instanceof Expression.Const) {
+      assert expression.getOpType().equals(Expression.OpType.TERM_CONST);
+      return new ExprNodeConstantDescConstructor().apply(((Expression.Const)expression).getValue());
+    }
+    else
+    if (expression instanceof Expression.Column) {
+      assert expression.getOpType().equals(Expression.OpType.TERM_COL);
+      Expression.Column columnExpression = (Expression.Column) expression;
+      return new ExprNodeColumnDesc(getTypeInfoMap().get(columnExpression.getName()),
+          columnExpression.getName(),
+          "TableNameNotSet!", // Table-name isn't required, for predicate-pushdown.
+          false);
+    }
+    else {
+      throw new IOException("Could not convert pig's push-down predicate "
+          + "(" + expression + ") into a Hive expression.");
+    }
+  }
+
+  private ExprNodeGenericFuncDesc getHiveExpressionFor(BinaryExpression binaryPredicate) throws IOException {
+
+    List<ExprNodeDesc> arguments = Lists.newArrayList();
+    // Add LHS column expression.
+    arguments.add(getHiveExpressionFor(binaryPredicate.getLhs()));
+
+    Expression.OpType opType = binaryPredicate.getOpType();
+    if (opType.equals(Expression.OpType.OP_IN)) {
+      // Add RHS value-list, as constant values.
+      // TODO: Short circuit for values that aren't BigInt/BigDecimal/DateTime.
+      arguments.addAll(
+          Lists.newArrayList(
+              Iterators.transform(
+                  ((Expression.InExpression) binaryPredicate.getRhs()).getValues().iterator(),
+                  new ExprNodeConstantDescConstructor())
+          )
+      );
+    }
+    else {
+      arguments.add(getHiveExpressionFor(binaryPredicate.getRhs()));
+    }
+
+    try {
+      return ExprNodeGenericFuncDesc.newInstance(getHiveFunctionFor(opType), arguments);
+    }
+    catch (UDFArgumentException exception) {
+      throw new IOException("Could not convert pig's push-down predicate "
+          + "(" + binaryPredicate + ") into a Hive expression.", exception);
+    }
+
+  }
+
+  private static class ExprNodeConstantDescConstructor implements Function<Object, ExprNodeConstantDesc> {
+
+    @Override
+    public ExprNodeConstantDesc apply(Object input) {
+      if (input instanceof BigInteger) {
+        input = new BigDecimal((BigInteger)input);
+      }
+      else
+      if (input instanceof DateTime) {
+        input = new Timestamp(((DateTime)input).getMillis());
+      }
+
+      return new ExprNodeConstantDesc(input);
+    }
+  }
+
+  private ExprNodeGenericFuncDesc getHiveExpressionFor(UnaryExpression unaryPredicate) throws IOException {
+
+    try {
+      return ExprNodeGenericFuncDesc.newInstance(
+          getHiveFunctionFor(unaryPredicate.getOpType()),
+          Collections.singletonList(getHiveExpressionFor(unaryPredicate.getExpression()))
+      );
+    }
+    catch (UDFArgumentException exception) {
+      throw new IOException("Could not convert pig's push-down predicate "
+          + "(" + unaryPredicate + ") into a Hive expression.", exception);
+    }
+
+  }
+
+  private GenericUDF getHiveFunctionFor(Expression.OpType operator) throws IOException {
+    switch (operator) {
+
+      // Binary operators:
+      case OP_AND: return new GenericUDFOPAnd();
+      case OP_OR: return new GenericUDFOPOr();
+      case OP_EQ: return new GenericUDFOPEqual();
+      case OP_NE: return new GenericUDFOPNotEqual();
+      case OP_LT: return new GenericUDFOPLessThan();
+      case OP_LE: return new GenericUDFOPEqualOrLessThan();
+      case OP_GT: return new GenericUDFOPGreaterThan();
+      case OP_GE: return new GenericUDFOPEqualOrGreaterThan();
+      case OP_BETWEEN: return new GenericUDFBetween();
+      case OP_IN: return new GenericUDFIn();
+      // (End of binary operators.)
+
+      // Unary operators:
+      case OP_NOT: return new GenericUDFOPNot();
+      case OP_NULL: return new GenericUDFOPNull();
+      // (End of unary operators.)
+
+      default:
+        throw new IOException("Unsupported operator for predicate push-down: " + operator);
+    }
+  }
+
+  private Map<String, TypeInfo> getTypeInfoMap() {
+    if (typeInfoMap == null) {
+
+      typeInfoMap = Maps.newHashMapWithExpectedSize(outputSchema.size());
+
+      for (FieldSchema field : HCatSchemaUtils.getFieldSchemas(outputSchema.getFields())) {
+        typeInfoMap.put(field.getName(), TypeInfoUtils.getTypeInfoFromTypeString(field.getType()));
+      }
+    }
+
+    return typeInfoMap;
+  }
 }
Index: hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java (revision 1fc9320f07b066e4850a04958a2c73643b5ad5b1)
+++ hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java (revision )
@@ -1058,12 +1058,17 @@
     assertEquals("Table after deserialization should have been identical to sourceTable.",
       sourceTable.diff(targetTable), HCatTable.NO_DIFF);
 
+    EnumSet<HCatTable.TableAttribute> ignoreTableProperties
+        = EnumSet.copyOf(HCatTable.DEFAULT_COMPARISON_ATTRIBUTES);
+    ignoreTableProperties.remove(HCatTable.TableAttribute.TABLE_PROPERTIES);
+
     // Create table on Target.
     targetMetaStore.createTable(HCatCreateTableDesc.create(targetTable).build());
 
     // Verify that the created table is identical to sourceTable.
     targetTable = targetMetaStore.getTable(dbName, tableName);
     assertEquals("Table after deserialization should have been identical to sourceTable.",
-      sourceTable.diff(targetTable), HCatTable.NO_DIFF);
+      sourceTable.diff(targetTable, ignoreTableProperties), // Ignore differences in Table-properties.
+      HCatTable.NO_DIFF);
 
     // Modify sourceTable.
     List<HCatFieldSchema> newColumnSchema = new ArrayList<HCatFieldSchema>(columnSchema);
Index: hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (revision 1fc9320f07b066e4850a04958a2c73643b5ad5b1)
+++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (revision )
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -195,21 +196,24 @@
       return scan;
     }
 
+    String columnString = jobConf.get(serdeConstants.LIST_COLUMNS);
+    String columnTypeString = jobConf.get(serdeConstants.LIST_COLUMN_TYPES);
     String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
-    if (filterExprSerialized == null) {
+    if (filterExprSerialized == null || StringUtils.isBlank(columnString) || StringUtils.isBlank(columnTypeString)) {
+      LOG.warn("Abandoning filter push-down for HBase table.");
       return scan;
     }
     ExprNodeGenericFuncDesc filterExpr =
       Utilities.deserializeExpression(filterExprSerialized);
 
-    String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
-    String colType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey];
+    String keyColName = columnString.split(",")[iKey];
+    String colType = columnTypeString.split(",")[iKey];
     boolean isKeyComparable = isKeyBinary || colType.equalsIgnoreCase("string");
 
     String tsColName = null;
     if (iTimestamp >= 0) {
-      tsColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iTimestamp];
+      tsColName = columnString.split(",")[iTimestamp];
     }
 
     IndexPredicateAnalyzer analyzer =
Index: pom.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- pom.xml (revision 1fc9320f07b066e4850a04958a2c73643b5ad5b1)
+++ pom.xml (revision )
@@ -154,7 +154,7 @@
     2.0.0-M5
     4.0.23.Final
     1.8.1
-    0.12.0
+    0.14.0
     2.5.0
     1.0.1
     1.7.5
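
The version bump from 0.12.0 to 0.14.0 lines up with the new Pig-side dependency: org.apache.pig.LoadPredicatePushdown, which HCatLoader now implements, first ships with Pig 0.14, so a script such as "A = LOAD 'default.my_table' USING org.apache.hive.hcatalog.pig.HCatLoader(); B = FILTER A BY age >= 30;" can hand its filter expression to the loader (table and column names here are illustrative).

Below is a small, self-contained sketch (an illustration under assumed names, not part of the patch) of the serialize/deserialize round trip the whole chain relies on: the producer side stores a Hive expression under TableScanDesc.FILTER_EXPR_CONF_STR, which is what HCatBaseInputFormat.setPushdownPredicate() does, and a consumer such as the patched HiveHBaseTableInputFormat reads it back before building its scan-time filter. The column name "key" and the constant "row42" are made up.

    // Sketch: round-trip an ExprNodeGenericFuncDesc through the conf key used for pushdown.
    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
    import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
    import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
    import org.apache.hadoop.hive.ql.plan.TableScanDesc;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class FilterExprRoundTrip {
      public static void main(String[] args) throws UDFArgumentException {
        // "key = 'row42'", with "key" standing in for the HBase row-key column.
        ExprNodeDesc keyColumn =
            new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "key", null, false);
        ExprNodeDesc rowKey =
            new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "row42");
        ExprNodeGenericFuncDesc predicate = ExprNodeGenericFuncDesc.newInstance(
            new GenericUDFOPEqual(), Arrays.asList(keyColumn, rowKey));

        // Producer side: the same write that setPushdownPredicate() performs.
        Configuration conf = new Configuration();
        conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, Utilities.serializeExpression(predicate));

        // Consumer side: what HiveHBaseTableInputFormat deserializes before it
        // analyzes the predicate into an HBase scan range or filter.
        ExprNodeGenericFuncDesc restored =
            Utilities.deserializeExpression(conf.get(TableScanDesc.FILTER_EXPR_CONF_STR));
        System.out.println("Restored filter: " + restored.getExprString());
      }
    }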