diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index b4ae1d1..aece568 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -54,11 +54,11 @@
   private long MAX_SLEEP;
   //longer term we should always have a txn id and then we won't need to track locks here at all
   private Set<DbHiveLock> locks;
-  private DbTxnManager.SynchronizedMetaStoreClient client;
+  private SynchronizedMetaStoreClient client;
   private long nextSleep = 50;
   private final HiveConf conf;
 
-  DbLockManager(DbTxnManager.SynchronizedMetaStoreClient client, HiveConf conf) {
+  DbLockManager(SynchronizedMetaStoreClient client, HiveConf conf) {
     locks = new HashSet<>();
     this.client = client;
     this.conf = conf;
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 7b0369d..0650e34 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -706,54 +706,4 @@ public void run() {
         }
       }
     }
-
-  /**
-   * Synchronized MetaStoreClient wrapper
-   */
-  final class SynchronizedMetaStoreClient {
-    private final IMetaStoreClient client;
-    SynchronizedMetaStoreClient(IMetaStoreClient client) {
-      this.client = client;
-    }
-
-    synchronized long openTxn(String user) throws TException {
-      return client.openTxn(user);
-    }
-
-    synchronized void commitTxn(long txnid) throws TException {
-      client.commitTxn(txnid);
-    }
-
-    synchronized void rollbackTxn(long txnid) throws TException {
-      client.rollbackTxn(txnid);
-    }
-
-    synchronized void heartbeat(long txnid, long lockid) throws TException {
-      client.heartbeat(txnid, lockid);
-    }
-
-    synchronized ValidTxnList getValidTxns(long currentTxn) throws TException {
-      return client.getValidTxns(currentTxn);
-    }
-
-    synchronized LockResponse lock(LockRequest request) throws TException {
-      return client.lock(request);
-    }
-
-    synchronized LockResponse checkLock(long lockid) throws TException {
-      return client.checkLock(lockid);
-    }
-
-    synchronized void unlock(long lockid) throws TException {
-      client.unlock(lockid);
-    }
-
-    synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException {
-      return client.showLocks(showLocksRequest);
-    }
-
-    synchronized void close() {
-      client.close();
-    }
-  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/SynchronizedMetaStoreClient.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/SynchronizedMetaStoreClient.java
new file mode 100644
index 0000000..d55db6b
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/SynchronizedMetaStoreClient.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.thrift.TException;
+
+
+/**
+ * Synchronized MetaStoreClient wrapper
+ */
+public final class SynchronizedMetaStoreClient {
+
+  private final IMetaStoreClient client;
+
+  public SynchronizedMetaStoreClient(IMetaStoreClient client) {
+    this.client = client;
+  }
+
+  public synchronized long openTxn(String user) throws TException {
+    return client.openTxn(user);
+  }
+
+  public synchronized void commitTxn(long txnid) throws TException {
+    client.commitTxn(txnid);
+  }
+
+  public synchronized void rollbackTxn(long txnid) throws TException {
+    client.rollbackTxn(txnid);
+  }
+
+  public synchronized void heartbeat(long txnid, long lockid) throws TException {
+    client.heartbeat(txnid, lockid);
+  }
+
+  public synchronized ValidTxnList getValidTxns(long currentTxn) throws TException {
+    return client.getValidTxns(currentTxn);
+  }
+
+  public synchronized LockResponse lock(LockRequest request) throws TException {
+    return client.lock(request);
+  }
+
+  public synchronized Partition add_partition(Partition partition) throws TException {
+    return client.add_partition(partition);
+  }
+
+  public synchronized void alter_partition(String dbName, String tblName,
+      Partition newPart, EnvironmentContext environmentContext) throws TException {
+    client.alter_partition(dbName, tblName, newPart, environmentContext);
+  }
+
+  public synchronized LockResponse checkLock(long lockid) throws TException {
+    return client.checkLock(lockid);
+  }
+
+  public synchronized void unlock(long lockid) throws TException {
+    client.unlock(lockid);
+  }
+
+  public synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException {
+    return client.showLocks(showLocksRequest);
+  }
+
+  public synchronized void close() {
+    client.close();
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index ef0bb3d..340d72f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -126,6 +126,7 @@
 import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.SynchronizedMetaStoreClient;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
@@ -163,6 +164,7 @@
 
   private HiveConf conf = null;
   private IMetaStoreClient metaStoreClient;
+  private SynchronizedMetaStoreClient syncMetaStoreClient;
   private UserGroupInformation owner;
 
   // metastore calls timing information
@@ -1499,6 +1501,16 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
       boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
       boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask)
       throws HiveException {
+
+    Partition oldPart = getPartition(tbl, partSpec, false);
+    return loadPartition(loadPath, tbl, partSpec, oldPart, replace, inheritTableSpecs,
+        isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask);
+  }
+
+  public Partition loadPartition(Path loadPath, Table tbl,
+      Map<String, String> partSpec, Partition oldPart, boolean replace,
+      boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
+      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException {
     Path tblDataLocationPath = tbl.getDataLocation();
     try {
       /**
@@ -1508,7 +1520,6 @@ public Partition loadPartition(Path loadPath, Table tbl,
        * processes might move forward with partial data
        */
-      Partition oldPart = getPartition(tbl, partSpec, false);
       Path oldPartPath = null;
       if(oldPart != null) {
         oldPartPath = oldPart.getDataLocation();
       }
@@ -1582,14 +1593,17 @@
             StatsSetupConst.TRUE);
         }
         MetaStoreUtils.populateQuickStats(HiveStatsUtils.getFileStatusRecurse(newPartPath, -1, newPartPath.getFileSystem(conf)), newTPart.getParameters());
-        getMSC().add_partition(newTPart.getTPartition());
+        getSychronizedMSC().add_partition(newTPart.getTPartition());
       } else {
         EnvironmentContext environmentContext = null;
         if (hasFollowingStatsTask) {
           environmentContext = new EnvironmentContext();
           environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
         }
-        alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newTPart.getTPartition()), environmentContext);
+        Partition newPart = new Partition(tbl, newTPart.getTPartition());
+        validatePartition(newPart);
+        getSychronizedMSC().alter_partition(tbl.getDbName(), tbl.getTableName(),
+            newPart.getTPartition(), environmentContext);
       }
       return newTPart;
     } catch (IOException e) {
@@ -1705,16 +1719,23 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
    */
-  public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
-      String tableName, Map<String, String> partSpec, boolean replace,
-      int numDP, boolean listBucketingEnabled, boolean isAcid, long txnId, boolean hasFollowingStatsTask,
-      AcidUtils.Operation operation)
+  public Map<Map<String, String>, Partition> loadDynamicPartitions(final Path loadPath,
+      final String tableName, final Map<String, String> partSpec, final boolean replace,
+      final int numDP, final boolean listBucketingEnabled, final boolean isAcid, final long txnId,
+      final boolean hasFollowingStatsTask, final AcidUtils.Operation operation)
       throws HiveException {
     Set<Path> validPartitions = new HashSet<Path>();
+    int poolSize = Math.max(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), 1);
+    final ExecutorService pool =
+        Executors.newFixedThreadPool(poolSize,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("load-dynamic-partitions-%d")
+                .build());
     try {
-      Map<Map<String, String>, Partition> partitionsMap = new
-          LinkedHashMap<Map<String, String>, Partition>();
+      final Map<Map<String, String>, Partition> partitionsMap =
+          Collections.synchronizedMap(new LinkedHashMap<Map<String, String>, Partition>());
 
       FileSystem fs = loadPath.getFileSystem(conf);
       FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
 
@@ -1723,7 +1744,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         validPartitions.add(s.getPath());
       }
 
-      int partsToLoad = validPartitions.size();
+      final int partsToLoad = validPartitions.size();
       if (partsToLoad == 0) {
         LOG.warn("No partition is generated by dynamic partitioning");
       }
@@ -1736,36 +1757,73 @@ private void constructOneLBLocationMap(FileStatus fSta,
             + " to at least " + partsToLoad + '.');
       }
 
-      Table tbl = getTable(tableName);
+      final Table tbl = getTable(tableName);
+      LOG.info("Getting partition details for spec : " + partSpec + " for table " + tableName);
+      List<Partition> partitions = getPartitions(tbl, partSpec, (short) -1);
+      final Map<String, Partition> tablePartitions =
+          Collections.synchronizedMap(new LinkedHashMap<String, Partition>());
+      for(Partition p : partitions) {
+        tablePartitions.put(p.getSpec().toString(), p);
+      }
+
       // for each dynamically created DP directory, construct a full partition spec
       // and load the partition based on that
       Iterator<Path> iter = validPartitions.iterator();
       LOG.info("Going to load " + partsToLoad + " partitions.");
-      PrintStream ps = null;
-      boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
+      final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
           && InPlaceUpdates.inPlaceEligible(conf);
-      if(inPlaceEligible) {
-        ps = SessionState.getConsole().getInfoStream();
-      }
-      int partitionsLoaded = 0;
+      final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null;
+      final AtomicInteger partitionsLoaded = new AtomicInteger(0);
+      final List<Future<Boolean>> futures = new LinkedList<>();
+      final SessionState parentSession = SessionState.get();
       while (iter.hasNext()) {
         // get the dynamically created directory
-        Path partPath = iter.next();
+        final Path partPath = iter.next();
         assert fs.getFileStatus(partPath).isDir(): "partitions " + partPath
             + " is not a directory !";
 
         // generate a full partition specification
-        LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
+        final LinkedHashMap<String, String> fullPartSpec =
+            new LinkedHashMap<String, String>(partSpec);
         Warehouse.makeSpecFromName(fullPartSpec, partPath);
-        Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
-            true, listBucketingEnabled, false, isAcid, hasFollowingStatsTask);
-        partitionsMap.put(fullPartSpec, newPartition);
-        if (inPlaceEligible) {
-          InPlaceUpdates.rePositionCursor(ps);
-          InPlaceUpdates.reprintLine(ps, "Loaded : " + ++partitionsLoaded + "/" + partsToLoad +" partitions.");
-        }
-        LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
+
+        futures.add(pool.submit(new Callable<Boolean>() {
+          @Override
+          public Boolean call() throws Exception {
+            try {
+              // moving the file would require session details (needCopy() invokes SessionState.get)
+              SessionState.setCurrentSessionState(parentSession);
+
+              // load the partition
+              Partition newPartition = loadPartition(partPath, tbl, fullPartSpec,
+                  tablePartitions.get(fullPartSpec.toString()), replace,
+                  true, listBucketingEnabled, false, isAcid, hasFollowingStatsTask);
+              partitionsMap.put(fullPartSpec, newPartition);
+
+              if (inPlaceEligible) {
+                synchronized (ps) {
+                  InPlaceUpdates.rePositionCursor(ps);
+                  partitionsLoaded.incrementAndGet();
+                  InPlaceUpdates.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
+                      + partsToLoad + " partitions.");
+                }
+              }
+              LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
+              return true;
+            } catch(Exception t) {
+              LOG.error("Exception when loading partition " + partPath, t);
+              throw t;
+            }
+          }
+        }));
       }
+      pool.shutdown();
+      LOG.debug("Number of partitions to be added is " + futures.size());
+
+      for(Future<Boolean> future : futures) {
+        future.get();
+      }
+
       if (isAcid) {
         List<String> partNames = new ArrayList<>(partitionsMap.size());
         for (Partition p : partitionsMap.values()) {
@@ -1779,6 +1837,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throw new HiveException(e);
     } catch (TException te) {
       throw new HiveException(te);
+    } catch (InterruptedException e) {
+      throw new HiveException(e);
+    } catch (ExecutionException e) {
+      throw new HiveException(e);
     }
   }
 
@@ -3376,6 +3438,19 @@ public synchronized IMetaStoreClient getMSC() throws MetaException {
   }
 
   /**
+   * @return synchronized metastore client
+   * @throws MetaException
+   */
+  @LimitedPrivate(value = {"Hive"})
+  @Unstable
+  public synchronized SynchronizedMetaStoreClient getSychronizedMSC() throws MetaException {
+    if (syncMetaStoreClient == null) {
+      syncMetaStoreClient = new SynchronizedMetaStoreClient(getMSC(true, false));
+    }
+    return syncMetaStoreClient;
+  }
+
+  /**
    * @return the metastore client for the current thread
    * @throws MetaException
    */
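
Note on the loadDynamicPartitions() change: the patch replaces the serial per-partition loop with a fixed-size daemon pool, submits one Callable per partition directory, and then joins on every Future, so a single failed partition fails the whole load. A minimal standalone sketch of that submit-then-join pattern, with made-up names and a plain thread factory instead of Guava's ThreadFactoryBuilder:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelLoadSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Boolean>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
          final int part = i;                    // stand-in for one partition directory
          futures.add(pool.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
              // per-partition work goes here (move files, add/alter the metastore entry)
              System.out.println("loaded partition " + part);
              return true;
            }
          }));
        }
        pool.shutdown();                         // accept no new work; submitted tasks still run
        for (Future<Boolean> f : futures) {
          f.get();                               // rethrows a worker failure as ExecutionException
        }
      }
    }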
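
The pre-fetch block keys existing partitions by p.getSpec().toString(), so the worker must look them up by fullPartSpec.toString(): tablePartitions is a Map<String, Partition>, and passing the LinkedHashMap itself as the key would never match. The string forms coincide only because both specs list the partition columns in the same order. A small demonstration with made-up column values:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SpecKeySketch {
      public static void main(String[] args) {
        // spec as returned by the metastore for an existing partition
        Map<String, String> fromMetastore = new LinkedHashMap<>();
        fromMetastore.put("ds", "2016-08-01");
        fromMetastore.put("hr", "12");

        // spec reconstructed from the directory name (as Warehouse.makeSpecFromName does)
        Map<String, String> fromPath = new LinkedHashMap<>();
        fromPath.put("ds", "2016-08-01");
        fromPath.put("hr", "12");

        System.out.println(fromMetastore);  // {ds=2016-08-01, hr=12}
        System.out.println(fromMetastore.toString().equals(fromPath.toString()));  // true
      }
    }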
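
SynchronizedMetaStoreClient itself is the classic coarse-grained wrapper: the loader threads now share one Thrift-backed IMetaStoreClient, which is not safe for concurrent use, so every delegated call is funneled through the wrapper's monitor. A sketch of the idiom using a hypothetical two-method interface, not Hive's actual API:

    public final class SynchronizedWrapperSketch {
      interface Client {
        void call(String op);   // stands in for add_partition, alter_partition, ...
      }

      static final class SynchronizedClient implements Client {
        private final Client delegate;
        SynchronizedClient(Client delegate) { this.delegate = delegate; }
        @Override
        public synchronized void call(String op) {
          delegate.call(op);    // at most one thread talks to the delegate at a time
        }
      }

      public static void main(String[] args) {
        Client raw = op -> System.out.println("executing " + op);
        Client safe = new SynchronizedClient(raw);
        safe.call("add_partition");
      }
    }

One monitor per wrapper instance serializes the calls while keeping the single connection usable from the pool threads and the lock-heartbeat thread alike.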