diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index fee7ffc4c7..89bf8eab5f 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -55,6 +55,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TException; +import com.google.common.collect.ImmutableList; + import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -467,12 +469,12 @@ private static void createPartitionIfNotExists(HiveEndPoint ep, Map partSpec = Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), ep.partitionVals); - AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(ep.database, ep.table, true); - String partLocation = new Path(tableObject.getDataLocation(), - Warehouse.makePartPath(partSpec)).toString(); - addPartitionDesc.addPartition(partSpec, partLocation); - Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, - addPartitionDesc.getPartition(0), conf); + String partLocation = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)).toString(); + AlterTableAddPartitionDesc.PartitionDesc partitionDesc = new AlterTableAddPartitionDesc.PartitionDesc( + partSpec, partLocation, null); + AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(ep.database, ep.table, true, + ImmutableList.of(partitionDesc)); + Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, partitionDesc, conf); msClient.add_partition(partition); } catch (AlreadyExistsException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java index 72828efaae..be90d34d73 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.ddl.table.partition; import java.io.Serializable; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -32,45 +31,60 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; /** - * DDL task description for ALTER TABLE ... DROP PARTITION ... commands. + * DDL task description for ALTER TABLE ... ADD PARTITION ... commands. 
*/ @Explain(displayName = "Add Partition", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public class AlterTableAddPartitionDesc implements DDLDesc, Serializable { private static final long serialVersionUID = 1L; + @Explain(displayName = "Partition", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public static class PartitionDesc { - PartitionDesc( - Map partSpec, String location, Map params) { - this(partSpec, location); - this.partParams = params; - } - - PartitionDesc(Map partSpec, String location) { - this.partSpec = partSpec; + private final Map partitionSpec; + private String location; // TODO: make location final too + private final Map params; + private final String inputFormat; + private final String outputFormat; + private final int numBuckets; + private final List columns; + private final String serializationLib; + private final Map serdeParams; + private final List bucketColumns; + private final List sortColumns; + private final ColumnStatistics columnStats; + private final long writeId; + + public PartitionDesc(Map partitionSpec, String location, Map params) { + this(partitionSpec, location, params, null, null, -1, null, null, null, null, null, null, -1); + } + + public PartitionDesc(Map partitionSpec, String location, Map params, + String inputFormat, String outputFormat, int numBuckets, List columns, String serializationLib, + Map serdeParams, List bucketColumns, List sortColumns, + ColumnStatistics columnStats, long writeId) { + this.partitionSpec = partitionSpec; this.location = location; + this.params = params; + this.inputFormat = inputFormat; + this.outputFormat = outputFormat; + this.numBuckets = numBuckets; + this.columns = columns; + this.serializationLib = serializationLib; + this.serdeParams = serdeParams; + this.bucketColumns = bucketColumns; + this.sortColumns = sortColumns; + this.columnStats = columnStats; + this.writeId = writeId; } - Map partSpec; - Map partParams; - String location; - String inputFormat = null; - String outputFormat = null; - int numBuckets = -1; - List cols = null; - String serializationLib = null; - Map serdeParams = null; - List bucketCols = null; - List sortCols = null; - ColumnStatistics colStats = null; - long writeId = -1; - + @Explain(displayName = "partition spec", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public Map getPartSpec() { - return partSpec; + return partitionSpec; } /** * @return location of partition in relation to table */ + @Explain(displayName = "location", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public String getLocation() { return location; } @@ -79,228 +93,100 @@ public void setLocation(String location) { this.location = location; } + @Explain(displayName = "params", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public Map getPartParams() { - return partParams; + return params; + } + + @Explain(displayName = "input format", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public String getInputFormat() { + return inputFormat; } - public void setPartParams(Map partParams) { - this.partParams = partParams; + @Explain(displayName = "output format", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public String getOutputFormat() { + return outputFormat; } public int getNumBuckets() { return numBuckets; } - public void setNumBuckets(int numBuckets) { - this.numBuckets = numBuckets; + @Explain(displayName = "num buckets", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public 
Integer getNumBucketsExplain() { + return numBuckets == -1 ? null : numBuckets; } + @Explain(displayName = "columns", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public List getCols() { - return cols; - } - - public void setCols(List cols) { - this.cols = cols; + return columns; } + @Explain(displayName = "serialization lib", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public String getSerializationLib() { return serializationLib; } - public void setSerializationLib(String serializationLib) { - this.serializationLib = serializationLib; - } - + @Explain(displayName = "serde params", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public Map getSerdeParams() { return serdeParams; } - public void setSerdeParams(Map serdeParams) { - this.serdeParams = serdeParams; - } - + @Explain(displayName = "bucket columns", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public List getBucketCols() { - return bucketCols; - } - - public void setBucketCols(List bucketCols) { - this.bucketCols = bucketCols; + return bucketColumns; } + @Explain(displayName = "sort columns", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public List getSortCols() { - return sortCols; + return sortColumns; } - public void setSortCols(List sortCols) { - this.sortCols = sortCols; + @Explain(displayName = "column stats", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public ColumnStatistics getColStats() { + return columnStats; } - public String getInputFormat() { - return inputFormat; + public long getWriteId() { + return writeId; } - - public void setInputFormat(String inputFormat) { - this.inputFormat = inputFormat; - } - - public String getOutputFormat() { - return outputFormat; - } - - public void setOutputFormat(String outputFormat) { - this.outputFormat = outputFormat; - } - - public ColumnStatistics getColStats() { return colStats; } - - public void setColStats(ColumnStatistics colStats) { this.colStats = colStats; } - - public long getWriteId() { return writeId; } - - public void setWriteId(long writeId) { this.writeId = writeId; } } - String tableName; - String dbName; - boolean ifNotExists; - List partitions = null; - boolean replaceMode = false; - private ReplicationSpec replicationSpec = null; - - - /** - * For serialization only. - */ - public AlterTableAddPartitionDesc() { - } + private final String dbName; + private final String tableName; + private final boolean ifNotExists; + private final List partitions; - public AlterTableAddPartitionDesc( - String dbName, String tableName, boolean ifNotExists) { - super(); - this.dbName = dbName; - this.tableName = tableName; - this.ifNotExists = ifNotExists; - } + private boolean replaceMode = false; // TODO: make replaceMode final too + private ReplicationSpec replicationSpec = null; // TODO: make replicationSpec final too - /** - * Legacy single-partition ctor for ImportSemanticAnalyzer - * @param dbName - * database to add to. - * @param tableName - * table to add to. - * @param partSpec - * partition specification. - * @param location - * partition location, relative to table location. - * @param params - * partition parameters. 
- */ - @Deprecated - public AlterTableAddPartitionDesc(String dbName, String tableName, - Map partSpec, String location, Map params) { - super(); + public AlterTableAddPartitionDesc(String dbName, String tableName, boolean ifNotExists, + List partitions) { this.dbName = dbName; this.tableName = tableName; - this.ifNotExists = true; + this.ifNotExists = ifNotExists; - addPartition(partSpec, location, params); - } - - public void addPartition(Map partSpec, String location) { - addPartition(partSpec, location, null); - } - - private void addPartition( - Map partSpec, String location, Map params) { - if (this.partitions == null) { - this.partitions = new ArrayList(); - } - this.partitions.add(new PartitionDesc(partSpec, location, params)); + this.partitions = partitions; } - /** - * @return database name - */ + @Explain(displayName = "db name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public String getDbName() { return dbName; } - /** - * @param dbName - * database name - */ - public void setDbName(String dbName) { - this.dbName = dbName; - } - - /** - * @return the table we're going to add the partitions to. - */ + @Explain(displayName = "table name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public String getTableName() { return tableName; } - /** - * @param tableName - * the table we're going to add the partitions to. - */ - public void setTableName(String tableName) { - this.tableName = tableName; - } - - /** - * @return location of partition in relation to table - */ - @Explain(displayName = "Location") - public String getLocationForExplain() { - if (this.partitions == null || this.partitions.isEmpty()) return ""; - boolean isFirst = true; - StringBuilder sb = new StringBuilder(); - for (PartitionDesc desc : this.partitions) { - if (!isFirst) { - sb.append(", "); - } - isFirst = false; - sb.append(desc.location); - } - return sb.toString(); - } - - @Explain(displayName = "Spec") - public String getPartSpecStringForExplain() { - if (this.partitions == null || this.partitions.isEmpty()) return ""; - boolean isFirst = true; - StringBuilder sb = new StringBuilder(); - for (PartitionDesc desc : this.partitions) { - if (!isFirst) { - sb.append(", "); - } - isFirst = false; - sb.append(desc.partSpec.toString()); - } - return sb.toString(); - } - - /** - * @return if the partition should only be added if it doesn't exist already - */ + @Explain(displayName = "if not exists", displayOnlyOnTrue = true, + explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public boolean isIfNotExists() { - return this.ifNotExists; - } - - /** - * @param ifNotExists - * if the part should be added only if it doesn't exist - */ - public void setIfNotExists(boolean ifNotExists) { - this.ifNotExists = ifNotExists; - } - - public int getPartitionCount() { - return this.partitions.size(); + return ifNotExists; } - public PartitionDesc getPartition(int i) { - return this.partitions.get(i); + @Explain(displayName = "partitions", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public List getPartitions() { + return partitions; } /** @@ -314,7 +200,7 @@ public void setReplaceMode(boolean replaceMode){ * @return true if this AddPartition should behave like a replace-into alter instead */ public boolean getReplaceMode() { - return this.replaceMode; + return replaceMode; } /** @@ -331,8 +217,8 @@ public void setReplicationSpec(ReplicationSpec replicationSpec) { */ public ReplicationSpec getReplicationSpec(){ if (replicationSpec == null){ - this.replicationSpec = new ReplicationSpec(); + 
replicationSpec = new ReplicationSpec(); } - return this.replicationSpec; + return replicationSpec; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index 64f9af3aba..79a93c34f2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; @@ -41,9 +42,12 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration; +import com.google.common.collect.ImmutableList; + import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.getHiveUpdater; @@ -146,7 +150,7 @@ public ImportTableDesc tableDesc(String dbName) throws SemanticException { //TODO: if partitions are loaded lazily via the iterator then we will have to avoid conversion of everything here as it defeats the purpose. for (Partition partition : metadata.getPartitions()) { // TODO: this should ideally not create AddPartitionDesc per partition - AlterTableAddPartitionDesc partsDesc = partitionDesc(fromPath, tblDesc, partition); + AlterTableAddPartitionDesc partsDesc = addPartitionDesc(fromPath, tblDesc, partition); descs.add(partsDesc); } return descs; @@ -167,46 +171,42 @@ public ImportTableDesc tableDesc(String dbName) throws SemanticException { return partitions; } - private AlterTableAddPartitionDesc partitionDesc(Path fromPath, - ImportTableDesc tblDesc, Partition partition) throws SemanticException { + private AlterTableAddPartitionDesc addPartitionDesc(Path fromPath, ImportTableDesc tblDesc, Partition partition) + throws SemanticException { try { - AlterTableAddPartitionDesc partsDesc = - new AlterTableAddPartitionDesc(tblDesc.getDatabaseName(), tblDesc.getTableName(), - EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()), - partition.getSd().getLocation(), partition.getParameters()); - AlterTableAddPartitionDesc.PartitionDesc partDesc = partsDesc.getPartition(0); - partDesc.setInputFormat(partition.getSd().getInputFormat()); - partDesc.setOutputFormat(partition.getSd().getOutputFormat()); - partDesc.setNumBuckets(partition.getSd().getNumBuckets()); - partDesc.setCols(partition.getSd().getCols()); - partDesc.setSerializationLib(partition.getSd().getSerdeInfo().getSerializationLib()); - partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters()); - partDesc.setBucketCols(partition.getSd().getBucketCols()); - partDesc.setSortCols(partition.getSd().getSortCols()); - if (tblDesc.isExternal() && !replicationSpec().isMigratingToExternalTable()) { - // we have to provide the source location so target location can be derived. 
- partDesc.setLocation(partition.getSd().getLocation()); - } else { + Map partitionSpec = EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()); + + StorageDescriptor sd = partition.getSd(); + String location = sd.getLocation(); + if (!tblDesc.isExternal() || replicationSpec().isMigratingToExternalTable()) { /** * this is required for file listing of all files in a partition for managed table as described in * {@link org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator} */ - partDesc.setLocation(new Path(fromPath, - Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString()); + location = new Path(fromPath, Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString(); } - partsDesc.setReplicationSpec(replicationSpec()); + ColumnStatistics columnStatistics = null; + long writeId = -1; if (partition.isSetColStats()) { ColumnStatistics colStats = partition.getColStats(); ColumnStatisticsDesc colStatsDesc = new ColumnStatisticsDesc(colStats.getStatsDesc()); colStatsDesc.setTableName(tblDesc.getTableName()); colStatsDesc.setDbName(tblDesc.getDatabaseName()); - partDesc.setColStats(new ColumnStatistics(colStatsDesc, colStats.getStatsObj())); - long writeId = replicationSpec().isMigratingToTxnTable() ? + columnStatistics = new ColumnStatistics(colStatsDesc, colStats.getStatsObj()); + writeId = replicationSpec().isMigratingToTxnTable() ? ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID : partition.getWriteId(); - partDesc.setWriteId(writeId); } - return partsDesc; + + AlterTableAddPartitionDesc.PartitionDesc partitionDesc = new AlterTableAddPartitionDesc.PartitionDesc( + partitionSpec, location, partition.getParameters(), sd.getInputFormat(), sd.getOutputFormat(), + sd.getNumBuckets(), sd.getCols(), sd.getSerdeInfo().getSerializationLib(), sd.getSerdeInfo().getParameters(), + sd.getBucketCols(), sd.getSortCols(), columnStatistics, writeId); + + AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(tblDesc.getDatabaseName(), + tblDesc.getTableName(), true, ImmutableList.of(partitionDesc)); + addPartitionDesc.setReplicationSpec(replicationSpec()); + return addPartitionDesc; } catch (Exception e) { throw new SemanticException(e); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index c728e2d49a..40020ed257 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -182,7 +182,7 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc */ private Task tasksForAddPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc, Task ptnRootTask) throws MetaException, HiveException { - AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartition(0); + AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0); Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation()); Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec); partSpec.setLocation(replicaWarehousePartitionLocation.toString()); @@ -362,7 +362,7 @@ private TaskTracker forExistingTable(AlterTableAddPartitionDesc lastPartitionRep boolean encounteredTheLastReplicatedPartition = (lastPartitionReplicated == null); Map 
lastReplicatedPartSpec = null; if (!encounteredTheLastReplicatedPartition) { - lastReplicatedPartSpec = lastPartitionReplicated.getPartition(0).getPartSpec(); + lastReplicatedPartSpec = lastPartitionReplicated.getPartitions().get(0).getPartSpec(); LOG.info("Start processing from partition info spec : {}", StringUtils.mapToString(lastReplicatedPartSpec)); } @@ -370,13 +370,13 @@ private TaskTracker forExistingTable(AlterTableAddPartitionDesc lastPartitionRep Iterator partitionIterator = event.partitionDescriptions(tableDesc).iterator(); while (!encounteredTheLastReplicatedPartition && partitionIterator.hasNext()) { AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next(); - Map currentSpec = addPartitionDesc.getPartition(0).getPartSpec(); + Map currentSpec = addPartitionDesc.getPartitions().get(0).getPartSpec(); encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec); } while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) { AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next(); - Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); + Map partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec(); Task ptnRootTask = null; ReplLoadOpType loadPtnType = getLoadPartitionType(partSpec); switch (loadPtnType) { diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 691f3ee2e9..e0f000a382 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -3002,17 +3002,16 @@ public Partition createPartition(Table tbl, Map partSpec) throws public List createPartitions(AlterTableAddPartitionDesc addPartitionDesc) throws HiveException { // TODO: catalog name everywhere in this method Table tbl = getTable(addPartitionDesc.getDbName(), addPartitionDesc.getTableName()); - int size = addPartitionDesc.getPartitionCount(); List in = - new ArrayList(size); + new ArrayList(addPartitionDesc.getPartitions().size()); long writeId; String validWriteIdList; // In case of replication, get the writeId from the source and use valid write Id list // for replication. if (addPartitionDesc.getReplicationSpec().isInReplicationScope() && - addPartitionDesc.getPartition(0).getWriteId() > 0) { - writeId = addPartitionDesc.getPartition(0).getWriteId(); + addPartitionDesc.getPartitions().get(0).getWriteId() > 0) { + writeId = addPartitionDesc.getPartitions().get(0).getWriteId(); // We need a valid writeId list for a transactional change. During replication we do not // have a valid writeId list which was used for this on the source. 
But we know for sure // that the writeId associated with it was valid then (otherwise the change would have @@ -3030,9 +3029,8 @@ public Partition createPartition(Table tbl, Map partSpec) throws validWriteIdList = null; } } - for (int i = 0; i < size; ++i) { - org.apache.hadoop.hive.metastore.api.Partition tmpPart = - convertAddSpecToMetaPartition(tbl, addPartitionDesc.getPartition(i), conf); + for (AlterTableAddPartitionDesc.PartitionDesc partitionDesc : addPartitionDesc.getPartitions()) { + org.apache.hadoop.hive.metastore.api.Partition tmpPart = convertAddSpecToMetaPartition(tbl, partitionDesc, conf); if (tmpPart != null && writeId > 0) { tmpPart.setWriteId(writeId); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index f8d906f135..ab4f43c126 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -3505,15 +3505,15 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole // ^(TOK_ALTERTABLE_ADDPARTS identifier ifNotExists? alterStatementSuffixAddPartitionsElement+) boolean ifNotExists = ast.getChild(0).getType() == HiveParser.TOK_IFNOTEXISTS; - Table tab = getTable(qualified); - boolean isView = tab.isView(); - validateAlterTableType(tab, AlterTableType.ADDPARTITION, expectView); - outputs.add(new WriteEntity(tab, + Table table = getTable(qualified); + boolean isView = table.isView(); + validateAlterTableType(table, AlterTableType.ADDPARTITION, expectView); + outputs.add(new WriteEntity(table, /*use DDL_EXCLUSIVE to cause X lock to prevent races between concurrent add partition calls with IF NOT EXISTS. w/o this 2 concurrent calls to add the same partition may both add data since for transactional tables creating partition metadata and moving data there are 2 separate actions. */ - ifNotExists && AcidUtils.isTransactionalTable(tab) ? WriteType.DDL_EXCLUSIVE + ifNotExists && AcidUtils.isTransactionalTable(table) ? WriteType.DDL_EXCLUSIVE : WriteEntity.WriteType.DDL_SHARED)); int numCh = ast.getChildCount(); @@ -3522,17 +3522,17 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole String currentLocation = null; Map currentPart = null; // Parser has done some verification, so the order of tokens doesn't need to be verified here. 
- AlterTableAddPartitionDesc addPartitionDesc = - new AlterTableAddPartitionDesc(tab.getDbName(), tab.getTableName(), ifNotExists); + + List partitions = new ArrayList<>(); for (int num = start; num < numCh; num++) { ASTNode child = (ASTNode) ast.getChild(num); switch (child.getToken().getType()) { case HiveParser.TOK_PARTSPEC: if (currentPart != null) { - addPartitionDesc.addPartition(currentPart, currentLocation); + partitions.add(createPartitionDesc(table, currentLocation, currentPart)); currentLocation = null; } - currentPart = getValidatedPartSpec(tab, child, conf, true); + currentPart = getValidatedPartSpec(table, child, conf, true); validatePartitionValues(currentPart); // validate reserved values break; case HiveParser.TOK_PARTITIONLOCATION: @@ -3550,31 +3550,21 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole // add the last one if (currentPart != null) { - addPartitionDesc.addPartition(currentPart, currentLocation); + partitions.add(createPartitionDesc(table, currentLocation, currentPart)); } - if (this.conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { - for (int index = 0; index < addPartitionDesc.getPartitionCount(); index++) { - PartitionDesc desc = addPartitionDesc.getPartition(index); - if (desc.getLocation() == null) { - if (desc.getPartParams() == null) { - desc.setPartParams(new HashMap()); - } - StatsSetupConst.setStatsStateForCreateTable(desc.getPartParams(), - MetaStoreUtils.getColumnNames(tab.getCols()), StatsSetupConst.TRUE); - } - } - } - - if (addPartitionDesc.getPartitionCount() == 0) { + if (partitions.isEmpty()) { // nothing to do return; } + AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(table.getDbName(), + table.getTableName(), ifNotExists, partitions); + Task ddlTask = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), addPartitionDesc)); rootTasks.add(ddlTask); - handleTransactionalTable(tab, addPartitionDesc, ddlTask); + handleTransactionalTable(table, addPartitionDesc, ddlTask); if (isView) { // Compile internal query to capture underlying table partition dependencies @@ -3585,8 +3575,7 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole cmd.append(HiveUtils.unparseIdentifier(qualified[1])); cmd.append(" WHERE "); boolean firstOr = true; - for (int i = 0; i < addPartitionDesc.getPartitionCount(); ++i) { - AlterTableAddPartitionDesc.PartitionDesc partitionDesc = addPartitionDesc.getPartition(i); + for (AlterTableAddPartitionDesc.PartitionDesc partitionDesc : partitions) { if (firstOr) { firstOr = false; } else { @@ -3619,6 +3608,17 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole } } + private AlterTableAddPartitionDesc.PartitionDesc createPartitionDesc(Table table, String currentLocation, + Map currentPart) { + Map params = null; + if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER) && currentLocation == null) { + params = new HashMap(); + StatsSetupConst.setStatsStateForCreateTable(params, + MetaStoreUtils.getColumnNames(table.getCols()), StatsSetupConst.TRUE); + } + return new AlterTableAddPartitionDesc.PartitionDesc(currentPart, currentLocation, params); + } + /** * Add partition for Transactional tables needs to add (copy/rename) the data so that it lands * in a delta_x_x/ folder in the partition dir. 
@@ -3631,13 +3631,12 @@ private void handleTransactionalTable(Table tab, AlterTableAddPartitionDesc addP Long writeId = null; int stmtId = 0; - for (int index = 0; index < addPartitionDesc.getPartitionCount(); index++) { - PartitionDesc desc = addPartitionDesc.getPartition(index); - if (desc.getLocation() != null) { - AcidUtils.validateAcidPartitionLocation(desc.getLocation(), conf); + for (AlterTableAddPartitionDesc.PartitionDesc partitionDesc : addPartitionDesc.getPartitions()) { + if (partitionDesc.getLocation() != null) { + AcidUtils.validateAcidPartitionLocation(partitionDesc.getLocation(), conf); if(addPartitionDesc.isIfNotExists()) { //Don't add partition data if it already exists - Partition oldPart = getPartition(tab, desc.getPartSpec(), false); + Partition oldPart = getPartition(tab, partitionDesc.getPartSpec(), false); if(oldPart != null) { continue; } @@ -3653,15 +3652,15 @@ private void handleTransactionalTable(Table tab, AlterTableAddPartitionDesc addP } stmtId = getTxnMgr().getStmtIdAndIncrement(); } - LoadTableDesc loadTableWork = new LoadTableDesc(new Path(desc.getLocation()), - Utilities.getTableDesc(tab), desc.getPartSpec(), + LoadTableDesc loadTableWork = new LoadTableDesc(new Path(partitionDesc.getLocation()), + Utilities.getTableDesc(tab), partitionDesc.getPartSpec(), LoadTableDesc.LoadFileType.KEEP_EXISTING, //not relevant - creating new partition writeId); loadTableWork.setStmtId(stmtId); loadTableWork.setInheritTableSpecs(true); try { - desc.setLocation(new Path(tab.getDataLocation(), - Warehouse.makePartPath(desc.getPartSpec())).toString()); + partitionDesc.setLocation(new Path(tab.getDataLocation(), + Warehouse.makePartPath(partitionDesc.getPartSpec())).toString()); } catch (MetaException ex) { throw new SemanticException("Could not determine partition path due to: " diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 687122a45f..aa4a317cdd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -70,6 +71,9 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableList; + import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import java.io.IOException; @@ -322,10 +326,17 @@ public static boolean prepareImport(boolean isImportCmd, x.getInputs().add(toReadEntity(new Path(parsedLocation), x.getConf())); } - if ((parsedTableName != null) && (!parsedTableName.isEmpty())) { + if (StringUtils.isNotBlank(parsedTableName)) { tblDesc.setTableName(parsedTableName); } + if (tblDesc.getTableName() == null) { + // Either we got the tablename from the IMPORT statement (first priority) or from the export dump. 
+ throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg()); + } else { + x.getConf().set("import.destination.table", tblDesc.getTableName()); + } + List partitionDescs = new ArrayList<>(); Iterable partitions = rv.getPartitions(); for (Partition partition : partitions) { @@ -343,7 +354,7 @@ public static boolean prepareImport(boolean isImportCmd, for (Iterator partnIter = partitionDescs .listIterator(); partnIter.hasNext(); ) { AlterTableAddPartitionDesc addPartitionDesc = partnIter.next(); - if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) { + if (!found && addPartitionDesc.getPartitions().get(0).getPartSpec().equals(parsedPartSpec)) { found = true; } else { partnIter.remove(); @@ -356,17 +367,6 @@ public static boolean prepareImport(boolean isImportCmd, } } - if (tblDesc.getTableName() == null) { - // Either we got the tablename from the IMPORT statement (first priority) - // or from the export dump. - throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg()); - } else { - x.getConf().set("import.destination.table", tblDesc.getTableName()); - for (AlterTableAddPartitionDesc addPartitionDesc : partitionDescs) { - addPartitionDesc.setTableName(tblDesc.getTableName()); - } - } - Warehouse wh = new Warehouse(x.getConf()); Table table = tableIfExists(tblDesc, x.getHive()); boolean tableExists = false; @@ -410,36 +410,32 @@ public static boolean prepareImport(boolean isImportCmd, return tableExists; } - private static AlterTableAddPartitionDesc getBaseAddPartitionDescFromPartition( - Path fromPath, String dbName, ImportTableDesc tblDesc, Partition partition, - ReplicationSpec replicationSpec, HiveConf conf) + private static AlterTableAddPartitionDesc getBaseAddPartitionDescFromPartition(Path fromPath, String dbName, + ImportTableDesc tblDesc, Partition partition, ReplicationSpec replicationSpec, HiveConf conf) throws MetaException, SemanticException { - AlterTableAddPartitionDesc partsDesc = new AlterTableAddPartitionDesc(dbName, tblDesc.getTableName(), - EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()), - partition.getSd().getLocation(), partition.getParameters()); - AlterTableAddPartitionDesc.PartitionDesc partDesc = partsDesc.getPartition(0); - partDesc.setInputFormat(partition.getSd().getInputFormat()); - partDesc.setOutputFormat(partition.getSd().getOutputFormat()); - partDesc.setNumBuckets(partition.getSd().getNumBuckets()); - partDesc.setCols(partition.getSd().getCols()); - partDesc.setSerializationLib(partition.getSd().getSerdeInfo().getSerializationLib()); - partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters()); - partDesc.setBucketCols(partition.getSd().getBucketCols()); - partDesc.setSortCols(partition.getSd().getSortCols()); - if (replicationSpec.isInReplicationScope() && tblDesc.isExternal() - && !replicationSpec.isMigratingToExternalTable()) { - String newLocation = ReplExternalTables - .externalTableLocation(conf, partition.getSd().getLocation()); - LOG.debug("partition {} has data location: {}", partition, newLocation); - partDesc.setLocation(newLocation); + Map partitionSpec = EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()); + + StorageDescriptor sd = partition.getSd(); + + String location = null; + if (replicationSpec.isInReplicationScope() && tblDesc.isExternal() && + !replicationSpec.isMigratingToExternalTable()) { + location = ReplExternalTables.externalTableLocation(conf, partition.getSd().getLocation()); + LOG.debug("partition {} has data location: 
{}", partition, location); } else { - partDesc.setLocation(new Path(fromPath, - Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString()); + location = new Path(fromPath, Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString(); } + + long writeId = -1; if (tblDesc.getReplWriteId() != null) { - partDesc.setWriteId(tblDesc.getReplWriteId()); + writeId = tblDesc.getReplWriteId(); } - return partsDesc; + + AlterTableAddPartitionDesc.PartitionDesc partitionDesc = new AlterTableAddPartitionDesc.PartitionDesc( + partitionSpec, location, sd.getParameters(), sd.getInputFormat(), sd.getOutputFormat(), sd.getNumBuckets(), + sd.getCols(), sd.getSerdeInfo().getSerializationLib(), sd.getSerdeInfo().getParameters(), sd.getBucketCols(), + sd.getSortCols(), null, writeId); + return new AlterTableAddPartitionDesc(dbName, tblDesc.getTableName(), true, ImmutableList.of(partitionDesc)); } private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, @@ -570,7 +566,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) { addPartitionDesc.setReplicationSpec(replicationSpec); } - AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartition(0); + AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0); if (ptn == null) { fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x); } else if (!externalTablePartition(tblDesc, replicationSpec)) { @@ -584,7 +580,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) throws MetaException, IOException, HiveException { - AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartition(0); + AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0); boolean isAutoPurge = false; boolean needRecycle = false; boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable(); @@ -1038,7 +1034,7 @@ private static void createRegularImportTasks( x.getLOG().debug("table partitioned"); for (AlterTableAddPartitionDesc addPartitionDesc : partitionDescs) { - Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); + Map partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec(); org.apache.hadoop.hive.ql.metadata.Partition ptn = null; if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { x.getTasks().add(addSinglePartition( @@ -1235,7 +1231,7 @@ private static void createReplImportTasks( } if (updatedMetadata != null) { updatedMetadata.addPartition(table.getDbName(), table.getTableName(), - addPartitionDesc.getPartition(0).getPartSpec()); + addPartitionDesc.getPartitions().get(0).getPartSpec()); } } } else if (!replicationSpec.isMetadataOnly() @@ -1289,7 +1285,7 @@ private static void createReplImportTasks( x.getLOG().debug("table partitioned"); for (AlterTableAddPartitionDesc addPartitionDesc : partitionDescs) { addPartitionDesc.setReplicationSpec(replicationSpec); - Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); + Map partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec(); org.apache.hadoop.hive.ql.metadata.Partition ptn = null; if (isOldTableValid) { // If existing table is valid but the partition spec is different, then ignore partition @@ -1308,15 +1304,13 @@ private static void createReplImportTasks( 
x.getTasks().add(addSinglePartition( tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); if (updatedMetadata != null) { - updatedMetadata.addPartition(table.getDbName(), table.getTableName(), - addPartitionDesc.getPartition(0).getPartSpec()); + updatedMetadata.addPartition(table.getDbName(), table.getTableName(), partSpec); } } else { x.getTasks().add(alterSinglePartition( tblDesc, table, wh, addPartitionDesc, replicationSpec, null, x)); if (updatedMetadata != null) { - updatedMetadata.addPartition(table.getDbName(), table.getTableName(), - addPartitionDesc.getPartition(0).getPartSpec()); + updatedMetadata.addPartition(table.getDbName(), table.getTableName(), partSpec); } } } else { @@ -1331,8 +1325,7 @@ private static void createReplImportTasks( tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x)); } if (updatedMetadata != null) { - updatedMetadata.addPartition(table.getDbName(), table.getTableName(), - addPartitionDesc.getPartition(0).getPartSpec()); + updatedMetadata.addPartition(table.getDbName(), table.getTableName(), partSpec); } if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){ lockType = WriteEntity.WriteType.DDL_SHARED; diff --git ql/src/test/results/clientpositive/add_part_multiple.q.out ql/src/test/results/clientpositive/add_part_multiple.q.out index 81454f7dc5..676b36e095 100644 --- ql/src/test/results/clientpositive/add_part_multiple.q.out +++ ql/src/test/results/clientpositive/add_part_multiple.q.out @@ -30,8 +30,32 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Add Partition -#### A masked pattern was here #### - Spec: {ds=2010-01-01}, {ds=2010-02-01}, {ds=2010-03-01}, {ds=2010-04-01} + db name: default + partitions: + Partition + location: A + partition spec: + ds 2010-01-01 + Partition + location: B + partition spec: + ds 2010-02-01 + Partition + params: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}} + numFiles 0 + numFilesErasureCoded 0 + numRows 0 + rawDataSize 0 + totalSize 0 + partition spec: + ds 2010-03-01 + Partition + location: C + partition spec: + ds 2010-04-01 + table name: add_part_test_n1 + if not exists: true PREHOOK: query: ALTER TABLE add_part_test_n1 ADD IF NOT EXISTS PARTITION (ds='2010-01-01') location 'A' diff --git ql/src/test/results/clientpositive/drop_partitions_filter.q.out ql/src/test/results/clientpositive/drop_partitions_filter.q.out index 457fa0bc2f..b8a0575600 100644 --- ql/src/test/results/clientpositive/drop_partitions_filter.q.out +++ ql/src/test/results/clientpositive/drop_partitions_filter.q.out @@ -33,8 +33,21 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Add Partition -#### A masked pattern was here #### - Spec: {c=US, d=1} + db name: default + partitions: + Partition + params: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"a":"true","b":"true"}} + numFiles 0 + numFilesErasureCoded 0 + numRows 0 + rawDataSize 0 + totalSize 0 + partition spec: + c US + d 1 + table name: ptestfilter_n1 + if not exists: true PREHOOK: query: alter table ptestfilter_n1 add partition (c='US', d=1) PREHOOK: type: ALTERTABLE_ADDPARTS diff --git ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out index b307c021ab..c921d88dc7 100644 --- ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out +++ ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out @@ -31,8 +31,14 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Add Partition -#### A masked pattern was here #### 
- Spec: {day=20110102} + db name: default + partitions: + Partition + location: hdfs://### HDFS PATH ### + partition spec: + day 20110102 + table name: supply + if not exists: true Stage: Stage-1 Move Operator diff --git ql/src/test/results/clientpositive/spark/add_part_multiple.q.out ql/src/test/results/clientpositive/spark/add_part_multiple.q.out index 81454f7dc5..676b36e095 100644 --- ql/src/test/results/clientpositive/spark/add_part_multiple.q.out +++ ql/src/test/results/clientpositive/spark/add_part_multiple.q.out @@ -30,8 +30,32 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Add Partition -#### A masked pattern was here #### - Spec: {ds=2010-01-01}, {ds=2010-02-01}, {ds=2010-03-01}, {ds=2010-04-01} + db name: default + partitions: + Partition + location: A + partition spec: + ds 2010-01-01 + Partition + location: B + partition spec: + ds 2010-02-01 + Partition + params: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}} + numFiles 0 + numFilesErasureCoded 0 + numRows 0 + rawDataSize 0 + totalSize 0 + partition spec: + ds 2010-03-01 + Partition + location: C + partition spec: + ds 2010-04-01 + table name: add_part_test_n1 + if not exists: true PREHOOK: query: ALTER TABLE add_part_test_n1 ADD IF NOT EXISTS PARTITION (ds='2010-01-01') location 'A' diff --git streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index f2beafea9e..4177413472 100644 --- streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; /** * Streaming connection implementation for hive. To create a streaming connection, use the builder API @@ -437,11 +438,13 @@ public PartitionInfo createPartitionIfNotExists(final List partitionValu try { Map partSpec = Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), partitionValues); - AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(database, table, true); - partName = Warehouse.makePartName(tableObject.getPartitionKeys(), partitionValues); partLocation = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)).toString(); - addPartitionDesc.addPartition(partSpec, partLocation); - Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, addPartitionDesc.getPartition(0), conf); + AlterTableAddPartitionDesc.PartitionDesc partitionDesc = new AlterTableAddPartitionDesc.PartitionDesc( + partSpec, partLocation, null); + AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(database, table, true, + ImmutableList.of(partitionDesc)); + partName = Warehouse.makePartName(tableObject.getPartitionKeys(), partitionValues); + Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, partitionDesc, conf); if (getMSC() == null) { // We assume it doesn't exist if we can't check it
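
Reviewer note (illustrative only, not part of the patch): after this change, callers build the descriptor immutably by constructing the PartitionDesc objects first and then passing them to the AlterTableAddPartitionDesc constructor, as the streaming and import call sites above do. The fragment below restates that pattern with the generic type arguments filled in; the copy of the diff above has lost its angle brackets, so Map<String, String> and List<PartitionDesc> are assumptions, the local variable names are hypothetical, and checked exceptions (e.g. MetaException from Warehouse.makePartPath) are handled by the surrounding try block in the real call sites.

    // Sketch of the post-refactor construction pattern, mirroring HiveStreamingConnection above.
    Map<String, String> partSpec =
        Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), partitionValues);
    String partLocation =
        new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)).toString();
    // One immutable PartitionDesc per partition; extra storage/serde fields default when null/-1.
    AlterTableAddPartitionDesc.PartitionDesc partitionDesc =
        new AlterTableAddPartitionDesc.PartitionDesc(partSpec, partLocation, null);
    // The descriptor now takes the full partition list up front instead of addPartition() calls.
    AlterTableAddPartitionDesc addPartitionDesc =
        new AlterTableAddPartitionDesc(database, table, true, ImmutableList.of(partitionDesc));
    Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, partitionDesc, conf);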