diff --git ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
index 0ab77e84c6..e8f3623575 100644
--- ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
+++ ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
@@ -78,6 +78,10 @@ public synchronized Partition add_partition(Partition partition) throws TExcepti
     return client.add_partition(partition);
   }
 
+  public synchronized int add_partitions(List<Partition> partitions) throws TException {
+    return client.add_partitions(partitions);
+  }
+
   public synchronized void alter_partition(String catName, String dbName, String tblName,
       Partition newPart, EnvironmentContext environmentContext, String writeIdList) throws TException {
     client.alter_partition(catName, dbName, tblName, newPart, environmentContext, writeIdList);
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 4de038913a..1e5b305a50 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -43,8 +43,10 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -96,18 +98,17 @@
 import org.apache.hadoop.hive.metastore.HiveMetaException;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
-import org.apache.hadoop.hive.metastore.HiveMetaStore;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.PartitionDropOptions;
-import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -1866,17 +1867,46 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
       boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcidIUDoperation,
       boolean hasFollowingStatsTask, Long writeId, int stmtId, boolean isInsertOverwrite) throws HiveException {
+    // Get the partition object if it already exists
+    Partition oldPart = getPartition(tbl, partSpec, false);
+    boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
+
+    // If config is set, table is not temporary and partition being inserted exists, capture
+    // the list of files added. For not yet existing partitions (insert overwrite to new partition
+    // or dynamic partition inserts), the add partition event will capture the list of files added.
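+    // Note: a synchronized list is used here since the file moves below may be performed by
+    // multiple threads, each of which appends the files it moved to newFiles.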
+    List<Path> newFiles = Collections.synchronizedList(new ArrayList<>());
+
+    Partition newTPart = loadPartitionInternal(loadPath, tbl, partSpec, oldPart,
+        loadFileType, inheritTableSpecs,
+        inheritLocation, isSkewedStoreAsSubdir, isSrcLocal, isAcidIUDoperation,
+        hasFollowingStatsTask, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+
+    if (oldPart == null) {
+      addPartitionToMetastore(newTPart, hasFollowingStatsTask, tbl);
+      // For acid table, add the acid_write event with file list at the time of load itself. But
+      // it should be done after partition is created.
+      if (isTxnTable && (null != newFiles)) {
+        addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
+      }
+    }
+
+    return newTPart;
+  }
+
+  private Partition loadPartitionInternal(Path loadPath, Table tbl, Map<String, String> partSpec,
+      Partition oldPart, LoadFileType loadFileType, boolean inheritTableSpecs,
+      boolean inheritLocation, boolean isSkewedStoreAsSubdir,
+      boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
+      Long writeId, int stmtId, boolean isInsertOverwrite,
+      boolean isTxnTable, List<Path> newFiles) throws HiveException {
     Path tblDataLocationPath = tbl.getDataLocation();
     boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
     boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
-    boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
     try {
       PerfLogger perfLogger = SessionState.getPerfLogger();
       perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION);
 
-      // Get the partition object if it already exists
-      Partition oldPart = getPartition(tbl, partSpec, false);
       /**
        * Move files before creating the partition since down stream processes
        * check for existence of partition in metadata before accessing the data.
@@ -1885,7 +1915,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
        */
       Path oldPartPath = (oldPart != null) ? oldPart.getDataLocation() : null;
 
-      Path newPartPath = null;
+      Path newPartPath;
 
       if (inheritLocation) {
         newPartPath = genPartPathFromTable(tbl, partSpec, tblDataLocationPath);
@@ -1906,19 +1936,11 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
         }
       } else {
         newPartPath = oldPartPath == null
-          ? genPartPathFromTable(tbl, partSpec, tblDataLocationPath) : oldPartPath;
+            ? genPartPathFromTable(tbl, partSpec, tblDataLocationPath) : oldPartPath;
       }
-      List<Path> newFiles = Collections.synchronizedList(new ArrayList<>());
 
       perfLogger.PerfLogBegin("MoveTask", PerfLogger.FILE_MOVES);
-      // If config is set, table is not temporary and partition being inserted exists, capture
-      // the list of files added. For not yet existing partitions (insert overwrite to new partition
-      // or dynamic partition inserts), the add partition event will capture the list of files added.
-      if (areEventsForDmlNeeded(tbl, oldPart)) {
-        newFiles = Collections.synchronizedList(new ArrayList<>());
-      }
-
       // Note: the stats for ACID tables do not have any coordination with either Hive ACID logic
       // like txn commits, time outs, etc.; nor the lower level sync in metastore pertaining
       // to ACID updates. So the are not themselves ACID.
@@ -1936,7 +1958,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
         }
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
           Utilities.FILE_OP_LOGGER.trace("maybe deleting stuff from " + oldPartPath
-            + " (new " + newPartPath + ") for replace");
+              + " (new " + newPartPath + ") for replace");
         }
       } else {
         // Either a non-MM query, or a load into MM table from an external source.
@@ -1945,7 +1967,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
         assert !isAcidIUDoperation;
         // We will load into MM directory, and hide previous directories if needed.
         destPath = new Path(destPath, isInsertOverwrite
-          ? AcidUtils.baseDir(writeId) : AcidUtils.deltaSubdir(writeId, writeId, stmtId));
+            ? AcidUtils.baseDir(writeId) : AcidUtils.deltaSubdir(writeId, writeId, stmtId));
       }
       if (!isAcidIUDoperation && isFullAcidTable) {
         destPath = fixFullAcidPathForLoadData(loadFileType, destPath, writeId, stmtId, tbl);
@@ -1963,20 +1985,20 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
         boolean needRecycle = !tbl.isTemporary()
                 && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName()));
         replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal,
-          isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle, isManaged);
+            isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle, isManaged);
       } else {
         FileSystem fs = destPath.getFileSystem(conf);
         copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
-          (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles,
-          tbl.getNumBuckets() > 0, isFullAcidTable, isManaged);
+            (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles,
+            tbl.getNumBuckets() > 0, isFullAcidTable, isManaged);
       }
     }
     perfLogger.PerfLogEnd("MoveTask", PerfLogger.FILE_MOVES);
     Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath);
     alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
     validatePartition(newTPart);
-    AcidUtils.TableSnapshot tableSnapshot = null;
-    tableSnapshot = AcidUtils.getTableSnapshot(conf, newTPart.getTable(), true);
+    AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf,
+        newTPart.getTable(), true);
     if (tableSnapshot != null) {
       newTPart.getTPartition().setWriteId(tableSnapshot.getWriteId());
     }
@@ -2008,7 +2030,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
         SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
         /* Construct list bucketing location mappings from sub-directory name. */
         Map<List<String>, String> skewedColValueLocationMaps = constructListBucketingLocationMap(
-          newPartPath, skewedInfo);
+            newPartPath, skewedInfo);
         /* Add list bucketing location mappings. */
         skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
         newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
@@ -2020,16 +2042,16 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
       newTPart.getTPartition().setParameters(new HashMap<String, String>());
       if (this.getConf().getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
         StatsSetupConst.setStatsStateForCreateTable(newTPart.getParameters(),
-          MetaStoreUtils.getColumnNames(tbl.getCols()), StatsSetupConst.TRUE);
+            MetaStoreUtils.getColumnNames(tbl.getCols()), StatsSetupConst.TRUE);
       }
       // Note: we are creating a brand new the partition, so this is going to be valid for ACID.
       List<FileStatus> filesForStats = null;
       if (isTxnTable) {
         filesForStats = AcidUtils.getAcidFilesForStats(
-          newTPart.getTable(), newPartPath, conf, null);
+            newTPart.getTable(), newPartPath, conf, null);
       } else {
         filesForStats = HiveStatsUtils.getFileStatusRecurse(
-          newPartPath, -1, newPartPath.getFileSystem(conf));
+            newPartPath, -1, newPartPath.getFileSystem(conf));
       }
       if (filesForStats != null) {
         MetaStoreServerUtils.populateQuickStats(filesForStats, newTPart.getParameters());
@@ -2037,39 +2059,6 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
         // The ACID state is probably absent. Warning is logged in the get method.
         MetaStoreServerUtils.clearQuickStats(newTPart.getParameters());
       }
-      try {
-        LOG.debug("Adding new partition " + newTPart.getSpec());
-        getSynchronizedMSC().add_partition(newTPart.getTPartition());
-      } catch (AlreadyExistsException aee) {
-        // With multiple users concurrently issuing insert statements on the same partition has
-        // a side effect that some queries may not see a partition at the time when they're issued,
-        // but will realize the partition is actually there when it is trying to add such partition
-        // to the metastore and thus get AlreadyExistsException, because some earlier query just created it (race condition).
-        // For example, imagine such a table is created:
-        // create table T (name char(50)) partitioned by (ds string);
-        // and the following two queries are launched at the same time, from different sessions:
-        // insert into table T partition (ds) values ('Bob', 'today'); -- creates the partition 'today'
-        // insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException
-        // In that case, we want to retry with alterPartition.
-        LOG.debug("Caught AlreadyExistsException, trying to alter partition instead");
-        setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
-      } catch (Exception e) {
-        try {
-          final FileSystem newPathFileSystem = newPartPath.getFileSystem(this.getConf());
-          boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
-          final FileStatus status = newPathFileSystem.getFileStatus(newPartPath);
-          Hive.trashFiles(newPathFileSystem, new FileStatus[] {status}, this.getConf(), isAutoPurge);
-        } catch (IOException io) {
-          LOG.error("Could not delete partition directory contents after failed partition creation: ", io);
-        }
-        throw e;
-      }
-
-      // For acid table, add the acid_write event with file list at the time of load itself. But
-      // it should be done after partition is created.
-      if (isTxnTable && (null != newFiles)) {
-        addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
-      }
     } else {
       setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
     }
@@ -2089,6 +2078,93 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
     }
+
+  }
+
+  private void addPartitionToMetastore(Partition newTPart, boolean hasFollowingStatsTask,
+      Table tbl) throws HiveException {
+    try {
+      LOG.debug("Adding new partition " + newTPart.getSpec());
+      getSynchronizedMSC().add_partition(newTPart.getTPartition());
+    } catch (AlreadyExistsException aee) {
+      // With multiple users concurrently issuing insert statements on the same partition, a
+      // side effect is that some queries may not see the partition at the time they are issued,
+      // but then find it already there when trying to add it to the metastore, and thus get an
+      // AlreadyExistsException because some earlier query just created it (race condition).
+      // For example, imagine such a table is created:
+      // create table T (name char(50)) partitioned by (ds string);
+      // and the following two queries are launched at the same time, from different sessions:
+      // insert into table T partition (ds) values ('Bob', 'today'); -- creates the partition 'today'
+      // insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException
+      // In that case, we want to retry with alterPartition.
+      LOG.debug("Caught AlreadyExistsException, trying to alter partition instead");
+      AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf,
+          newTPart.getTable(), true);
+      try {
+        setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart, tableSnapshot);
+      } catch (TException e) {
+        LOG.error(StringUtils.stringifyException(e));
+        throw new HiveException(e);
+      }
+    } catch (Exception e) {
+      try {
+        final FileSystem newPathFileSystem = newTPart.getPartitionPath().getFileSystem(this.getConf());
+        boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+        final FileStatus status = newPathFileSystem.getFileStatus(newTPart.getPartitionPath());
+        Hive.trashFiles(newPathFileSystem, new FileStatus[]{status}, this.getConf(), isAutoPurge);
+      } catch (IOException io) {
+        LOG.error("Could not delete partition directory contents after failed partition creation: ", io);
+      }
+      LOG.error(StringUtils.stringifyException(e));
+      throw new HiveException(e);
+    }
+  }
+
+  private void addPartitionsToMetastore(List<Partition> partitions,
+      boolean hasFollowingStatsTask, Table tbl) throws HiveException {
+    try {
+      StringBuffer debugMsg = new StringBuffer("Adding new partitions ");
+      partitions.forEach(partition -> debugMsg.append(partition.getSpec() + " "));
+      LOG.debug(debugMsg.toString());
+      getSynchronizedMSC().add_partitions(partitions.parallelStream().map(Partition::getTPartition)
+          .collect(Collectors.toList()));
+    } catch (AlreadyExistsException aee) {
+      // With multiple users concurrently issuing insert statements on the same partition, a
+      // side effect is that some queries may not see the partition at the time they are issued,
+      // but then find it already there when trying to add it to the metastore, and thus get an
+      // AlreadyExistsException because some earlier query just created it (race condition).
+      // For example, imagine such a table is created:
+      // create table T (name char(50)) partitioned by (ds string);
+      // and the following two queries are launched at the same time, from different sessions:
+      // insert into table T partition (ds) values ('Bob', 'today'); -- creates the partition 'today'
+      // insert into table T partition (ds) values ('Joe', 'today'); -- will fail with AlreadyExistsException
+      // In that case, we want to retry with alterPartition.
+      LOG.debug("Caught AlreadyExistsException, trying to alter partition instead");
+      for (Partition partition : partitions) {
+        AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, partition.getTable(),
+            true);
+        try {
+          setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, partition, tableSnapshot);
+        } catch (TException e) {
+          LOG.error(StringUtils.stringifyException(e));
+          throw new HiveException(e);
+        }
+      }
+    } catch (Exception e) {
+      try {
+        for (Partition partition : partitions) {
+          final FileSystem newPathFileSystem = partition.getPartitionPath().getFileSystem(this.getConf());
+          boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+          final FileStatus status = newPathFileSystem.getFileStatus(partition.getPartitionPath());
+          Hive.trashFiles(newPathFileSystem, new FileStatus[]{status}, this.getConf(), isAutoPurge);
+        }
+      } catch (IOException io) {
+        LOG.error("Could not delete partition directory contents after failed partition creation: ", io);
+      }
+      LOG.error(StringUtils.stringifyException(e));
+      throw new HiveException(e);
+    }
+  }
   }
 
@@ -2375,130 +2451,174 @@ private void constructOneLBLocationMap(FileStatus fSta,
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS);
 
-    final Map<Map<String, String>, Partition> partitionsMap =
-      Collections.synchronizedMap(new LinkedHashMap<Map<String, String>, Partition>());
-
-    int poolSize = conf.getInt(ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1);
-    final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat("load-dynamic-partitions-%d")
-            .build());
-
     // Get all valid partition paths and existing partitions for them (if any)
     final Table tbl = getTable(tableName);
     final Set<Path> validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, writeId, stmtId,
-      AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite);
+        AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite);
 
-    final int partsToLoad = validPartitions.size();
-    final AtomicInteger partitionsLoaded = new AtomicInteger(0);
-
-    final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
-        && InPlaceUpdate.canRenderInPlace(conf) && !SessionState.getConsole().getIsSilent();
-    final PrintStream ps = (inPlaceEligible) ?
-      SessionState.getConsole().getInfoStream() : null;
     final SessionState parentSession = SessionState.get();
+    List<Callable<Partition>> tasks = Lists.newLinkedList();
 
-    final List<Future<Void>> futures = Lists.newLinkedList();
-    // for each dynamically created DP directory, construct a full partition spec
-    // and load the partition based on that
-    final Map<Long, RawStore> rawStoreMap = new ConcurrentHashMap<>();
-    try {
-      for(final Path partPath : validPartitions) {
-        // generate a full partition specification
-        final LinkedHashMap<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
-        if (!Warehouse.makeSpecFromName(
-            fullPartSpec, partPath, new HashSet<String>(partSpec.keySet()))) {
-          Utilities.FILE_OP_LOGGER.warn("Ignoring invalid DP directory " + partPath);
-          continue;
+    final class PartitionDetails {
+      Map<String, String> fullSpec;
+      Partition oldPartition;
+      List<Path> newFiles;
+    }
+
+    Map<Path, PartitionDetails> partitionDetailsMap =
+        Collections.synchronizedMap(new LinkedHashMap<>());
+
+    // calculate full path spec for each valid partition path
+    validPartitions.forEach(partPath -> {
+      Map<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
+      if (!Warehouse.makeSpecFromName(fullPartSpec, partPath, new HashSet<>(partSpec.keySet()))) {
+        Utilities.FILE_OP_LOGGER.warn("Ignoring invalid DP directory " + partPath);
+      } else {
+        PartitionDetails details = new PartitionDetails();
+        details.fullSpec = fullPartSpec;
+        partitionDetailsMap.put(partPath, details);
+      }
+    });
+
+    // fetch all the partitions matching the part spec using the partition iterable
+    // this way the maximum batch size configuration parameter is considered
+    PartitionIterable partitionIterable = new PartitionIterable(Hive.get(), tbl, partSpec,
+        conf.getInt(MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX.getVarname(), 300));
+    Iterator<Partition> iterator = partitionIterable.iterator();
+
+    // Match valid partition path to partitions
+    while (iterator.hasNext()) {
+      // comparator for checking partition specifications
+      Comparator<Map<String, String>> specComparator = (map1, map2) -> {
+        for (Entry<String, String> entry : map1.entrySet()) {
+          if (!map2.containsKey(entry.getKey()) || !entry.getValue().equals(map2.get(entry.getKey()))) {
+            return -1;
+          }
         }
-        futures.add(pool.submit(new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            try {
-              // move file would require session details (needCopy() invokes SessionState.get)
-              SessionState.setCurrentSessionState(parentSession);
-              LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
-
-              // load the partition
-              Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, loadFileType,
-                  true, false, numLB > 0, false, isAcid, hasFollowingStatsTask, writeId, stmtId,
-                  isInsertOverwrite);
-              partitionsMap.put(fullPartSpec, newPartition);
-
-              if (inPlaceEligible) {
-                synchronized (ps) {
-                  InPlaceUpdate.rePositionCursor(ps);
-                  partitionsLoaded.incrementAndGet();
-                  InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
-                      + partsToLoad + " partitions.");
-                }
-              }
-              return null;
-            } catch (Exception t) {
-              LOG.error("Exception when loading partition with parameters "
-                  + " partPath=" + partPath + ", "
+
+        for (Entry<String, String> entry : map2.entrySet()) {
+          if (!map1.containsKey(entry.getKey()) || !entry.getValue().equals(map1.get(entry.getKey()))) {
+            return -1;
+          }
+        }
+
+        return 0;
+      };
+      Partition partition = iterator.next();
+      partitionDetailsMap.entrySet().parallelStream()
+          .filter(entry -> specComparator.compare(entry.getValue().fullSpec,
+              partition.getSpec()) == 0)
+          .findAny().ifPresent(entry -> entry.getValue().oldPartition = partition);
+    }
+
+    boolean isTxnTable =
+        AcidUtils.isTransactionalTable(tbl);
+
+    List<Partition> partitionsToAdd = Lists.newArrayList();
+    for (Entry<Path, PartitionDetails> entry : partitionDetailsMap.entrySet()) {
+      tasks.add(() -> {
+        Map<String, String> fullPartSpec = entry.getValue().fullSpec;
+        try {
+          SessionState.setCurrentSessionState(parentSession);
+          LOG.info("New loading path = " + entry.getKey() + " with partSpec " + fullPartSpec);
+
+          List<Path> newFiles = Lists.newArrayList();
+          Partition oldPartition = entry.getValue().oldPartition;
+          // load the partition
+          Partition partition = loadPartitionInternal(entry.getKey(), tbl,
+              fullPartSpec, oldPartition, loadFileType, true, false, numLB > 0, false, isAcid,
+              hasFollowingStatsTask, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles);
+          // if the partition already existed before the loading, no need to add it again to the
+          // metastore
+          if (oldPartition == null) {
+            entry.getValue().newFiles = newFiles;
+            partitionsToAdd.add(partition);
+          }
+          return partition;
+        } catch (Exception e) {
+          LOG.error("Exception when loading partition with parameters "
+              + " partPath=" + entry.getKey() + ", "
               + " table=" + tbl.getTableName() + ", "
               + " partSpec=" + fullPartSpec + ", "
               + " loadFileType=" + loadFileType.toString() + ", "
               + " listBucketingLevel=" + numLB + ", "
               + " isAcid=" + isAcid + ", "
-              + " hasFollowingStatsTask=" + hasFollowingStatsTask, t);
-              throw t;
-            } finally {
-              // Add embedded rawstore, so we can cleanup later to avoid memory leak
-              if (getMSC().isLocalMetaStore()) {
-                Long threadId = Thread.currentThread().getId();
-                RawStore threadLocalRawStore = HiveMetaStore.HMSHandler.getRawStore();
-                if (threadLocalRawStore == null) {
-                  // If the thread local rawStore is already cleaned by current thread, then remove from rawStoreMap.
-                  rawStoreMap.remove(threadId);
-                } else {
-                  // If same thread is re-used, then need to cleanup the latest thread local instance of rawStore.
-                  // So, overwrite the old one if exists in rawStoreMap.
-                  rawStoreMap.put(threadId, threadLocalRawStore);
-                }
-              }
-            }
-          }
-        }));
-      }
-      pool.shutdown();
-      LOG.debug("Number of partitions to be added is " + futures.size());
+              + " hasFollowingStatsTask=" + hasFollowingStatsTask, e);
+          throw e;
+        }
+      });
+    }
 
-      for (Future future : futures) {
-        future.get();
+    int poolSize = conf.getInt(ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1);
+    ExecutorService executor = Executors.newFixedThreadPool(poolSize,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("load-dynamic-partitions-%d").build());
+
+    List<Future<Partition>> futures = Lists.newLinkedList();
+    Map<Map<String, String>, Partition> result = Maps.newLinkedHashMap();
+    try {
+      futures = executor.invokeAll(tasks);
+      LOG.debug("Number of partitions to be added is " + futures.size());
+      for (Future<Partition> future : futures) {
+        Partition partition = future.get();
+        result.put(partition.getSpec(), partition);
+      }
+      // add new partitions in batch
+      addPartitionsToMetastore(partitionsToAdd, hasFollowingStatsTask, tbl);
+      // For acid table, add the acid_write event with file list at the time of load itself. But
+      // it should be done after partition is created.
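+      // Note: newFiles is only recorded for partitions that were newly created by this load
+      // (oldPartition == null above); partitions that already existed are skipped here.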
+      if (isTxnTable) {
+        for (Entry<Path, PartitionDetails> entry : partitionDetailsMap.entrySet()) {
+          PartitionDetails partitionDetails = entry.getValue();
+          if (partitionDetails.newFiles != null) {
+            addWriteNotificationLog(tbl, partitionDetails.fullSpec, partitionDetails.newFiles, writeId);
+          }
+        }
       }
     } catch (InterruptedException | ExecutionException e) {
-      LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
-      //cancel other futures
-      for (Future future : futures) {
-        future.cancel(true);
-      }
-      throw new HiveException("Exception when loading "
-          + partsToLoad + " in table " + tbl.getTableName()
-          + " with loadPath=" + loadPath, e);
+      throw new HiveException("Exception when loading " + validPartitions.size()
+          + " partitions in table " + tbl.getTableName()
+          + " with loadPath=" + loadPath, e);
+    } catch (Exception e) {
+      StringBuffer logMsg = new StringBuffer();
+      logMsg.append("Exception when loading partitions with parameters ");
+      logMsg.append("partPaths=");
+      validPartitions.forEach(path -> logMsg.append(path + ", "));
+      logMsg.append("table=" + tbl.getTableName() + ", ")
+          .append("partSpec=" + partSpec + ", ")
+          .append("loadFileType=" + loadFileType.toString() + ", ")
+          .append("listBucketingLevel=" + numLB + ", ")
+          .append("isAcid=" + isAcid + ", ")
+          .append("hasFollowingStatsTask=" + hasFollowingStatsTask);
+
+      LOG.error(logMsg.toString(), e);
+      throw e;
     } finally {
-      rawStoreMap.forEach((k, rs) -> rs.shutdown());
+      LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
+      executor.shutdownNow();
+    }
+
+    if (conf.getLong("fs.trash.interval", 0) <= 0
+        && InPlaceUpdate.canRenderInPlace(conf)
+        && !SessionState.getConsole().getIsSilent()) {
+      PrintStream ps = SessionState.getConsole().getInfoStream();
+      InPlaceUpdate.rePositionCursor(ps);
+      InPlaceUpdate.reprintLine(ps,
+          "Loaded : " + futures.size() + "/" + validPartitions.size() + " partitions.");
+    }
 
     try {
       if (isAcid) {
-        List<String> partNames = new ArrayList<>(partitionsMap.size());
-        for (Partition p : partitionsMap.values()) {
-          partNames.add(p.getName());
-        }
+        List<String> partNames =
+            result.values().parallelStream().map(Partition::getName).collect(Collectors.toList());
         getMSC().addDynamicPartitions(parentSession.getTxnMgr().getCurrentTxnId(), writeId,
-            tbl.getDbName(), tbl.getTableName(), partNames,
-            AcidUtils.toDataOperationType(operation));
+            tbl.getDbName(), tbl.getTableName(), partNames, AcidUtils.toDataOperationType(operation));
       }
-      LOG.info("Loaded " + partitionsMap.size() + " partitions");
+      LOG.info("Loaded " + result.size() + " partitions");
 
-      perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS);
-
-      return partitionsMap;
+      return result;
     } catch (TException te) {
-      throw new HiveException("Exception updating metastore for acid table "
-          + tableName + " with partitions " + partitionsMap.values(), te);
+      throw new HiveException("Exception updating metastore for acid table "
+          + tableName + " with partitions " + result.values(), te);
     }
   }
 
@@ -3181,9 +3301,14 @@ public boolean dropPartition(String dbName, String tableName, List<String> partV
 
   public List<String> getPartitionNames(String dbName, String tblName, short max)
       throws HiveException {
-    List<String> names = null;
+    List<String> names;
     try {
       names = getMSC().listPartitionNames(dbName, tblName, max);
+    } catch (NoSuchObjectException nsoe) {
+      // This means no partition exists for the given dbName and tblName.
+      // Thrift cannot handle null return values, so listPartitionNames()
+      // throws NoSuchObjectException to indicate that there are no partitions.
+      return Lists.newArrayList();
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
@@ -3193,13 +3318,18 @@ public boolean dropPartition(String dbName, String tableName, List<String> partV
 
   public List<String> getPartitionNames(String dbName, String tblName,
       Map<String, String> partSpec, short max) throws HiveException {
-    List<String> names = null;
+    List<String> names;
     Table t = getTable(dbName, tblName);
 
     List<String> pvals = MetaStoreUtils.getPvals(t.getPartCols(), partSpec);
 
     try {
       names = getMSC().listPartitionNames(dbName, tblName, pvals, max);
+    } catch (NoSuchObjectException nsoe) {
+      // This means no partition exists for the given partition spec key-value pairs.
+      // Thrift cannot handle null return values, so listPartitionNames()
+      // throws NoSuchObjectException to indicate that there are no partitions.
+      return Lists.newArrayList();
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index a2b57fb646..dc23e28c2c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -901,14 +901,23 @@ private void addPartition(Partition p) throws AlreadyExistsException, MetaExcept
     assertPartitioned();
     pTree.addPartition(p);
   }
+
   private Partition getPartition(String partName) throws MetaException {
     assertPartitioned();
     return pTree.getPartition(partName);
   }
+
+  public int addPartitions(List<Partition> partitions) throws AlreadyExistsException,
+      MetaException {
+    assertPartitioned();
+    return pTree.addPartitions(partitions);
+  }
+
   private List<Partition> getPartitions(List<String> partialPartVals) throws MetaException {
     assertPartitioned();
     return pTree.getPartitions(partialPartVals);
   }
+
   private void assertPartitioned() throws MetaException {
     if(tTable.getPartitionKeysSize() <= 0) {
       throw new MetaException(Warehouse.getQualifiedName(tTable) + " is not partitioned");
@@ -939,6 +948,17 @@ private void addPartition(Partition p) throws AlreadyExistsException, MetaExcept
     private Partition getPartition(String partName) {
       return parts.get(partName);
     }
+
+    public int addPartitions(List<Partition> partitions)
+        throws AlreadyExistsException, MetaException {
+      int partitionsAdded = 0;
+      for (Partition partition : partitions) {
+        addPartition(partition);
+        partitionsAdded++;
+      }
+      return partitionsAdded;
+    }
+
     /**
      * Provided values for the 1st N partition columns, will return all matching PartitionS
      * The list is a partial list of partition values in the same order as partition columns.
@@ -960,9 +980,7 @@ private Partition getPartition(String partName) {
     }
   }
   /**
-   * Loading Dynamic Partitons calls this.
-   * Hive.loadPartition() calls this which in turn can be called from Hive.loadDynamicPartitions()
-   * among others
+   * Hive.loadPartition() calls this.
    * @param partition
    *          The partition to add
    * @return the partition added
@@ -985,6 +1003,33 @@ private Partition getPartition(String partName) {
     tt.addPartition(deepCopy(partition));
     return partition;
   }
+
+  /**
+   * Loading Dynamic Partitions calls this.
+   * @param new_parts the new partitions to be added, must not be null
+   * @return the number of partitions that were added
+   * @throws TException
+   */
+  @Override
+  public int add_partitions(List<Partition> new_parts) throws TException {
+    // assume that all the partitions belong to the same table
+    if (new_parts.isEmpty())
+      return 0;
+    Partition partition = new_parts.get(0);
+    org.apache.hadoop.hive.metastore.api.Table table =
+        getTempTable(partition.getDbName(), partition.getTableName());
+    if (table == null) {
+      // not a temp table - try the underlying client
+      return super.add_partitions(new_parts);
+    }
+    TempTable tt = getTempTable(table);
+    if (tt == null) {
+      throw new IllegalStateException("TempTable not found for " +
+          table.getTableName());
+    }
+    return tt.addPartitions(deepCopyPartitions(new_parts));
+  }
+
   /**
    * @param partialPvals partition values, can be partial. This really means that missing values
    *                     are represented by empty str.
diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index a2ec09f715..f45e96fa60 100644
--- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2467,7 +2467,7 @@ protected PrincipalPrivilegeSet deepCopy(PrincipalPrivilegeSet pps) {
     return copy;
   }
 
-  private List<Partition> deepCopyPartitions(List<Partition> partitions) {
+  protected List<Partition> deepCopyPartitions(List<Partition> partitions) {
     return deepCopyPartitions(partitions, null);
   }