diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a809f17..7e5a515 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -601,6 +601,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { + "is set to instance of HiveAuthorizerFactory, then this value is ignored."), FIRE_EVENTS_FOR_DML("hive.metastore.dml.events", false, "If true, the metastore will be asked" + " to fire events for DML operations"), + METASTORE_CLIENT_DROP_PARTITIONS_WITH_EXPRESSIONS("hive.metastore.client.drop.partitions.using.expressions", true, + "Choose whether dropping partitions with HCatClient pushes the partition-predicate to the metastore, " + + "or drops partitions iteratively"), // Parameters for exporting metadata on table drop (requires the use of the) // org.apache.hadoop.hive.ql.parse.MetaDataExportListener preevent listener diff --git a/hcatalog/webhcat/java-client/pom.xml b/hcatalog/webhcat/java-client/pom.xml index b93931f..9acf12c 100644 --- a/hcatalog/webhcat/java-client/pom.xml +++ b/hcatalog/webhcat/java-client/pom.xml @@ -45,6 +45,11 @@ hive-hcatalog-core ${project.version} + + org.apache.hive + hive-exec + ${project.version} + org.apache.hive diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java index cd05254..8cb1961 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java @@ -20,12 +20,15 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; -import com.google.common.annotations.VisibleForTesting; +import 
com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.conf.HiveConf; @@ -36,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -48,12 +52,25 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; import 
org.apache.hive.hcatalog.data.schema.HCatSchemaUtils; import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The HCatClientHMSImpl is the Hive Metastore client based implementation of @@ -61,6 +78,7 @@ */ public class HCatClientHMSImpl extends HCatClient { + private static final Logger LOG = LoggerFactory.getLogger(HCatClientHMSImpl.class); private HiveMetaStoreClient hmsClient; private Configuration config; private HiveConf hiveConfig; @@ -480,19 +498,122 @@ public void addPartition(HCatAddPartitionDesc partInfo) } } + /** + * Helper class to help build ExprDesc tree to represent the partitions to be dropped. + * Note: At present, the ExpressionBuilder only constructs partition predicates where + * partition-keys equal specific values, and logical-AND expressions. E.g. + * ( dt = '20150310' AND region = 'US' ) + * This only supports the partition-specs specified by the Map argument of: + * {@link org.apache.hive.hcatalog.api.HCatClient#dropPartitions(String, String, Map, boolean)} + */ + private static class ExpressionBuilder { + + private Map<String, PrimitiveTypeInfo> partColumnTypesMap = Maps.newHashMap(); + private Map<String, String> partSpecs; + + public ExpressionBuilder(Table table, Map<String, String> partSpecs) { + this.partSpecs = partSpecs; + for (FieldSchema partField : table.getPartitionKeys()) { + partColumnTypesMap.put(partField.getName().toLowerCase(), + TypeInfoFactory.getPrimitiveTypeInfo(partField.getType())); + } + } + + private PrimitiveTypeInfo getTypeFor(String partColumn) { + return partColumnTypesMap.get(partColumn.toLowerCase()); + } + + private Object getTypeAppropriateValueFor(PrimitiveTypeInfo type, String value) { + ObjectInspectorConverters.Converter converter = ObjectInspectorConverters.getConverter( + TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(TypeInfoFactory.stringTypeInfo), + TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(type)); + + return converter.convert(value); + } + + public ExprNodeGenericFuncDesc 
equalityPredicate(String partColumn, String value) throws SemanticException { + + PrimitiveTypeInfo partColumnType = getTypeFor(partColumn); + ExprNodeColumnDesc partColumnExpr = new ExprNodeColumnDesc(partColumnType, partColumn, null, true); + ExprNodeConstantDesc valueExpr = new ExprNodeConstantDesc(partColumnType, + getTypeAppropriateValueFor(partColumnType, value)); + + return binaryPredicate("=", partColumnExpr, valueExpr); + } + + public ExprNodeGenericFuncDesc binaryPredicate(String function, ExprNodeDesc lhs, ExprNodeDesc rhs) throws SemanticException { + return new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, + FunctionRegistry.getFunctionInfo(function).getGenericUDF(), + Lists.newArrayList(lhs, rhs)); + } + + public ExprNodeGenericFuncDesc build() throws SemanticException { + ExprNodeGenericFuncDesc resultExpr = null; + + for (Map.Entry<String, String> partSpec : partSpecs.entrySet()) { + String column = partSpec.getKey(); + String value = partSpec.getValue(); + ExprNodeGenericFuncDesc partExpr = equalityPredicate(column, value); + + resultExpr = (resultExpr == null? partExpr : binaryPredicate("and", resultExpr, partExpr)); + } + + return resultExpr; + } + } // class ExpressionBuilder; + + private static boolean isExternal(Table table) { + return table.getParameters() != null + && "TRUE".equalsIgnoreCase(table.getParameters().get("EXTERNAL")); + } + + private void dropPartitionsUsingExpressions(Table table, Map<String, String> partitionSpec, boolean ifExists) + throws SemanticException, TException { + LOG.info("HCatClient: Dropping partitions using partition-predicate Expressions."); + ExprNodeGenericFuncDesc partitionExpression = new ExpressionBuilder(table, partitionSpec).build(); + ObjectPair<Integer, byte[]> serializedPartitionExpression = + new ObjectPair<Integer, byte[]>(partitionSpec.size(), + Utilities.serializeExpressionToKryo(partitionExpression)); + hmsClient.dropPartitions(table.getDbName(), table.getTableName(), Arrays.asList(serializedPartitionExpression), + !isExternal(table), // Delete data? 
+ false, // Ignore Protection? + ifExists, // Fail if table doesn't exist? + false); // Need results back? + } + + private void dropPartitionsIteratively(String dbName, String tableName, + Map<String, String> partitionSpec, boolean ifExists) throws HCatException, TException { + LOG.info("HCatClient: Dropping partitions iteratively."); + List<Partition> partitions = hmsClient.listPartitionsByFilter(dbName, tableName, + getFilterString(partitionSpec), (short) -1); + for (Partition partition : partitions) { + dropPartition(partition, ifExists); + } + } + @Override public void dropPartitions(String dbName, String tableName, Map<String, String> partitionSpec, boolean ifExists) throws HCatException { + LOG.info("HCatClient dropPartitions(db=" + dbName + ",table=" + tableName + ", partitionSpec: ["+ partitionSpec + "])."); try { dbName = checkDB(dbName); - List<Partition> partitions = hmsClient.listPartitionsByFilter(dbName, tableName, - getFilterString(partitionSpec), (short)-1); + Table table = hmsClient.getTable(dbName, tableName); - for (Partition partition : partitions) { - dropPartition(partition, ifExists); + if (hiveConfig.getBoolVar(HiveConf.ConfVars.METASTORE_CLIENT_DROP_PARTITIONS_WITH_EXPRESSIONS)) { + try { + dropPartitionsUsingExpressions(table, partitionSpec, ifExists); + } + catch (SemanticException parseFailure) { + LOG.warn("Could not push down partition-specification to back-end, for dropPartitions(). Resorting to iteration.", + parseFailure); + dropPartitionsIteratively(dbName, tableName, partitionSpec, ifExists); + } + } + else { + // Not using expressions. + dropPartitionsIteratively(dbName, tableName, partitionSpec, ifExists); } - } catch (NoSuchObjectException e) { throw new ObjectNotFoundException( "NoSuchObjectException while dropping partition. " +