diff --git ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
index 0ab77e84c6..e8f3623575 100644
--- ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
+++ ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
@@ -78,6 +78,10 @@ public synchronized Partition add_partition(Partition partition) throws TExcepti
     return client.add_partition(partition);
   }
 
+  public synchronized int add_partitions(List<Partition> partitions) throws TException {
+    return client.add_partitions(partitions);
+  }
+
   public synchronized void alter_partition(String catName, String dbName, String tblName,
       Partition newPart, EnvironmentContext environmentContext, String writeIdList) throws TException {
     client.alter_partition(catName, dbName, tblName, newPart, environmentContext, writeIdList);
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 4de038913a..a68814420c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -45,6 +45,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -96,18 +97,17 @@
 import org.apache.hadoop.hive.metastore.HiveMetaException;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
-import org.apache.hadoop.hive.metastore.HiveMetaStore;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.PartitionDropOptions;
-import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -1866,17 +1866,102 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par
       boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcidIUDoperation,
       boolean hasFollowingStatsTask, Long writeId, int stmtId, boolean isInsertOverwrite) throws HiveException {
+
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+    perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION);
+
+    // Get the partition object if it already exists
+    Partition oldPart = getPartition(tbl, partSpec, false);
+    boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
+
+    // If config is set, table is not temporary and partition being inserted exists, capture
+    // the list of files added. For not yet existing partitions (insert overwrite to new partition
+    // or dynamic partition inserts), the add partition event will capture the list of files added.
+    List<Path> newFiles = Collections.synchronizedList(new ArrayList<>());
+
+    Partition newTPart = loadPartitionInternal(loadPath, tbl, partSpec, oldPart,
+        loadFileType, inheritTableSpecs,
+        inheritLocation, isSkewedStoreAsSubdir, isSrcLocal, isAcidIUDoperation,
+        hasFollowingStatsTask, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+
+    AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf,
+        newTPart.getTable(), true);
+    if (tableSnapshot != null) {
+      newTPart.getTPartition().setWriteId(tableSnapshot.getWriteId());
+    }
+
+    if (oldPart == null) {
+      addPartitionToMetastore(newTPart, hasFollowingStatsTask, tbl, tableSnapshot);
+      // For acid table, add the acid_write event with file list at the time of load itself. But
+      // it should be done after partition is created.
+      if (isTxnTable && (null != newFiles)) {
+        addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
+      }
+    } else {
+      try {
+        setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
+      } catch (TException e) {
+        LOG.error(StringUtils.stringifyException(e));
+        throw new HiveException(e);
+      }
+    }
+
+    perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_PARTITION);
+
+    return newTPart;
+  }
+
+  /**
+   * Move all the files from loadPath into Hive. If the partition
+   * does not exist - one is created - files in loadPath are moved into Hive. But the
+   * directory itself is not removed.
+   *
+   * @param loadPath
+   *          Directory containing files to load into Table
+   * @param tbl
+   *          name of table to be loaded.
+   * @param partSpec
+   *          defines which partition needs to be loaded
+   * @param oldPart
+   *          already existing partition object, can be null
+   * @param loadFileType
+   *          if REPLACE_ALL - replace files in the table,
+   *          otherwise add files to table (KEEP_EXISTING, OVERWRITE_EXISTING)
+   * @param inheritTableSpecs if true, on [re]creating the partition, take the
+   *          location/inputformat/outputformat/serde details from table spec
+   * @param inheritLocation
+   *          if true, partition path is generated from table
+   * @param isSkewedStoreAsSubdir
+   *          if true, skewed is stored as sub-directory
+   * @param isSrcLocal
+   *          If the source directory is LOCAL
+   * @param isAcidIUDoperation
+   *          true if this is an ACID operation Insert/Update/Delete operation
+   * @param hasFollowingStatsTask
+   *          true if there is a following task which updates the stats, so, this method need not update.
+   * @param writeId
+   *          write ID allocated for the current load operation
+   * @param stmtId
+   *          statement ID of the current load statement
+   * @param isInsertOverwrite
+   * @param isTxnTable
+   *
+   * @return Partition object being loaded with data
+   * @throws HiveException
+   */
+  private Partition loadPartitionInternal(Path loadPath, Table tbl, Map<String, String> partSpec,
+      Partition oldPart, LoadFileType loadFileType, boolean inheritTableSpecs,
+      boolean inheritLocation, boolean isSkewedStoreAsSubdir,
+      boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
+      Long writeId, int stmtId, boolean isInsertOverwrite,
+      boolean isTxnTable, List<Path> newFiles) throws HiveException {
     Path tblDataLocationPath = tbl.getDataLocation();
     boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
     boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
-    boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
 
     try {
       PerfLogger perfLogger = SessionState.getPerfLogger();
-      perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION);
-      // Get the partition object if it already exists
-      Partition oldPart = getPartition(tbl, partSpec, false);
       /**
        * Move files before creating the partition since down stream processes
        * check for existence of partition in metadata before accessing the data.
@@ -1908,17 +1993,9 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par
         newPartPath = oldPartPath == null
             ? genPartPathFromTable(tbl, partSpec, tblDataLocationPath) : oldPartPath;
       }
-      List<Path> newFiles = Collections.synchronizedList(new ArrayList<Path>());
 
       perfLogger.PerfLogBegin("MoveTask", PerfLogger.FILE_MOVES);
 
-      // If config is set, table is not temporary and partition being inserted exists, capture
-      // the list of files added. For not yet existing partitions (insert overwrite to new partition
-      // or dynamic partition inserts), the add partition event will capture the list of files added.
-      if (areEventsForDmlNeeded(tbl, oldPart)) {
-        newFiles = Collections.synchronizedList(new ArrayList<Path>());
-      }
-
       // Note: the stats for ACID tables do not have any coordination with either Hive ACID logic
       // like txn commits, time outs, etc.; nor the lower level sync in metastore pertaining
       // to ACID updates. So the are not themselves ACID.
@@ -1975,11 +2052,6 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par
       Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath);
       alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
       validatePartition(newTPart);
-      AcidUtils.TableSnapshot tableSnapshot = null;
-      tableSnapshot = AcidUtils.getTableSnapshot(conf, newTPart.getTable(), true);
-      if (tableSnapshot != null) {
-        newTPart.getTPartition().setWriteId(tableSnapshot.getWriteId());
-      }
 
       // If config is set, table is not temporary and partition being inserted exists, capture
       // the list of files added. For not yet existing partitions (insert overwrite to new partition
       // or dynamic partition inserts), the add partition event will capture the list of files added.
@@ -2037,55 +2109,101 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par
         // The ACID state is probably absent. Warning is logged in the get method.
         MetaStoreServerUtils.clearQuickStats(newTPart.getParameters());
       }
-      try {
-        LOG.debug("Adding new partition " + newTPart.getSpec());
-        getSynchronizedMSC().add_partition(newTPart.getTPartition());
-      } catch (AlreadyExistsException aee) {
-        // With multiple users concurrently issuing insert statements on the same partition has
-        // a side effect that some queries may not see a partition at the time when they're issued,
-        // but will realize the partition is actually there when it is trying to add such partition
-        // to the metastore and thus get AlreadyExistsException, because some earlier query just created it (race condition).
-        // For example, imagine such a table is created:
-        // create table T (name char(50)) partitioned by (ds string);
-        // and the following two queries are launched at the same time, from different sessions:
-        // insert into table T partition (ds) values ('Bob', 'today'); -- creates the partition 'today'
-        // insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException
-        // In that case, we want to retry with alterPartition.
-        LOG.debug("Caught AlreadyExistsException, trying to alter partition instead");
-        setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
-      } catch (Exception e) {
-        try {
-          final FileSystem newPathFileSystem = newPartPath.getFileSystem(this.getConf());
-          boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
-          final FileStatus status = newPathFileSystem.getFileStatus(newPartPath);
-          Hive.trashFiles(newPathFileSystem, new FileStatus[] {status}, this.getConf(), isAutoPurge);
-        } catch (IOException io) {
-          LOG.error("Could not delete partition directory contents after failed partition creation: ", io);
-        }
-        throw e;
-      }
-
-      // For acid table, add the acid_write event with file list at the time of load itself. But
-      // it should be done after partition is created.
-      if (isTxnTable && (null != newFiles)) {
-        addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
-      }
-      } else {
-        setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
       }
-      perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_PARTITION);
       return newTPart;
-    } catch (IOException e) {
-      LOG.error(StringUtils.stringifyException(e));
-      throw new HiveException(e);
-    } catch (MetaException e) {
+    } catch (IOException | MetaException | InvalidOperationException e) {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
-    } catch (InvalidOperationException e) {
+    }
   }
+
+  private void addPartitionToMetastore(Partition newTPart, boolean hasFollowingStatsTask,
+      Table tbl, TableSnapshot tableSnapshot) throws HiveException {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding new partition " + newTPart.getSpec());
+      }
+      getSynchronizedMSC().add_partition(newTPart.getTPartition());
+    } catch (AlreadyExistsException aee) {
+      // With multiple users concurrently issuing insert statements on the same partition has
+      // a side effect that some queries may not see a partition at the time when they're issued,
+      // but will realize the partition is actually there when it is trying to add such partition
+      // to the metastore and thus get AlreadyExistsException, because some earlier query just
+      // created it (race condition).
+      // For example, imagine such a table is created:
+      // create table T (name char(50)) partitioned by (ds string);
+      // and the following two queries are launched at the same time, from different sessions:
+      // insert into table T partition (ds) values ('Bob', 'today'); -- creates the partition 'today'
+      // insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException
+      // In that case, we want to retry with alterPartition.
+      LOG.debug("Caught AlreadyExistsException, trying to alter partition instead");
+      try {
+        setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
+      } catch (TException e) {
+        LOG.error(StringUtils.stringifyException(e));
+        throw new HiveException(e);
+      }
+    } catch (Exception e) {
+      try {
+        final FileSystem newPathFileSystem = newTPart.getPartitionPath().getFileSystem(this.getConf());
+        boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+        final FileStatus status = newPathFileSystem.getFileStatus(newTPart.getPartitionPath());
+        Hive.trashFiles(newPathFileSystem, new FileStatus[]{status}, this.getConf(), isAutoPurge);
+      } catch (IOException io) {
+        LOG.error("Could not delete partition directory contents after failed partition creation: ", io);
+      }
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
-    } catch (TException e) {
+    }
+  }
+
+  private void addPartitionsToMetastore(List<Partition> partitions,
+      boolean hasFollowingStatsTask, Table tbl,
+      List<AcidUtils.TableSnapshot> tableSnapshots)
+      throws HiveException {
+    try {
+      if (LOG.isDebugEnabled()) {
+        StringBuffer debugMsg = new StringBuffer("Adding new partitions ");
+        partitions.forEach(partition -> debugMsg.append(partition.getSpec() + " "));
+        LOG.debug(debugMsg.toString());
+      }
+      getSynchronizedMSC().add_partitions(partitions.parallelStream().map(Partition::getTPartition)
+          .collect(Collectors.toList()));
+    } catch (AlreadyExistsException aee) {
+      // With multiple users concurrently issuing insert statements on the same partition has
+      // a side effect that some queries may not see a partition at the time when they're issued,
+      // but will realize the partition is actually there when it is trying to add such partition
+      // to the metastore and thus get AlreadyExistsException, because some earlier query just
+      // created it (race condition).
+      // For example, imagine such a table is created:
+      // create table T (name char(50)) partitioned by (ds string);
+      // and the following two queries are launched at the same time, from different sessions:
+      // insert into table T partition (ds) values ('Bob', 'today'); -- creates the partition 'today'
+      // insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException
+      // In that case, we want to retry with alterPartition.
+ LOG.debug("Caught AlreadyExistsException, trying to alter partition instead"); + assert partitions.size() == tableSnapshots.size(); + for (int i = 0; i < partitions.size(); i++) { + try { + setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, partitions.get(i), + tableSnapshots.get(i)); + } catch (TException e) { + LOG.error(StringUtils.stringifyException(e)); + throw new HiveException(e); + } + } + } catch (Exception e) { + try { + for (Partition partition : partitions) { + final FileSystem newPathFileSystem = partition.getPartitionPath().getFileSystem(this.getConf()); + boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); + final FileStatus status = newPathFileSystem.getFileStatus(partition.getPartitionPath()); + Hive.trashFiles(newPathFileSystem, new FileStatus[]{status}, this.getConf(), isAutoPurge); + } + } catch (IOException io) { + LOG.error("Could not delete partition directory contents after failed partition creation: ", io); + } LOG.error(StringUtils.stringifyException(e)); throw new HiveException(e); } @@ -2375,16 +2493,6 @@ private void constructOneLBLocationMap(FileStatus fSta, PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS); - final Map, Partition> partitionsMap = - Collections.synchronizedMap(new LinkedHashMap, Partition>()); - - int poolSize = conf.getInt(ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1); - final ExecutorService pool = Executors.newFixedThreadPool(poolSize, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("load-dynamic-partitions-%d") - .build()); - // Get all valid partition paths and existing partitions for them (if any) final Table tbl = getTable(tableName); final Set validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, writeId, stmtId, @@ -2392,113 +2500,193 @@ private void constructOneLBLocationMap(FileStatus fSta, final int partsToLoad = validPartitions.size(); final AtomicInteger partitionsLoaded = new AtomicInteger(0); - final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0 && InPlaceUpdate.canRenderInPlace(conf) && !SessionState.getConsole().getIsSilent(); final PrintStream ps = (inPlaceEligible) ? 
 
+    final SessionState parentSession = SessionState.get();
+    List<Callable<Partition>> tasks = Lists.newLinkedList();
 
-    final List<Future<Void>> futures = Lists.newLinkedList();
-
-    // for each dynamically created DP directory, construct a full partition spec
-    // and load the partition based on that
-    final Map<Long, RawStore> rawStoreMap = new ConcurrentHashMap<>();
-    try {
-      for(final Path partPath : validPartitions) {
-        // generate a full partition specification
-        final LinkedHashMap<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
-        if (!Warehouse.makeSpecFromName(
-            fullPartSpec, partPath, new HashSet<String>(partSpec.keySet()))) {
-          Utilities.FILE_OP_LOGGER.warn("Ignoring invalid DP directory " + partPath);
-          continue;
-        }
-        futures.add(pool.submit(new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            try {
-              // move file would require session details (needCopy() invokes SessionState.get)
-              SessionState.setCurrentSessionState(parentSession);
-              LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
-
-              // load the partition
-              Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, loadFileType,
-                  true, false, numLB > 0, false, isAcid, hasFollowingStatsTask, writeId, stmtId,
-                  isInsertOverwrite);
-              partitionsMap.put(fullPartSpec, newPartition);
-
-              if (inPlaceEligible) {
-                synchronized (ps) {
-                  InPlaceUpdate.rePositionCursor(ps);
-                  partitionsLoaded.incrementAndGet();
-                  InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
-                      + partsToLoad + " partitions.");
-                }
-              }
-              return null;
-            } catch (Exception t) {
-              LOG.error("Exception when loading partition with parameters "
-                  + " partPath=" + partPath + ", "
+    final class PartitionDetails {
+      Map<String, String> fullSpec;
+      Partition partition;
+      List<Path> newFiles;
+      boolean hasOldPartition = false;
+      AcidUtils.TableSnapshot tableSnapshot;
+    }
+
+    Map<Path, PartitionDetails> partitionDetailsMap =
+        Collections.synchronizedMap(new LinkedHashMap<>());
+
+    // calculate full path spec for each valid partition path
+    validPartitions.forEach(partPath -> {
+      Map<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
+      if (!Warehouse.makeSpecFromName(fullPartSpec, partPath, new HashSet<>(partSpec.keySet()))) {
+        Utilities.FILE_OP_LOGGER.warn("Ignoring invalid DP directory " + partPath);
+      } else {
+        PartitionDetails details = new PartitionDetails();
+        details.fullSpec = fullPartSpec;
+        partitionDetailsMap.put(partPath, details);
+      }
+    });
+
+    // fetch all the partitions matching the part spec using the partition iterable
+    // this way the maximum batch size configuration parameter is considered
+    PartitionIterable partitionIterable = new PartitionIterable(Hive.get(), tbl, partSpec,
+        conf.getInt(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname(), 300));
+    Iterator<Partition> iterator = partitionIterable.iterator();
+
+    // Match valid partition path to partitions
+    while (iterator.hasNext()) {
+      Partition partition = iterator.next();
+      partitionDetailsMap.entrySet().parallelStream()
+          .filter(entry -> entry.getValue().fullSpec.equals(partition.getSpec()))
+          .findAny().ifPresent(entry -> {
+            entry.getValue().partition = partition;
+            entry.getValue().hasOldPartition = true;
+          });
+    }
+
+    boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
+
+    for (Entry<Path, PartitionDetails> entry : partitionDetailsMap.entrySet()) {
+      tasks.add(() -> {
+        PartitionDetails partitionDetails = entry.getValue();
+        Map<String, String> fullPartSpec = partitionDetails.fullSpec;
+        try {
+
+          SessionState.setCurrentSessionState(parentSession);
+          LOG.info("New loading path = " + entry.getKey() + " with partSpec " + fullPartSpec);
" + fullPartSpec); + + List newFiles = Lists.newArrayList(); + Partition oldPartition = partitionDetails.partition; + // load the partition + Partition partition = loadPartitionInternal(entry.getKey(), tbl, + fullPartSpec, oldPartition, loadFileType, true, false, numLB > 0, false, isAcid, + hasFollowingStatsTask, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles); + // if the partition already existed before the loading, no need to add it again to the + // metastore + + AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, + partition.getTable(), true); + if (tableSnapshot != null) { + partition.getTPartition().setWriteId(tableSnapshot.getWriteId()); + } + partitionDetails.tableSnapshot = tableSnapshot; + if (oldPartition == null) { + partitionDetails.newFiles = newFiles; + partitionDetails.partition = partition; + } + + if (inPlaceEligible) { + synchronized (ps) { + InPlaceUpdate.rePositionCursor(ps); + partitionsLoaded.incrementAndGet(); + InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/" + + partsToLoad + " partitions."); + } + } + + return partition; + } catch (Exception e) { + LOG.error("Exception when loading partition with parameters " + + " partPath=" + entry.getKey() + ", " + " table=" + tbl.getTableName() + ", " + " partSpec=" + fullPartSpec + ", " + " loadFileType=" + loadFileType.toString() + ", " + " listBucketingLevel=" + numLB + ", " + " isAcid=" + isAcid + ", " - + " hasFollowingStatsTask=" + hasFollowingStatsTask, t); - throw t; - } finally { - // Add embedded rawstore, so we can cleanup later to avoid memory leak - if (getMSC().isLocalMetaStore()) { - Long threadId = Thread.currentThread().getId(); - RawStore threadLocalRawStore = HiveMetaStore.HMSHandler.getRawStore(); - if (threadLocalRawStore == null) { - // If the thread local rawStore is already cleaned by current thread, then remove from rawStoreMap. - rawStoreMap.remove(threadId); - } else { - // If same thread is re-used, then need to cleanup the latest thread local instance of rawStore. - // So, overwrite the old one if exists in rawStoreMap. 
-                  rawStoreMap.put(threadId, threadLocalRawStore);
-                }
-              }
-            }
-          }
-        }));
-      }
-      pool.shutdown();
-      LOG.debug("Number of partitions to be added is " + futures.size());
+              + " hasFollowingStatsTask=" + hasFollowingStatsTask, e);
+          throw e;
+        }
+      });
+    }
 
-      for (Future future : futures) {
-        future.get();
-      }
+    int poolSize = conf.getInt(ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1);
+    ExecutorService executor = Executors.newFixedThreadPool(poolSize,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("load-dynamic-partitionsToAdd-%d").build());
+
+    List<Future<Partition>> futures = Lists.newLinkedList();
+    Map<Map<String, String>, Partition> result = Maps.newLinkedHashMap();
+    try {
+      futures = executor.invokeAll(tasks);
+      LOG.debug("Number of partitions to be added is " + futures.size());
+      for (Future<Partition> future : futures) {
+        Partition partition = future.get();
+        result.put(partition.getSpec(), partition);
+      }
+      // add new partitions in batch
+
+      addPartitionsToMetastore(
+          partitionDetailsMap.entrySet()
+              .parallelStream()
+              .filter(entry -> !entry.getValue().hasOldPartition)
+              .map(entry -> entry.getValue().partition)
+              .collect(Collectors.toList()),
+          hasFollowingStatsTask,
+          tbl,
+          partitionDetailsMap.entrySet()
+              .parallelStream()
+              .filter(entry -> !entry.getValue().hasOldPartition)
+              .map(entry -> entry.getValue().tableSnapshot)
+              .collect(Collectors.toList()));
+      // For acid table, add the acid_write event with file list at the time of load itself. But
+      // it should be done after partition is created.
+
+      for (Entry<Path, PartitionDetails> entry : partitionDetailsMap.entrySet()) {
+        PartitionDetails partitionDetails = entry.getValue();
+        if (isTxnTable && partitionDetails.newFiles != null) {
+          addWriteNotificationLog(tbl, partitionDetails.fullSpec, partitionDetails.newFiles, writeId);
+        }
+        if (partitionDetails.hasOldPartition) {
+          setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, partitionDetails.partition,
+              partitionDetails.tableSnapshot);
+        }
+      }
     } catch (InterruptedException | ExecutionException e) {
-      LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
-      //cancel other futures
-      for (Future future : futures) {
-        future.cancel(true);
-      }
-      throw new HiveException("Exception when loading "
-          + partsToLoad + " in table " + tbl.getTableName()
-          + " with loadPath=" + loadPath, e);
+      throw new HiveException("Exception when loading " + validPartitions.size()
+          + " partitions in table " + tbl.getTableName()
+          + " with loadPath=" + loadPath, e);
+    } catch (TException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw new HiveException(e);
+    } catch (Exception e) {
+
+      StringBuffer logMsg = new StringBuffer();
+      logMsg.append("Exception when loading partitions with parameters ");
+      logMsg.append("partPaths=");
+      validPartitions.forEach(path -> logMsg.append(path + ", "));
+      logMsg.append("table=" + tbl.getTableName() + ", ").
+          append("partSpec=" + partSpec + ", ").
+          append("loadFileType=" + loadFileType.toString() + ", ").
+          append("listBucketingLevel=" + numLB + ", ").
+          append("isAcid=" + isAcid + ", ").
+ append("hasFollowingStatsTask=" + hasFollowingStatsTask); + + LOG.error(logMsg.toString(), e); + throw e; } finally { - rawStoreMap.forEach((k, rs) -> rs.shutdown()); + LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks"); + executor.shutdownNow(); } try { if (isAcid) { - List partNames = new ArrayList<>(partitionsMap.size()); - for (Partition p : partitionsMap.values()) { - partNames.add(p.getName()); - } + List partNames = + result.values().parallelStream().map(Partition::getName).collect(Collectors.toList()); getMSC().addDynamicPartitions(parentSession.getTxnMgr().getCurrentTxnId(), writeId, tbl.getDbName(), tbl.getTableName(), partNames, AcidUtils.toDataOperationType(operation)); } - LOG.info("Loaded " + partitionsMap.size() + " partitions"); + LOG.info("Loaded " + result.size() + "partitionsToAdd"); perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS); - return partitionsMap; + return result; } catch (TException te) { + LOG.error(StringUtils.stringifyException(te)); throw new HiveException("Exception updating metastore for acid table " - + tableName + " with partitions " + partitionsMap.values(), te); + + tableName + " with partitions " + result.values(), te); } } @@ -2590,8 +2778,8 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType try { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, - loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles, - tbl.getNumBuckets() > 0, isFullAcidTable, isManaged); + loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles, + tbl.getNumBuckets() > 0, isFullAcidTable, isManaged); } catch (IOException e) { throw new HiveException("addFiles: filesystem error in check phase", e); } @@ -3184,6 +3372,11 @@ public boolean dropPartition(String dbName, String tableName, List partV List names = null; try { names = getMSC().listPartitionNames(dbName, tblName, max); + } catch (NoSuchObjectException nsoe) { + // this means no partition exists for the given dbName and tblName + // key value pairs - thrift cannot handle null return values, hence + // listPartitionNames() throws NoSuchObjectException to indicate null partitions + return Lists.newArrayList(); } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); throw new HiveException(e); @@ -3200,6 +3393,11 @@ public boolean dropPartition(String dbName, String tableName, List partV try { names = getMSC().listPartitionNames(dbName, tblName, pvals, max); + } catch (NoSuchObjectException nsoe) { + // this means no partition exists for the given partition spec + // key value pairs - thrift cannot handle null return values, hence + // listPartitionNames() throws NoSuchObjectException to indicate null partitions + return Lists.newArrayList(); } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); throw new HiveException(e); diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java index dd23d7db3e..322b580885 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java @@ -901,14 +901,23 @@ private void addPartition(Partition p) throws AlreadyExistsException, MetaExcept assertPartitioned(); pTree.addPartition(p); } + private Partition getPartition(String partName) throws MetaException { assertPartitioned(); return 
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index dd23d7db3e..322b580885 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -901,14 +901,23 @@ private void addPartition(Partition p) throws AlreadyExistsException, MetaExcept
     assertPartitioned();
     pTree.addPartition(p);
   }
+
   private Partition getPartition(String partName) throws MetaException {
     assertPartitioned();
     return pTree.getPartition(partName);
   }
+
+  private int addPartitions(List<Partition> partitions) throws AlreadyExistsException,
+      MetaException {
+    assertPartitioned();
+    return pTree.addPartitions(partitions);
+  }
+
   private List<Partition> getPartitions(List<String> partialPartVals) throws MetaException {
     assertPartitioned();
     return pTree.getPartitions(partialPartVals);
   }
+
   private void assertPartitioned() throws MetaException {
     if(tTable.getPartitionKeysSize() <= 0) {
       throw new MetaException(Warehouse.getQualifiedName(tTable) + " is not partitioned");
@@ -939,6 +948,17 @@ private void addPartition(Partition p) throws AlreadyExistsException, MetaExcept
     private Partition getPartition(String partName) {
       return parts.get(partName);
     }
+
+    private int addPartitions(List<Partition> partitions)
+        throws AlreadyExistsException, MetaException {
+      int partitionsAdded = 0;
+      for (Partition partition : partitions) {
+        addPartition(partition);
+        partitionsAdded++;
+      }
+      return partitionsAdded;
+    }
+
     /**
      * Provided values for the 1st N partition columns, will return all matching PartitionS
      * The list is a partial list of partition values in the same order as partition columns.
@@ -960,9 +980,7 @@ private Partition getPartition(String partName) {
     }
   }
   /**
-   * Loading Dynamic Partitons calls this.
-   * Hive.loadPartition() calls this which in turn can be called from Hive.loadDynamicPartitions()
-   * among others
+   * Hive.loadPartition() calls this.
    * @param partition
    *          The partition to add
    * @return the partition added
@@ -985,6 +1003,34 @@ private Partition getPartition(String partName) {
     tt.addPartition(deepCopy(partition));
     return partition;
   }
+
+  /**
+   * Loading Dynamic Partitions calls this. The partitions which are loaded must belong to the
+   * same table.
+   * @param partitions the new partitions to be added, must not be null
+   * @return number of partitions that were added
+   * @throws TException
+   */
+  @Override
+  public int add_partitions(List<Partition> partitions) throws TException {
+    if (partitions.isEmpty()) {
+      return 0;
+    }
+    Partition partition = partitions.get(0);
+    org.apache.hadoop.hive.metastore.api.Table table =
+        getTempTable(partition.getDbName(), partition.getTableName());
+    if (table == null) {
+      // not a temp table - try the underlying client
+      return super.add_partitions(partitions);
+    }
+    TempTable tt = getTempTable(table);
+    if (tt == null) {
+      throw new IllegalStateException("TempTable not found for " +
+          table.getTableName());
+    }
+    return tt.addPartitions(deepCopyPartitions(partitions));
+  }
+
   /**
    * @param partialPvals partition values, can be partial. This really means that missing values
    *          are represented by empty str.
diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index aba63f050b..90b5764a2e 100644
--- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2468,7 +2468,7 @@ protected PrincipalPrivilegeSet deepCopy(PrincipalPrivilegeSet pps) {
     return copy;
   }
 
-  private List<Partition> deepCopyPartitions(List<Partition> partitions) {
+  protected List<Partition> deepCopyPartitions(List<Partition> partitions) {
     return deepCopyPartitions(partitions, null);
   }
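The SessionHiveMetaStoreClient.add_partitions override above routes a batch either to the session's in-memory temp-table partition tree or to the remote metastore via super.add_partitions. The sketch below models that dispatch with plain JDK types; TempCatalog and its method names are illustrative assumptions, not the real SessionHiveMetaStoreClient members.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TempTableDispatchSketch {

  /** Minimal stand-in for the per-session temp table registry. */
  static class TempCatalog {
    private final Map<String, List<String>> partitionsByTable = new HashMap<>();

    boolean isTempTable(String db, String table) {
      return partitionsByTable.containsKey(db + "." + table);
    }

    void register(String db, String table) {
      partitionsByTable.put(db + "." + table, new ArrayList<>());
    }

    int addPartitions(String db, String table, List<String> parts) {
      partitionsByTable.get(db + "." + table).addAll(parts);
      return parts.size();
    }
  }

  /** Mirrors the override's shape: empty batch -> 0, temp table -> local tree, else remote client. */
  static int addPartitions(TempCatalog catalog, String db, String table, List<String> parts) {
    if (parts.isEmpty()) {
      return 0;
    }
    if (!catalog.isTempTable(db, table)) {
      // not a temp table - a real client would call super.add_partitions(...) here
      System.out.println("delegating " + parts.size() + " partitions to the remote metastore");
      return parts.size();
    }
    return catalog.addPartitions(db, table, parts);
  }

  public static void main(String[] args) {
    TempCatalog catalog = new TempCatalog();
    catalog.register("default", "tmp_sales");
    System.out.println(addPartitions(catalog, "default", "tmp_sales", List.of("ds=2018-10-01")));
    System.out.println(addPartitions(catalog, "default", "sales", List.of("ds=2018-10-01")));
  }
}
```

As the javadoc on the override notes, the whole batch is assumed to belong to a single table, which is why only the first partition's database and table name are inspected before dispatching.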