diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index b13fc65..283ef2e 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -569,6 +569,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Used to avoid all of the proxies and object copies in the metastore. Note, if this is " + "set, you MUST use a local metastore (hive.metastore.uris must be empty) otherwise " + "undefined and most likely undesired behavior will result"), + METASTORE_FS_HANDLER_THREADS_COUNT("hive.metastore.fshandler.threads", 20, + "Number of threads to be allocated for metastore handler for fs operations."), METASTORE_HBASE_CATALOG_CACHE_SIZE("hive.metastore.hbase.catalog.cache.size", 50000, "Maximum number of " + "objects we will place in the hbase metastore catalog cache. The objects will be divided up by " + "types that we need to cache."), diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index f45b90d..f14bc5d 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -28,6 +28,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Multimaps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.cli.OptionBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -47,6 +48,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.events.AddIndexEvent; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; @@ -131,10 +133,14 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.Timer; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; @@ -262,6 +268,8 @@ protected Configuration initialValue() { } }; + private static ExecutorService threadPool; + public static final String AUDIT_FORMAT = "ugi=%s\t" + // ugi "ip=%s\t" + // remote IP @@ -355,6 +363,15 @@ public HMSHandler(String name, HiveConf conf) throws MetaException { public HMSHandler(String name, HiveConf conf, boolean init) throws MetaException { super(name); hiveConf = conf; + synchronized (HMSHandler.class) { + if (threadPool == null) { + int numThreads = HiveConf.getIntVar(conf, + ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT); + threadPool = Executors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("HMSHandler #%d").build()); + } + } if (init) { init(); } @@ -2271,15 +2288,16 @@ public boolean equals(Object obj) { } } - private List add_partitions_core( - RawStore ms, String dbName, String tblName, List parts, boolean ifNotExists) + private List add_partitions_core(final RawStore ms, + String dbName, String tblName, List parts, final boolean ifNotExists) throws MetaException, InvalidObjectException, AlreadyExistsException, TException { logInfo("add_partitions"); boolean success = false; // Ensures that the list doesn't have dups, and keeps track of directories we have created. - Map addedPartitions = new HashMap(); - List result = new ArrayList(); - List existingParts = null; + final Map addedPartitions = + Collections.synchronizedMap(new HashMap()); + final List result = new ArrayList(); + final List existingParts = new ArrayList();; Table tbl = null; try { ms.openTransaction(); @@ -2293,29 +2311,51 @@ public boolean equals(Object obj) { firePreEvent(new PreAddPartitionEvent(tbl, parts, this)); } - for (Partition part : parts) { + List> partFutures = Lists.newArrayList(); + + final Table table = tbl; + for (final Partition part : parts) { if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) { throw new MetaException("Partition does not belong to target table " + dbName + "." + tblName + ": " + part); } + boolean shouldAdd = startAddPartition(ms, part, ifNotExists); if (!shouldAdd) { - if (existingParts == null) { - existingParts = new ArrayList(); - } existingParts.add(part); LOG.info("Not adding partition " + part + " as it already exists"); continue; } - boolean madeDir = createLocationForAddedPartition(tbl, part); - if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) { - // Technically, for ifNotExists case, we could insert one and discard the other - // because the first one now "exists", but it seems better to report the problem - // upstream as such a command doesn't make sense. - throw new MetaException("Duplicate partitions in the list: " + part); + + + partFutures.add(threadPool.submit(new Callable() { + @Override + public Partition call() throws Exception { + boolean madeDir = createLocationForAddedPartition(table, part); + if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) { + // Technically, for ifNotExists case, we could insert one and discard the other + // because the first one now "exists", but it seems better to report the problem + // upstream as such a command doesn't make sense. + throw new MetaException("Duplicate partitions in the list: " + part); + } + initializeAddedPartition(table, part, madeDir); + return part; + } + })); + } + try { + for (Future partFuture : partFutures) { + Partition part = partFuture.get(); + if (part != null) { + result.add(part); + } } - initializeAddedPartition(tbl, part, madeDir); - result.add(part); + } catch (InterruptedException | ExecutionException e) { + // cancel other tasks + for (Future partFuture : partFutures) { + partFuture.cancel(true); + } + throw new MetaException(e.getMessage()); } if (!result.isEmpty()) { success = ms.addPartitions(dbName, tblName, result); @@ -2326,10 +2366,10 @@ public boolean equals(Object obj) { } finally { if (!success) { ms.rollbackTransaction(); - for (Entry e : addedPartitions.entrySet()) { + for (Map.Entry e : addedPartitions.entrySet()) { if (e.getValue()) { + // we just created this directory - it's not a case of pre-creation, so we nuke. wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true); - // we just created this directory - it's not a case of pre-creation, so we nuke } } fireMetaStoreAddPartitionEvent(tbl, parts, null, false); @@ -2418,9 +2458,11 @@ private int add_partitions_pspec_core( throws TException { boolean success = false; // Ensures that the list doesn't have dups, and keeps track of directories we have created. - Map addedPartitions = new HashMap(); + final Map addedPartitions = + Collections.synchronizedMap(new HashMap()); PartitionSpecProxy partitionSpecProxy = PartitionSpecProxy.Factory.get(partSpecs); - PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy.getPartitionIterator(); + final PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy + .getPartitionIterator(); Table tbl = null; try { ms.openTransaction(); @@ -2432,10 +2474,12 @@ private int add_partitions_pspec_core( firePreEvent(new PreAddPartitionEvent(tbl, partitionSpecProxy, this)); - int nPartitions = 0; + List> partFutures = Lists.newArrayList(); + final Table table = tbl; + while(partitionIterator.hasNext()) { - Partition part = partitionIterator.getCurrent(); + final Partition part = partitionIterator.getCurrent(); if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) { throw new MetaException("Partition does not belong to target table " @@ -2446,30 +2490,45 @@ private int add_partitions_pspec_core( LOG.info("Not adding partition " + part + " as it already exists"); continue; } - boolean madeDir = createLocationForAddedPartition(tbl, part); - if (addedPartitions.put(new PartValEqWrapperLite(part), madeDir) != null) { - // Technically, for ifNotExists case, we could insert one and discard the other - // because the first one now "exists", but it seems better to report the problem - // upstream as such a command doesn't make sense. - throw new MetaException("Duplicate partitions in the list: " + part); - } - initializeAddedPartition(tbl, partitionIterator, madeDir); - - ++nPartitions; + partFutures.add(threadPool.submit(new Callable() { + @Override public Object call() throws Exception { + boolean madeDir = createLocationForAddedPartition(table, part); + if (addedPartitions.put(new PartValEqWrapperLite(part), madeDir) != null) { + // Technically, for ifNotExists case, we could insert one and discard the other + // because the first one now "exists", but it seems better to report the problem + // upstream as such a command doesn't make sense. + throw new MetaException("Duplicate partitions in the list: " + part); + } + initializeAddedPartition(table, part, madeDir); + return part; + } + })); partitionIterator.next(); } + try { + for (Future partFuture : partFutures) { + Partition part = partFuture.get(); + } + } catch (InterruptedException | ExecutionException e) { + // cancel other tasks + for (Future partFuture : partFutures) { + partFuture.cancel(true); + } + throw new MetaException(e.getMessage()); + } + success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists) - && ms.commitTransaction(); + && ms.commitTransaction(); - return nPartitions; + return addedPartitions.size(); } finally { if (!success) { ms.rollbackTransaction(); - for (Entry e : addedPartitions.entrySet()) { + for (Map.Entry e : addedPartitions.entrySet()) { if (e.getValue()) { + // we just created this directory - it's not a case of pre-creation, so we nuke. wh.deleteDir(new Path(e.getKey().location), true); - // we just created this directory - it's not a case of pre-creation, so we nuke } } }