diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index aa7647b..f7beae4 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2475,6 +2475,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 15, new SizeValidator(0L, true, 1024L, true), "Number of threads"
         + " used to move files in move task. Set it to 0 to disable multi-threaded file moves. This parameter is also used by"
         + " MSCK to check tables."),
+    HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT("hive.load.dynamic.partitions.thread", 15,
+        new SizeValidator(1L, true, 1024L, true),
+        "Number of threads used to load dynamic partitions."),
     // If this is set all move tasks at the end of a multi-insert query will only begin once all
     // outputs are ready
     HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES(
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 5adfa02..83a3e39 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -2270,7 +2270,8 @@ private Collection getPartitionPsQueryResults(String dbName, String tableName,
     List<FieldSchema> partCols = table.getPartitionKeys();
     int numPartKeys = partCols.size();
     if (part_vals.size() > numPartKeys) {
-      throw new MetaException("Incorrect number of partition values");
+      throw new MetaException("Incorrect number of partition values."
+          + " numPartKeys=" + numPartKeys + ", part_val=" + part_vals.size());
     }
     partCols = partCols.subList(0, part_vals.size());
     // Construct a pattern of the form: partKey=partVal/partKey2=partVal2/...
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
index d624d1b..7e878da 100755
--- metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -32,7 +32,7 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
new file mode 100644
index 0000000..f5d2c76
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.thrift.TException;
+
+
+/**
+ * Synchronized MetaStoreClient wrapper
+ */
+public final class SynchronizedMetaStoreClient {
+
+  private final IMetaStoreClient client;
+
+  public SynchronizedMetaStoreClient(IMetaStoreClient client) {
+    this.client = client;
+  }
+
+  public synchronized long openTxn(String user) throws TException {
+    return client.openTxn(user);
+  }
+
+  public synchronized void commitTxn(long txnid) throws TException {
+    client.commitTxn(txnid);
+  }
+
+  public synchronized void rollbackTxn(long txnid) throws TException {
+    client.rollbackTxn(txnid);
+  }
+
+  public synchronized void heartbeat(long txnid, long lockid) throws TException {
+    client.heartbeat(txnid, lockid);
+  }
+
+  public synchronized ValidTxnList getValidTxns(long currentTxn) throws TException {
+    return client.getValidTxns(currentTxn);
+  }
+
+  public synchronized LockResponse lock(LockRequest request) throws TException {
+    return client.lock(request);
+  }
+
+  public synchronized Partition add_partition(Partition partition) throws TException {
+    return client.add_partition(partition);
+  }
+
+  public synchronized void alter_partition(String dbName, String tblName,
+      Partition newPart, EnvironmentContext environmentContext) throws TException {
+    client.alter_partition(dbName, tblName, newPart, environmentContext);
+  }
+
+  public synchronized LockResponse checkLock(long lockid) throws TException {
+    return client.checkLock(lockid);
+  }
+
+  public synchronized void unlock(long lockid) throws TException {
+    client.unlock(lockid);
+  }
+
+  public synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException {
+    return client.showLocks(showLocksRequest);
+  }
+
+  public synchronized void close() {
+    client.close();
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index b4ae1d1..45ead16 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.lockmgr;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,8 +26,6 @@
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.thrift.TException;
@@ -54,11 +53,11 @@ private long MAX_SLEEP;
   //longer term we should always have a txn id and then we won't need to track locks here at all
   private Set<DbHiveLock> locks;
-  private DbTxnManager.SynchronizedMetaStoreClient client;
+  private SynchronizedMetaStoreClient client;
   private long nextSleep = 50;
   private final HiveConf conf;
 
-  DbLockManager(DbTxnManager.SynchronizedMetaStoreClient client, HiveConf conf) {
+  DbLockManager(SynchronizedMetaStoreClient client, HiveConf conf) {
     locks = new HashSet<>();
     this.client = client;
     this.conf = conf;
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 02c17b5..a446999 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -19,6 +19,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
@@ -26,7 +27,6 @@
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.LockComponentBuilder;
 import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.api.*;
@@ -711,54 +711,4 @@ public void run() {
       }
     }
   }
-
-  /**
-   * Synchronized MetaStoreClient wrapper
-   */
-  final class SynchronizedMetaStoreClient {
-    private final IMetaStoreClient client;
-    SynchronizedMetaStoreClient(IMetaStoreClient client) {
-      this.client = client;
-    }
-
-    synchronized long openTxn(String user) throws TException {
-      return client.openTxn(user);
-    }
-
-    synchronized void commitTxn(long txnid) throws TException {
-      client.commitTxn(txnid);
-    }
-
-    synchronized void rollbackTxn(long txnid) throws TException {
-      client.rollbackTxn(txnid);
-    }
-
-    synchronized void heartbeat(long txnid, long lockid) throws TException {
-      client.heartbeat(txnid, lockid);
-    }
-
-    synchronized ValidTxnList getValidTxns(long currentTxn) throws TException {
-      return client.getValidTxns(currentTxn);
-    }
-
-    synchronized LockResponse lock(LockRequest request) throws TException {
-      return client.lock(request);
-    }
-
-    synchronized LockResponse checkLock(long lockid) throws TException {
-      return client.checkLock(lockid);
-    }
-
-    synchronized void unlock(long lockid) throws TException {
-      client.unlock(lockid);
-    }
-
-    synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException {
-      return client.showLocks(showLocksRequest);
-    }
-
-    synchronized void close() {
-      client.close();
-    }
-  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 9d927bd..d1e84b6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -55,6 +55,8 @@
 
 import javax.jdo.JDODataStoreException;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -80,7 +82,6 @@
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -126,6 +127,7 @@
 import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
@@ -163,6 +165,7 @@
 
   private HiveConf conf = null;
   private IMetaStoreClient metaStoreClient;
+  private SynchronizedMetaStoreClient syncMetaStoreClient;
   private UserGroupInformation owner;
 
   // metastore calls timing information
@@ -1499,8 +1502,10 @@ public Partition loadPartition(Path loadPath, Table tbl,
       Map<String, String> partSpec, boolean replace, boolean inheritTableSpecs,
       boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid,
       boolean hasFollowingStatsTask) throws HiveException {
+    Path tblDataLocationPath = tbl.getDataLocation();
     try {
+      Partition oldPart = getPartition(tbl, partSpec, false);
       /**
       * Move files before creating the partition since down stream processes
       * check for existence of partition in metadata before accessing the data.
@@ -1508,12 +1513,7 @@ public Partition loadPartition(Path loadPath, Table tbl,
       * processes might move forward with partial data
       */
-      Partition oldPart = getPartition(tbl, partSpec, false);
-      Path oldPartPath = null;
-      if(oldPart != null) {
-        oldPartPath = oldPart.getDataLocation();
-      }
-
+      Path oldPartPath = (oldPart != null) ? oldPart.getDataLocation() : null;
       Path newPartPath = null;
 
       if (inheritTableSpecs) {
@@ -1584,14 +1584,19 @@ public Partition loadPartition(Path loadPath, Table tbl,
               StatsSetupConst.TRUE);
         }
         MetaStoreUtils.populateQuickStats(HiveStatsUtils.getFileStatusRecurse(newPartPath, -1, newPartPath.getFileSystem(conf)), newTPart.getParameters());
-        getMSC().add_partition(newTPart.getTPartition());
+        LOG.debug("Adding new partition " + newTPart.getSpec());
+        getSychronizedMSC().add_partition(newTPart.getTPartition());
       } else {
         EnvironmentContext environmentContext = null;
         if (hasFollowingStatsTask) {
           environmentContext = new EnvironmentContext();
           environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
         }
-        alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newTPart.getTPartition()), environmentContext);
+        Partition newPart = new Partition(tbl, newTPart.getTPartition());
+        validatePartition(newPart);
+        LOG.debug("Altering existing partition " + newTPart.getSpec());
+        getSychronizedMSC().alter_partition(tbl.getDbName(), tbl.getTableName(),
+          newPart.getTPartition(), environmentContext);
       }
       return newTPart;
     } catch (IOException e) {
@@ -1691,6 +1696,92 @@ private void constructOneLBLocationMap(FileStatus fSta,
     return skewedColValueLocationMaps;
   }
 
+  /**
+   * Get the valid partitions from the path
+   * @param numDP number of dynamic partitions
+   * @param loadPath
+   * @return Set of valid partitions
+   * @throws HiveException
+   */
+  private Set<Path> getValidPartitionsInPath(int numDP, Path loadPath) throws HiveException {
+    Set<Path> validPartitions = new HashSet<Path>();
+    try {
+      FileSystem fs = loadPath.getFileSystem(conf);
+      FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
+      // Check for empty partitions
+      for (FileStatus s : leafStatus) {
+        if (!s.isDirectory()) {
+          throw new HiveException("partition " + s.getPath() + " is not a directory!");
+        }
+        validPartitions.add(s.getPath());
+      }
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
+
+    int partsToLoad = validPartitions.size();
+    if (partsToLoad == 0) {
+      LOG.warn("No partition is generated by dynamic partitioning");
+    }
+
+    if (partsToLoad > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) {
+      throw new HiveException("Number of dynamic partitions created is " + partsToLoad
+          + ", which is more than "
+          + conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)
+          + ". To solve this try to set " + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+          + " to at least " + partsToLoad + '.');
+    }
+    return validPartitions;
+  }
+
+  /**
+   * Get set of partitions from path
+   *
+   * @param tbl
+   * @param partitionPaths
+   * @param partSpec
+   * @return Map containing spec and partition details
+   * @throws HiveException
+   */
+  private Map<String, Partition> getPartitionsForPath(Table tbl,
+      Set<Path> partitionPaths, Map<String, String> partSpec) throws HiveException {
+
+    Map<String, Partition> tablePartitions = Maps.newLinkedHashMap();
+
+    List<String> partVals = Lists.newLinkedList();
+    for(Path partPath: partitionPaths) {
+      Map<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
+      Warehouse.makeSpecFromName(fullPartSpec, partPath);
+
+      //Construct the set of partition values to be fetched
+      for(Map.Entry<String, String> entry : fullPartSpec.entrySet()) {
+        partVals.add(entry.getKey().toLowerCase() + "=" + entry.getValue().toLowerCase());
+      }
+    }
+
+    //Get partitions which are already present
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Getting partitions by names for " + partVals);
+    }
+    try {
+      List<Partition> partitions = getPartitionsByNames(tbl, partVals);
+      for (Partition part : partitions) {
+        tablePartitions.put(part.getSpec().toString(), part);
+      }
+    } catch(HiveException he) {
+      if (he.getCause() instanceof NoSuchObjectException) {
+        //no valid partitions.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Unable to find partitions for " + partVals + " in " + tbl.getTableName());
+        }
+      } else {
+        throw he;
+      }
+    }
+    LOG.info("partitionPaths count=" + partitionPaths.size()
+        + ", partitions available=" + tablePartitions.size());
+    return tablePartitions;
+  }
 
   /**
    * Given a source directory name of the load path, load all dynamically generated partitions
@@ -1707,67 +1798,97 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
    */
-  public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
-      String tableName, Map<String, String> partSpec, boolean replace,
-      int numDP, boolean listBucketingEnabled, boolean isAcid, long txnId, boolean hasFollowingStatsTask,
-      AcidUtils.Operation operation)
+  public Map<Map<String, String>, Partition> loadDynamicPartitions(final Path loadPath,
+      final String tableName, final Map<String, String> partSpec, final boolean replace,
+      final int numDP, final boolean listBucketingEnabled, final boolean isAcid, final long txnId,
+      final boolean hasFollowingStatsTask, final AcidUtils.Operation operation)
       throws HiveException {
-    Set<Path> validPartitions = new HashSet<Path>();
-    try {
-      Map<Map<String, String>, Partition> partitionsMap = new
-          LinkedHashMap<Map<String, String>, Partition>();
+    final Map<Map<String, String>, Partition> partitionsMap =
+        Collections.synchronizedMap(new LinkedHashMap<Map<String, String>, Partition>());
 
-      FileSystem fs = loadPath.getFileSystem(conf);
-      FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
-      // Check for empty partitions
-      for (FileStatus s : leafStatus) {
-        validPartitions.add(s.getPath());
-      }
+    int poolSize = conf.getInt(ConfVars.HIVE_LOAD_DYNAMIC_PARTITIONS_THREAD_COUNT.varname, 1);
+    final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("load-dynamic-partitions-%d")
+            .build());
 
-      int partsToLoad = validPartitions.size();
-      if (partsToLoad == 0) {
-        LOG.warn("No partition is generated by dynamic partitioning");
-      }
+    // Get all valid partition paths and existing partitions for them (if any)
+    final Table tbl = getTable(tableName);
+    final Set<Path> validPartitions = getValidPartitionsInPath(numDP, loadPath);
-      if (partsToLoad > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) {
-        throw new HiveException("Number of dynamic partitions created is " + partsToLoad
-            + ", which is more than "
-            + conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)
-            + ". To solve this try to set " + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
-            + " to at least " + partsToLoad + '.');
-      }
+    final int partsToLoad = validPartitions.size();
+    final AtomicInteger partitionsLoaded = new AtomicInteger(0);
+
+    final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
+        && InPlaceUpdates.inPlaceEligible(conf);
+    final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null;
+    final SessionState parentSession = SessionState.get();
 
-      Table tbl = getTable(tableName);
+    final List<Future<Void>> futures = Lists.newLinkedList();
+    try {
       // for each dynamically created DP directory, construct a full partition spec
       // and load the partition based on that
-      Iterator<Path> iter = validPartitions.iterator();
-      LOG.info("Going to load " + partsToLoad + " partitions.");
-      PrintStream ps = null;
-      boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
-          && InPlaceUpdates.inPlaceEligible(conf);
-      if(inPlaceEligible) {
-        ps = SessionState.getConsole().getInfoStream();
-      }
-      int partitionsLoaded = 0;
-      while (iter.hasNext()) {
-        // get the dynamically created directory
-        Path partPath = iter.next();
-        assert fs.getFileStatus(partPath).isDir():
-          "partitions " + partPath + " is not a directory !";
-
+      for(final Path partPath : validPartitions) {
         // generate a full partition specification
-        LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
+        final LinkedHashMap<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec);
         Warehouse.makeSpecFromName(fullPartSpec, partPath);
-        Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
-            true, listBucketingEnabled, false, isAcid, hasFollowingStatsTask);
-        partitionsMap.put(fullPartSpec, newPartition);
-        if (inPlaceEligible) {
-          InPlaceUpdates.rePositionCursor(ps);
-          InPlaceUpdates.reprintLine(ps, "Loaded : " + ++partitionsLoaded + "/" + partsToLoad +" partitions.");
-        }
-        LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
+        futures.add(pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            try {
+              // move file would require session details (needCopy() invokes SessionState.get)
+              SessionState.setCurrentSessionState(parentSession);
+              LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
+
+              // load the partition
+              Partition newPartition = loadPartition(partPath, tbl, fullPartSpec,
+                  replace, true, listBucketingEnabled,
+                  false, isAcid, hasFollowingStatsTask);
+              partitionsMap.put(fullPartSpec, newPartition);
+
+              if (inPlaceEligible) {
+                synchronized (ps) {
+                  InPlaceUpdates.rePositionCursor(ps);
+                  partitionsLoaded.incrementAndGet();
+                  InPlaceUpdates.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
+                      + partsToLoad + " partitions.");
+                }
+              }
+              return null;
+            } catch (Exception t) {
+              LOG.error("Exception when loading partition with parameters "
+                  + " partPath=" + partPath + ", "
+                  + " table=" + tbl.getTableName() + ", "
+                  + " partSpec=" + fullPartSpec + ", "
+                  + " replace=" + replace + ", "
+                  + " listBucketingEnabled=" + listBucketingEnabled + ", "
+                  + " isAcid=" + isAcid + ", "
+                  + " hasFollowingStatsTask=" + hasFollowingStatsTask, t);
+              throw t;
+            }
+          }
+        }));
       }
+      pool.shutdown();
+      LOG.debug("Number of partitions to be added is " + futures.size());
+
+      for (Future future : futures) {
+        future.get();
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
+      //cancel other futures
+      for (Future future : futures) {
+        future.cancel(true);
+      }
+      throw new HiveException("Exception when loading "
+          + partsToLoad + " in table " + tbl.getTableName()
+          + " with loadPath=" + loadPath, e);
+    }
+
+    try {
       if (isAcid) {
         List<String> partNames = new ArrayList<>(partitionsMap.size());
         for (Partition p : partitionsMap.values()) {
@@ -1776,11 +1897,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
         metaStoreClient.addDynamicPartitions(txnId, tbl.getDbName(), tbl.getTableName(),
             partNames, AcidUtils.toDataOperationType(operation));
       }
+      LOG.info("Loaded " + partitionsMap.size() + " partitions");
       return partitionsMap;
-    } catch (IOException e) {
-      throw new HiveException(e);
     } catch (TException te) {
-      throw new HiveException(te);
+      throw new HiveException("Exception updating metastore for acid table "
+          + tableName + " with partitions " + partitionsMap.values(), te);
     }
   }
@@ -3378,6 +3499,19 @@ public synchronized IMetaStoreClient getMSC() throws MetaException {
   }
 
   /**
+   * @return synchronized metastore client
+   * @throws MetaException
+   */
+  @LimitedPrivate(value = {"Hive"})
+  @Unstable
+  public synchronized SynchronizedMetaStoreClient getSychronizedMSC() throws MetaException {
+    if (syncMetaStoreClient == null) {
+      syncMetaStoreClient = new SynchronizedMetaStoreClient(getMSC(true, false));
+    }
+    return syncMetaStoreClient;
+  }
+
+  /**
    * @return the metastore client for the current thread
    * @throws MetaException
   */
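
Reviewer note: the standalone sketch below is not part of the patch; class and variable names such as LoadSketch and partPaths are illustrative only. It shows the fail-fast thread-pool pattern that the new loadDynamicPartitions() body relies on: one Callable per dynamic-partition directory is submitted to a fixed pool (sized in the patch from hive.load.dynamic.partitions.thread), each future is awaited, and the first failure cancels the remaining tasks.

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LoadSketch {
  public static void main(String[] args) throws Exception {
    // stand-ins for the dynamically generated partition directories
    List<String> partPaths = Arrays.asList("ds=2017-03-01", "ds=2017-03-02", "ds=2017-03-03");
    // pool size plays the role of hive.load.dynamic.partitions.thread
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Future<Void>> futures = new LinkedList<>();
    for (final String partPath : partPaths) {
      futures.add(pool.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          // in the patch this is loadPartition(): move files, then add/alter the partition
          System.out.println(Thread.currentThread().getName() + " loading " + partPath);
          return null;
        }
      }));
    }
    pool.shutdown();
    try {
      for (Future<Void> future : futures) {
        future.get();               // surfaces the first task failure
      }
    } catch (InterruptedException | ExecutionException e) {
      for (Future<Void> future : futures) {
        future.cancel(true);        // cancel the remaining loads, as the patch does
      }
      throw new RuntimeException("partition load failed", e);
    }
  }
}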