diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 0d9c1a6..583b82b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -353,6 +353,7 @@ public int execute(DriverContext driverContext) {
           pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
         }
 
+        long startTime = System.currentTimeMillis();
         // load the list of DP partitions and return the list of partition specs
         // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
         // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
@@ -360,7 +361,7 @@ public int execute(DriverContext driverContext) {
         // iterate over it and call loadPartition() here.
         // The reason we don't do inside HIVE-1361 is the latter is large and we
         // want to isolate any potential issue it may introduce.
-        ArrayList<LinkedHashMap<String, String>> dp =
+        Map<Map<String, String>, Partition> dp =
           db.loadDynamicPartitions(
             tbd.getSourcePath(),
             tbd.getTable().getTableName(),
@@ -370,16 +371,19 @@ public int execute(DriverContext driverContext) {
             tbd.getHoldDDLTime(),
             isSkewedStoredAsDirs(tbd),
             work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID);
+        console.printInfo("\tTime taken to load dynamic partitions: "
+            + (System.currentTimeMillis() - startTime) + " ms");
 
         if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
           throw new HiveException("This query creates no partitions." +
               " To turn off this error, set hive.error.on.empty.partition=false.");
         }
 
+        startTime = System.currentTimeMillis();
         // for each partition spec, get the partition
         // and put it to WriteEntity for post-exec hook
-        for (LinkedHashMap<String, String> partSpec: dp) {
-          Partition partn = db.getPartition(table, partSpec, false);
+        for (Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
+          Partition partn = entry.getValue();
 
           if (bucketCols != null || sortCols != null) {
             updatePartitionBucketSortColumns(table, partn, bucketCols, numBuckets, sortCols);
@@ -412,8 +416,10 @@ public int execute(DriverContext driverContext) {
                 table.getCols());
           }
 
-          console.printInfo("\tLoading partition " + partSpec);
+          console.printInfo("\tLoading partition " + entry.getKey());
         }
+        console.printInfo("\tTime taken for adding to write entity: "
+            + (System.currentTimeMillis() - startTime) + " ms");
         dc = null; // reset data container to prevent it being added again.
       } else { // static partitions
         List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 4941427..a9efdf4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1237,6 +1237,15 @@ public Database getDatabaseCurrent() throws HiveException {
     return getDatabase(currentDb);
   }
 
+  public void loadPartition(Path loadPath, String tableName,
+      Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
+      boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
+      boolean isSrcLocal, boolean isAcid) throws HiveException {
+    Table tbl = getTable(tableName);
+    loadPartition(loadPath, tbl, partSpec, replace, holdDDLTime, inheritTableSpecs,
+        isSkewedStoreAsSubdir, isSrcLocal, isAcid);
+  }
+
   /**
    * Load a directory into a Hive Table Partition - Alters existing content of
    * the partition with the contents of loadPath. - If the partition does not
@@ -1245,7 +1254,7 @@ public Database getDatabaseCurrent() throws HiveException {
   *
   * @param loadPath
   *          Directory containing files to load into Table
-   * @param tableName
-   *          name of table to be loaded.
+   * @param tbl
+   *          table to be loaded.
   * @param partSpec
   *          defines which partition needs to be loaded
@@ -1258,12 +1267,12 @@ public Database getDatabaseCurrent() throws HiveException {
   * @param isSrcLocal
   *          If the source directory is LOCAL
   */
-  public void loadPartition(Path loadPath, String tableName,
+  public Partition loadPartition(Path loadPath, Table tbl,
       Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
       boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
       boolean isSrcLocal, boolean isAcid) throws HiveException {
-    Table tbl = getTable(tableName);
     Path tblDataLocationPath = tbl.getDataLocation();
+    Partition newTPart = null;
     try {
       /**
        * Move files before creating the partition since down stream processes
@@ -1312,10 +1321,10 @@ public void loadPartition(Path loadPath, String tableName,
         Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid);
       }
 
+      boolean forceCreate = !holdDDLTime;
+      newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), inheritTableSpecs);
       // recreate the partition if it existed before
       if (!holdDDLTime) {
-        Partition newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(),
-            inheritTableSpecs);
         if (isSkewedStoreAsSubdir) {
           org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
           SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
@@ -1325,9 +1334,9 @@ public void loadPartition(Path loadPath, String tableName,
           /* Add list bucketing location mappings. */
           skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
           newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
-          alterPartition(tbl.getTableName(), new Partition(tbl, newCreatedTpart));
+          alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart));
           newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs);
-          newCreatedTpart = newTPart.getTPartition();
+          return new Partition(tbl, newCreatedTpart);
         }
       }
     } catch (IOException e) {
@@ -1340,7 +1349,7 @@ public void loadPartition(Path loadPath, String tableName,
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
     }
-
+    return newTPart;
   }
 
   /**
@@ -1436,18 +1445,18 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param replace
    * @param numDP number of dynamic partitions
    * @param holdDDLTime
-   * @return a list of strings with the dynamic partition paths
+   * @return a map from each full partition spec to the Partition it was loaded into
    * @throws HiveException
    */
-  public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
+  public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
       String tableName, Map<String, String> partSpec, boolean replace,
       int numDP, boolean holdDDLTime, boolean listBucketingEnabled, boolean isAcid)
       throws HiveException {
 
     Set<Path> validPartitions = new HashSet<Path>();
     try {
-      ArrayList<LinkedHashMap<String, String>> fullPartSpecs =
-        new ArrayList<LinkedHashMap<String, String>>();
+      Map<Map<String, String>, Partition> partitionsMap =
+          new LinkedHashMap<Map<String, String>, Partition>();
 
       FileSystem fs = loadPath.getFileSystem(conf);
       FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP+1, fs);
@@ -1481,6 +1490,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
             + " to at least " + validPartitions.size() + '.');
       }
 
+      Table tbl = getTable(tableName);
       // for each dynamically created DP directory, construct a full partition spec
       // and load the partition based on that
       Iterator<Path> iter = validPartitions.iterator();
@@ -1493,14 +1503,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
         // generate a full partition specification
         LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
         Warehouse.makeSpecFromName(fullPartSpec, partPath);
-        fullPartSpecs.add(fullPartSpec);
-
-        // finally load the partition -- move the file to the final table address
-        loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true,
-            listBucketingEnabled, false, isAcid);
+        Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
+            holdDDLTime, true, listBucketingEnabled, false, isAcid);
+        partitionsMap.put(fullPartSpec, newPartition);
         LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
       }
-      return fullPartSpecs;
+      return partitionsMap;
     } catch (IOException e) {
       throw new HiveException(e);
     }
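
The core of the patch is the return-type change: loadDynamicPartitions now hands back a LinkedHashMap keyed by the full partition spec, so MoveTask can read each loaded Partition straight from the map value instead of issuing one db.getPartition() metastore call per dynamic partition. The following standalone sketch illustrates that data-structure pattern; it is not code from the patch, and DpMapSketch, FakePartition, and the stub loader are hypothetical stand-ins for the Hive classes:

import java.util.LinkedHashMap;
import java.util.Map;

public class DpMapSketch {

  // Hypothetical stand-in for org.apache.hadoop.hive.ql.metadata.Partition.
  static class FakePartition {
    final String location;
    FakePartition(String location) { this.location = location; }
  }

  // Mirrors the shape of the new signature: Map<full partSpec, Partition>.
  static Map<Map<String, String>, FakePartition> loadDynamicPartitions(String... dsValues) {
    // LinkedHashMap preserves load order, matching the patch's choice.
    Map<Map<String, String>, FakePartition> partitionsMap =
        new LinkedHashMap<Map<String, String>, FakePartition>();
    for (String ds : dsValues) {
      Map<String, String> fullPartSpec = new LinkedHashMap<String, String>();
      fullPartSpec.put("ds", ds);
      partitionsMap.put(fullPartSpec, new FakePartition("/warehouse/t/ds=" + ds));
    }
    return partitionsMap;
  }

  public static void main(String[] args) {
    // Caller-side pattern, analogous to the rewritten loop in MoveTask.execute():
    // the Partition is already the map value, so no second metastore lookup.
    for (Map.Entry<Map<String, String>, FakePartition> entry
        : loadDynamicPartitions("2015-01-01", "2015-01-02").entrySet()) {
      System.out.println("\tLoading partition " + entry.getKey()
          + " at " + entry.getValue().location);
    }
  }
}

Using a LinkedHashMap rather than a plain HashMap keeps the "Loading partition ..." messages in the same order the partitions were loaded, preserving the behavior of the old ArrayList-based return value.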
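
The MoveTask side of the patch also adds simple wall-clock instrumentation around the two expensive phases. A minimal self-contained version of that pattern, with Thread.sleep() as a hypothetical stand-in for the real work and System.out in place of console.printInfo:

public class PhaseTimingSketch {
  public static void main(String[] args) throws InterruptedException {
    long startTime = System.currentTimeMillis();
    Thread.sleep(50); // stand-in for db.loadDynamicPartitions(...)
    System.out.println("\tTime taken to load dynamic partitions: "
        + (System.currentTimeMillis() - startTime) + " ms");

    startTime = System.currentTimeMillis(); // local reused, as in the patch
    Thread.sleep(10); // stand-in for the WriteEntity registration loop
    System.out.println("\tTime taken for adding to write entity: "
        + (System.currentTimeMillis() - startTime) + " ms");
  }
}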