diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index ff5c858..6dd849f 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -524,6 +524,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Used to avoid all of the proxies and object copies in the metastore. Note, if this is " + "set, you MUST use a local metastore (hive.metastore.uris must be empty) otherwise " + "undefined and most likely undesired behavior will result"), + METASTORE_HANDLER_THREADS_COUNT("hive.metastore.handler.threads.count", 20, + "Number of threads to be allocated for metastore handler."), METASTORE_HBASE_CATALOG_CACHE_SIZE("hive.metastore.hbase.catalog.cache.size", 50000, "Maximum number of " + "objects we will place in the hbase metastore catalog cache. The objects will be divided up by " + "types that we need to cache."), diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 94dd72e..59f0208 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -27,6 +27,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Multimaps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.cli.OptionBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -134,6 +135,13 @@ import java.util.Properties; import java.util.Set; import java.util.Timer; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; 
+import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; @@ -258,6 +266,8 @@ protected Configuration initialValue() { } }; + final ExecutorService threadPool; + public static final String AUDIT_FORMAT = "ugi=%s\t" + // ugi "ip=%s\t" + // remote IP @@ -351,6 +361,11 @@ public HMSHandler(String name, HiveConf conf) throws MetaException { public HMSHandler(String name, HiveConf conf, boolean init) throws MetaException { super(name); hiveConf = conf; + int numThreads = HiveConf.getIntVar(conf, + ConfVars.METASTORE_HANDLER_THREADS_COUNT); + threadPool = Executors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("HMSHandler #%d").build()); if (init) { init(); } @@ -811,6 +826,9 @@ public void shutdown() { threadLocalMS.remove(); } } + if (threadPool != null) { + threadPool.shutdown(); + } logInfo("Metastore shutdown complete."); } @@ -2267,15 +2285,16 @@ public boolean equals(Object obj) { } } - private List add_partitions_core( - RawStore ms, String dbName, String tblName, List parts, boolean ifNotExists) + private List add_partitions_core(final RawStore ms, + String dbName, String tblName, List parts, final boolean ifNotExists) throws MetaException, InvalidObjectException, AlreadyExistsException, TException { logInfo("add_partitions"); boolean success = false; // Ensures that the list doesn't have dups, and keeps track of directories we have created. 
- Map addedPartitions = new HashMap(); - List result = new ArrayList(); - List existingParts = null; + final Map addedPartitions = + new ConcurrentHashMap(); + final List result = new ArrayList(); + final List existingParts = new ArrayList(); Table tbl = null; try { ms.openTransaction(); @@ -2289,29 +2308,50 @@ public boolean equals(Object obj) { } - for (Partition part : parts) { + CompletionService cs = new ExecutorCompletionService(threadPool); + List> partFutures = Lists.newArrayList(); + int numPartitions = parts.size(); + + final Table table = tbl; + for (final Partition part : parts) { if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) { throw new MetaException("Partition does not belong to target table " + dbName + "." + tblName + ": " + part); } + boolean shouldAdd = startAddPartition(ms, part, ifNotExists); if (!shouldAdd) { - if (existingParts == null) { - existingParts = new ArrayList(); - } existingParts.add(part); LOG.info("Not adding partition " + part + " as it already exists"); continue; } - boolean madeDir = createLocationForAddedPartition(tbl, part); - if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) { - // Technically, for ifNotExists case, we could insert one and discard the other - // because the first one now "exists", but it seems better to report the problem - // upstream as such a command doesn't make sense. - throw new MetaException("Duplicate partitions in the list: " + part); + + partFutures.add(cs.submit(new Callable() { + @Override + public Partition call() throws Exception { + boolean madeDir = createLocationForAddedPartition(table, part); + if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) { + // Technically, for ifNotExists case, we could insert one and discard the other + // because the first one now "exists", but it seems better to report the problem + // upstream as such a command doesn't make sense. 
+ throw new MetaException("Duplicate partitions in the list: " + part); + } + initializeAddedPartition(table, part, madeDir); + return part; + } + })); + } + for (int i = 0; i < partFutures.size(); i++) { + try { + Partition part = cs.take().get(); + if (part != null) { + result.add(part); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new MetaException(e.getMessage()); + } catch (ExecutionException e) { + throw new MetaException(e.getMessage()); } - initializeAddedPartition(tbl, part, madeDir); - result.add(part); } if (!result.isEmpty()) { success = ms.addPartitions(dbName, tblName, result); @@ -2414,9 +2454,11 @@ private int add_partitions_pspec_core( throws TException { boolean success = false; // Ensures that the list doesn't have dups, and keeps track of directories we have created. - Map addedPartitions = new HashMap(); + final Map addedPartitions = + new ConcurrentHashMap(); PartitionSpecProxy partitionSpecProxy = PartitionSpecProxy.Factory.get(partSpecs); - PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy.getPartitionIterator(); + final PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy + .getPartitionIterator(); Table tbl = null; try { ms.openTransaction(); @@ -2428,10 +2470,15 @@ private int add_partitions_pspec_core( firePreEvent(new PreAddPartitionEvent(tbl, partitionSpecProxy, this)); + CompletionService cs = new ExecutorCompletionService(threadPool); + List> partFutures = Lists.newArrayList(); + int numPartitions = partSpecs.size(); + final Table table = tbl; + int nPartitions = 0; while(partitionIterator.hasNext()) { - Partition part = partitionIterator.getCurrent(); + final Partition part = partitionIterator.getCurrent(); if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) { throw new MetaException("Partition does not belong to target table " @@ -2442,19 +2489,35 @@ private int add_partitions_pspec_core( LOG.info("Not adding partition " + part + " as it already exists"); continue; } - 
boolean madeDir = createLocationForAddedPartition(tbl, part); - if (addedPartitions.put(new PartValEqWrapperLite(part), madeDir) != null) { - // Technically, for ifNotExists case, we could insert one and discard the other - // because the first one now "exists", but it seems better to report the problem - // upstream as such a command doesn't make sense. - throw new MetaException("Duplicate partitions in the list: " + part); - } - initializeAddedPartition(tbl, partitionIterator, madeDir); + partFutures.add(cs.submit(new Callable() { + @Override + public Partition call() throws Exception { + boolean madeDir = createLocationForAddedPartition(table, part); + if (addedPartitions.put(new PartValEqWrapperLite(part), madeDir) != null) { + // Technically, for ifNotExists case, we could insert one and discard the other + // because the first one now "exists", but it seems better to report the problem + // upstream as such a command doesn't make sense. + throw new MetaException("Duplicate partitions in the list: " + part); + } + initializeAddedPartition(table, part, madeDir); + return part; + } + })); ++nPartitions; partitionIterator.next(); } + for (int i = 0; i < partFutures.size(); i++) { + try { + cs.take().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new MetaException(e.getMessage()); + } catch (ExecutionException e) { + throw new MetaException(e.getMessage()); + } + } + success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists) && ms.commitTransaction();