diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cc95008..0830c0e 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -524,6 +524,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Used to avoid all of the proxies and object copies in the metastore. Note, if this is " +
         "set, you MUST use a local metastore (hive.metastore.uris must be empty) otherwise " +
         "undefined and most likely undesired behavior will result"),
+    METASTORE_FS_HANDLER_THREADS_COUNT("hive.metastore.fshandler.threads", 20,
+        "Number of threads to be allocated for metastore handler for fs operations."),
     METASTORE_HBASE_CATALOG_CACHE_SIZE("hive.metastore.hbase.catalog.cache.size", 50000, "Maximum number of " +
         "objects we will place in the hbase metastore catalog cache. The objects will be divided up by " +
         "types that we need to cache."),
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index c0827ea..6d59581 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -28,6 +28,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -47,6 +48,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
 import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
@@ -131,10 +133,14 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Timer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
@@ -259,6 +265,8 @@ protected Configuration initialValue() {
       }
     };
 
+    private static ExecutorService threadPool;
+
     public static final String AUDIT_FORMAT =
         "ugi=%s\t" + // ugi
         "ip=%s\t" + // remote IP
@@ -352,6 +360,15 @@ public HMSHandler(String name, HiveConf conf) throws MetaException {
     public HMSHandler(String name, HiveConf conf, boolean init) throws MetaException {
       super(name);
       hiveConf = conf;
+      synchronized (HMSHandler.class) {
+        if (threadPool == null) {
+          int numThreads = HiveConf.getIntVar(conf,
+              ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT);
+          threadPool = Executors.newFixedThreadPool(numThreads,
+              new ThreadFactoryBuilder().setDaemon(true)
+                  .setNameFormat("HMSHandler #%d").build());
+        }
+      }
       if (init) {
         init();
       }
@@ -2268,15 +2285,16 @@ public boolean equals(Object obj) {
       }
     }
 
-    private List<Partition> add_partitions_core(
-        RawStore ms, String dbName, String tblName, List<Partition> parts, boolean ifNotExists)
+    private List<Partition> add_partitions_core(final RawStore ms,
+        String dbName, String tblName, List<Partition> parts, final boolean ifNotExists)
         throws MetaException, InvalidObjectException, AlreadyExistsException, TException {
       logInfo("add_partitions");
       boolean success = false;
       // Ensures that the list doesn't have dups, and keeps track of directories we have created.
-      Map<PartValEqWrapper, Boolean> addedPartitions = new HashMap<PartValEqWrapper, Boolean>();
-      List<Partition> result = new ArrayList<Partition>();
-      List<Partition> existingParts = null;
+      final Set<PartValEqWrapper> addedPartitions =
+          Collections.synchronizedSet(new HashSet<PartValEqWrapper>());
+      final List<Partition> result = new ArrayList<Partition>();
+      final List<Partition> existingParts = new ArrayList<Partition>();
       Table tbl = null;
       try {
         ms.openTransaction();
@@ -2290,29 +2308,51 @@ public boolean equals(Object obj) {
           firePreEvent(new PreAddPartitionEvent(tbl, parts, this));
         }
 
-        for (Partition part : parts) {
+        List<Future<Partition>> partFutures = Lists.newArrayList();
+
+        final Table table = tbl;
+        for (final Partition part : parts) {
           if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) {
             throw new MetaException("Partition does not belong to target table "
                 + dbName + "." + tblName + ": " + part);
           }
+
           boolean shouldAdd = startAddPartition(ms, part, ifNotExists);
           if (!shouldAdd) {
-            if (existingParts == null) {
-              existingParts = new ArrayList<Partition>();
-            }
             existingParts.add(part);
             LOG.info("Not adding partition " + part + " as it already exists");
             continue;
           }
-          boolean madeDir = createLocationForAddedPartition(tbl, part);
-          if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) {
-            // Technically, for ifNotExists case, we could insert one and discard the other
-            // because the first one now "exists", but it seems better to report the problem
-            // upstream as such a command doesn't make sense.
-            throw new MetaException("Duplicate partitions in the list: " + part);
+
+
+          partFutures.add(threadPool.submit(new Callable<Partition>() {
+            @Override
+            public Partition call() throws Exception {
+              if (!addedPartitions.add(new PartValEqWrapper(part))) {
+                // Technically, for ifNotExists case, we could insert one and discard the other
+                // because the first one now "exists", but it seems better to report the problem
+                // upstream as such a command doesn't make sense.
+                throw new MetaException("Duplicate partitions in the list: " + part);
+              }
+              boolean madeDir = createLocationForAddedPartition(table, part);
+              initializeAddedPartition(table, part, madeDir);
+              return part;
+            }
+          }));
+        }
+        try {
+          for (Future<Partition> partFuture : partFutures) {
+            Partition part = partFuture.get();
+            if (part != null) {
+              result.add(part);
+            }
+          }
+        } catch (InterruptedException | ExecutionException e) {
+          // cancel other tasks
+          for (Future<Partition> partFuture : partFutures) {
+            partFuture.cancel(true);
           }
-          initializeAddedPartition(tbl, part, madeDir);
-          result.add(part);
+          throw new MetaException(e.getMessage());
         }
         if (!result.isEmpty()) {
           success = ms.addPartitions(dbName, tblName, result);
@@ -2323,11 +2363,10 @@
     } finally {
       if (!success) {
         ms.rollbackTransaction();
-        for (Entry<PartValEqWrapper, Boolean> e : addedPartitions.entrySet()) {
-          if (e.getValue()) {
-            wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true);
-            // we just created this directory - it's not a case of pre-creation, so we nuke
-          }
+        for (PartValEqWrapper partWrapper : addedPartitions) {
+          // we just created this directory - it's not a case of pre-creation, so we nuke.
+          // in case dir is not created due to threadsafety, no harm in deleting non-existing dir
+          wh.deleteDir(new Path(partWrapper.partition.getSd().getLocation()), true);
         }
         fireMetaStoreAddPartitionEvent(tbl, parts, null, false);
       } else {
@@ -2415,9 +2454,11 @@ private int add_partitions_pspec_core(
         throws TException {
       boolean success = false;
       // Ensures that the list doesn't have dups, and keeps track of directories we have created.
-      Map<PartValEqWrapperLite, Boolean> addedPartitions = new HashMap<PartValEqWrapperLite, Boolean>();
+      final Set<PartValEqWrapperLite> addedPartitions =
+          Collections.synchronizedSet(new HashSet<PartValEqWrapperLite>());
       PartitionSpecProxy partitionSpecProxy = PartitionSpecProxy.Factory.get(partSpecs);
-      PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy.getPartitionIterator();
+      final PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy
+          .getPartitionIterator();
       Table tbl = null;
       try {
         ms.openTransaction();
@@ -2429,10 +2470,12 @@ private int add_partitions_pspec_core(
 
       firePreEvent(new PreAddPartitionEvent(tbl, partitionSpecProxy, this));
 
-      int nPartitions = 0;
+      List<Future<Partition>> partFutures = Lists.newArrayList();
+      final Table table = tbl;
+
       while (partitionIterator.hasNext()) {
-        Partition part = partitionIterator.getCurrent();
+        final Partition part = partitionIterator.getCurrent();
 
         if (!part.getTableName().equals(tblName)
             || !part.getDbName().equals(dbName)) {
          throw new MetaException("Partition does not belong to target table "
@@ -2443,31 +2486,46 @@ private int add_partitions_pspec_core(
           LOG.info("Not adding partition " + part + " as it already exists");
           continue;
         }
-        boolean madeDir = createLocationForAddedPartition(tbl, part);
-        if (addedPartitions.put(new PartValEqWrapperLite(part), madeDir) != null) {
-          // Technically, for ifNotExists case, we could insert one and discard the other
-          // because the first one now "exists", but it seems better to report the problem
-          // upstream as such a command doesn't make sense.
-          throw new MetaException("Duplicate partitions in the list: " + part);
-        }
-        initializeAddedPartition(tbl, partitionIterator, madeDir);
+        partFutures.add(threadPool.submit(new Callable() {
+          @Override public Object call() throws Exception {
+            if (!addedPartitions.add(new PartValEqWrapperLite(part))) {
+              // Technically, for ifNotExists case, we could insert one and discard the other
+              // because the first one now "exists", but it seems better to report the problem
+              // upstream as such a command doesn't make sense.
+              throw new MetaException("Duplicate partitions in the list: " + part);
+            }
+            boolean madeDir = createLocationForAddedPartition(table, part);
+            initializeAddedPartition(table, part, madeDir);
+            return part;
+          }
+        }));
 
-        ++nPartitions;
         partitionIterator.next();
       }
+      try {
+        for (Future<Partition> partFuture : partFutures) {
+          Partition part = partFuture.get();
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        // cancel other tasks
+        for (Future<Partition> partFuture : partFutures) {
+          partFuture.cancel(true);
+        }
+        throw new MetaException(e.getMessage());
+      }
+
       success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists)
-        && ms.commitTransaction();
+          && ms.commitTransaction();
 
-      return nPartitions;
+      return addedPartitions.size();
     } finally {
       if (!success) {
         ms.rollbackTransaction();
-        for (Entry<PartValEqWrapperLite, Boolean> e : addedPartitions.entrySet()) {
-          if (e.getValue()) {
-            wh.deleteDir(new Path(e.getKey().location), true);
-            // we just created this directory - it's not a case of pre-creation, so we nuke
-          }
+        for (PartValEqWrapperLite part : addedPartitions) {
+          // we just created this directory - it's not a case of pre-creation, so we nuke.
+          // in case dir is not created due to threadsafety, no harm in deleting non-existing dir
+          wh.deleteDir(new Path(part.location), true);
         }
       }
       fireMetaStoreAddPartitionEvent(tbl, partitionSpecProxy, null, true);