diff --git ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index 56a9faa47c..6734ac50f8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -687,7 +687,7 @@ private boolean entryMatches(LookupInfo lookupInfo, CacheEntry entry, Set
         Map<String, Table> tempTables =
-            SessionHiveMetaStoreClient.getTempTablesForDatabase(tableUsed.getDbName());
+            SessionHiveMetaStoreClient.getTempTablesForDatabase(tableUsed.getDbName(), tableUsed.getTableName());
         if (tempTables != null && tempTables.containsKey(tableUsed.getTableName())) {
           LOG.info("{} resolves to a temporary table in the current session. This query cannot use the cache.",
               tableUsed.getTableName());
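Editor's note on the API change above, not part of the patch: `getTempTablesForDatabase` now takes the table name (or search pattern) purely so that the "no current SessionState" warning can say which lookup was skipped; the lookup itself is still keyed only by database. A minimal standalone sketch of the shape of this registry — `TempTableRegistry` and `TableDesc` are hypothetical stand-ins for `SessionState` and `Table`:

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: a session-scoped two-level registry (db -> table -> descriptor).
class TempTableRegistry {
  static class TableDesc {
    final String name;
    TableDesc(String name) { this.name = name; }
  }

  // Null when the current thread has no session attached (cf. SessionState.get() == null).
  private static final ThreadLocal<Map<String, Map<String, TableDesc>>> session =
      new ThreadLocal<>();

  static void attach() {
    session.set(new ConcurrentHashMap<>());
  }

  /**
   * @param dbName actual database name
   * @param tblNameOrPattern used only so the "skipped" log line identifies the lookup
   */
  static Map<String, TableDesc> tempTablesForDatabase(String dbName, String tblNameOrPattern) {
    Map<String, Map<String, TableDesc>> all = session.get();
    if (all == null) {
      System.err.println("No current session, skipping temp tables for "
          + dbName + "." + tblNameOrPattern);
      return Collections.emptyMap();
    }
    return all.getOrDefault(dbName, Collections.emptyMap());
  }
}
```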
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index d89df48ffb..071756ddfc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -68,10 +68,22 @@
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
 
+/**
+ * todo: This needs review re: thread safety. Various places (see callers of
+ * {@link SessionState#setCurrentSessionState(SessionState)}) pass SessionState to forked threads.
+ * Currently it looks like those threads only read metadata, but this is fragile.
+ * Also, the maps (in SessionState) where temp table metadata is stored are concurrent, and any
+ * put/get crosses a memory barrier (as does using most of {@code java.util.concurrent.*}), so
+ * readers of the objects in these maps should have the most recent view of the object.
+ * But again, this could be fragile.
+ */
 public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements IMetaStoreClient {
+  private static final Logger LOG = LoggerFactory.getLogger(SessionHiveMetaStoreClient.class);
 
   SessionHiveMetaStoreClient(Configuration conf, Boolean allowEmbedded) throws MetaException {
     super(conf, null, allowEmbedded);
@@ -174,7 +186,7 @@ public void truncateTable(String dbName, String tableName, List<String> partName
     List<String> tableNames = super.getAllTables(dbName);
 
     // May need to merge with list of temp tables
-    Map<String, Table> tables = getTempTablesForDatabase(dbName);
+    Map<String, Table> tables = getTempTablesForDatabase(dbName, "?");
     if (tables == null || tables.size() == 0) {
       return tableNames;
     }
@@ -198,7 +210,7 @@ public void truncateTable(String dbName, String tableName, List<String> partName
     // May need to merge with list of temp tables
     dbName = dbName.toLowerCase();
     tablePattern = tablePattern.toLowerCase();
-    Map<String, Table> tables = getTempTablesForDatabase(dbName);
+    Map<String, Table> tables = getTempTablesForDatabase(dbName, tablePattern);
     if (tables == null || tables.size() == 0) {
       return tableNames;
     }
@@ -224,7 +236,8 @@ public void truncateTable(String dbName, String tableName, List<String> partName
   public List<TableMeta> getTableMeta(String dbPatterns, String tablePatterns, List<String> tableTypes)
       throws MetaException {
     List<TableMeta> tableMetas = super.getTableMeta(dbPatterns, tablePatterns, tableTypes);
-    Map<String, Map<String, Table>> tmpTables = getTempTables();
+    Map<String, Map<String, Table>> tmpTables = getTempTables("dbPatterns='" + dbPatterns
+        + "' tablePatterns='" + tablePatterns + "'");
     if (tmpTables.isEmpty()) {
       return tableMetas;
     }
@@ -426,7 +439,7 @@ private void createTempTable(org.apache.hadoop.hive.metastore.api.Table tbl,
 
     SessionState ss = SessionState.get();
     if (ss == null) {
-      throw new MetaException("No current SessionState, cannot create temporary table"
+      throw new MetaException("No current SessionState, cannot create temporary table: "
          + Warehouse.getQualifiedName(tbl));
     }
@@ -435,7 +448,7 @@ private void createTempTable(org.apache.hadoop.hive.metastore.api.Table tbl,
 
     String dbName = tbl.getDbName();
     String tblName = tbl.getTableName();
-    Map<String, Table> tables = getTempTablesForDatabase(dbName);
+    Map<String, Table> tables = getTempTablesForDatabase(dbName, tblName);
     if (tables != null && tables.containsKey(tblName)) {
       throw new MetaException(
           "Temporary table " + StatsUtils.getFullyQualifiedTableName(dbName, tblName) + " already exists");
     }
@@ -472,7 +485,8 @@ private void createTempTable(org.apache.hadoop.hive.metastore.api.Table tbl,
   }
 
   private org.apache.hadoop.hive.metastore.api.Table getTempTable(String dbName, String tableName) {
-    Map<String, Table> tables = getTempTablesForDatabase(dbName.toLowerCase());
+    Map<String, Table> tables = getTempTablesForDatabase(dbName.toLowerCase(),
+        tableName.toLowerCase());
     if (tables != null) {
       Table table = tables.get(tableName.toLowerCase());
       if (table != null) {
@@ -510,13 +524,13 @@ private void alterTempTable(String dbname, String tbl_name,
 
     // Remove old temp table entry, and add new entry to list of temp tables.
     // Note that for temp tables there is no need to rename directories
-    Map<String, Table> tables = getTempTablesForDatabase(dbname);
+    Map<String, Table> tables = getTempTablesForDatabase(dbname, tbl_name);
     if (tables == null || tables.remove(tbl_name) == null) {
       throw new MetaException("Could not find temp table entry for " + dbname + "."
           + tbl_name);
     }
     shouldDeleteColStats = true;
 
-    tables = getTempTablesForDatabase(newDbName);
+    tables = getTempTablesForDatabase(newDbName, tbl_name);
     if (tables == null) {
       tables = new HashMap<String, Table>();
       SessionState.get().getTempTables().put(newDbName, tables);
     }
@@ -526,7 +540,7 @@ private void alterTempTable(String dbname, String tbl_name,
       if (haveTableColumnsChanged(oldt, newt)) {
         shouldDeleteColStats = true;
       }
-      getTempTablesForDatabase(dbname).put(tbl_name, newTable);
+      getTempTablesForDatabase(dbname, tbl_name).put(tbl_name, newTable);
     }
 
     if (shouldDeleteColStats) {
@@ -652,7 +666,7 @@ private void dropTempTable(org.apache.hadoop.hive.metastore.api.Table table, boo
     }
 
     // Remove table entry from SessionState
-    Map<String, Table> tables = getTempTablesForDatabase(dbName);
+    Map<String, Table> tables = getTempTablesForDatabase(dbName, tableName);
     if (tables == null || tables.remove(tableName) == null) {
       throw new MetaException(
           "Could not find temp table entry for " + StatsUtils.getFullyQualifiedTableName(dbName, tableName));
     }
@@ -682,14 +696,20 @@ private void dropTempTable(org.apache.hadoop.hive.metastore.api.Table table, boo
     return newCopy;
   }
 
-  public static Map<String, Table> getTempTablesForDatabase(String dbName) {
-    return getTempTables().get(dbName);
+  /**
+   * @param dbName actual database name
+   * @param tblName actual table name or search pattern (used only for the error message)
+   */
+  public static Map<String, Table> getTempTablesForDatabase(String dbName,
+      String tblName) {
+    return getTempTables(Warehouse.getQualifiedName(dbName, tblName)).
+        get(dbName);
   }
 
-  public static Map<String, Map<String, Table>> getTempTables() {
+  private static Map<String, Map<String, Table>> getTempTables(String msg) {
     SessionState ss = SessionState.get();
     if (ss == null) {
-      LOG.debug("No current SessionState, skipping temp tables");
+      LOG.warn("No current SessionState, skipping temp tables for " + msg);
       return Collections.emptyMap();
     }
     return ss.getTempTables();
   }
@@ -699,7 +719,8 @@ private void dropTempTable(org.apache.hadoop.hive.metastore.api.Table table, boo
       String tableName) {
     SessionState ss = SessionState.get();
     if (ss == null) {
-      LOG.debug("No current SessionState, skipping temp tables");
+      LOG.debug("No current SessionState, skipping temp tables for "
+          + Warehouse.getQualifiedName(dbName, tableName));
       return null;
     }
     String lookupName = StatsUtils.getFullyQualifiedTableName(dbName.toLowerCase(),
@@ -976,15 +997,17 @@ private static TempTable getTempTable(org.apache.hadoop.hive.metastore.api.Table
         getQualifiedName(t.getDbName().toLowerCase(), t.getTableName().toLowerCase());
     SessionState ss = SessionState.get();
     if (ss == null) {
-      LOG.debug("No current SessionState, skipping temp partitions");
+      LOG.warn("No current SessionState, skipping temp partitions for " + qualifiedTableName);
       return null;
     }
     return ss.getTempPartitions().get(qualifiedTableName);
   }
 
   private static void removeTempTable(org.apache.hadoop.hive.metastore.api.Table t) {
+    String qualifiedTableName = Warehouse.
+        getQualifiedName(t.getDbName().toLowerCase(), t.getTableName().toLowerCase());
     SessionState ss = SessionState.get();
     if (ss == null) {
-      LOG.debug("No current SessionState, skipping temp partitions");
+      LOG.warn("No current SessionState, skipping temp partitions for " + qualifiedTableName);
       return;
     }
     ss.getTempPartitions().remove(Warehouse.getQualifiedName(t));
   }
@@ -994,15 +1017,16 @@ private static void createTempTable(org.apache.hadoop.hive.metastore.api.Table t
       //do nothing as it's not a partitioned table
       return;
     }
+    String qualifiedTableName = Warehouse.
+        getQualifiedName(t.getDbName().toLowerCase(), t.getTableName().toLowerCase());
     SessionState ss = SessionState.get();
     if (ss == null) {
-      LOG.debug("No current SessionState, skipping temp partitions");
+      LOG.warn("No current SessionState, skipping temp partitions for " + qualifiedTableName);
       return;
     }
     TempTable tt = new TempTable(t);
-    String qualifiedName = Warehouse.getQualifiedName(t);
-    if(ss.getTempPartitions().putIfAbsent(qualifiedName, tt) != null) {
-      throw new IllegalStateException("TempTable for " + qualifiedName + " already exists");
+    if(ss.getTempPartitions().putIfAbsent(qualifiedTableName, tt) != null) {
+      throw new IllegalStateException("TempTable for " + qualifiedTableName + " already exists");
     }
   }
 }
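One detail of the last hunk worth calling out, with a sketch that is not part of the patch: temp partition registration goes through `putIfAbsent` on a concurrent map, which makes the exists-check and the insert a single atomic step, so two threads registering the same qualified name cannot both succeed. A minimal illustration (hypothetical `AtomicRegistration` class, `Object` standing in for `TempTable`):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch: why putIfAbsent is preferred over containsKey() followed by put().
class AtomicRegistration {
  private static final ConcurrentMap<String, Object> tempPartitions = new ConcurrentHashMap<>();

  static void register(String qualifiedTableName, Object tempTable) {
    // Atomic check-then-act: returns the previous value if someone beat us to it.
    if (tempPartitions.putIfAbsent(qualifiedTableName, tempTable) != null) {
      throw new IllegalStateException("TempTable for " + qualifiedTableName + " already exists");
    }
  }
}
```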
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 1dccf969ff..a5aa737f86 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -13116,7 +13116,8 @@ ASTNode analyzeCreateTable(
         throw new SemanticException("No current SessionState, cannot create temporary table "
             + dbName + "." + tblName);
       }
-      Map<String, Table> tables = SessionHiveMetaStoreClient.getTempTablesForDatabase(dbName);
+      Map<String, Table> tables = SessionHiveMetaStoreClient.
+          getTempTablesForDatabase(dbName, tblName);
       if (tables != null && tables.containsKey(tblName)) {
         throw new SemanticException("Temporary table " + dbName + "." + tblName
             + " already exists");
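The PartitionExport diff below captures the caller's `SessionState` in the constructor and re-attaches it at the top of the producer task — necessary once `SessionState` switches to a plain `ThreadLocal` (see the SessionState diff further down), because pool threads no longer inherit it. A self-contained sketch of that hand-off, not part of the patch; `Session` and `CURRENT` are hypothetical stand-ins for `SessionState` and its thread-local:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: capture the caller's session on the submitting thread, then attach it
// explicitly inside the pooled worker thread.
class SessionHandOff {
  static class Session {
    final String id;
    Session(String id) { this.id = id; }
  }

  static final ThreadLocal<Session> CURRENT = new ThreadLocal<>();

  public static void main(String[] args) {
    CURRENT.set(new Session("caller"));
    final Session callersSession = CURRENT.get(); // capture before forking

    ExecutorService producer = Executors.newFixedThreadPool(1);
    producer.submit(() -> {
      // A plain ThreadLocal is not inherited, so the worker attaches explicitly.
      CURRENT.set(callersSession);
      System.out.println("worker sees session " + CURRENT.get().id);
    });
    producer.shutdown();
  }
}
```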
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 5844f3d97f..3ae030d37c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -20,11 +20,11 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +35,6 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.AuthEntities;
 import static org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.Paths;
 
 /**
@@ -49,6 +48,7 @@
   private final String distCpDoAsUser;
   private final HiveConf hiveConf;
   private final int nThreads;
+  private final SessionState callersSession;
 
   private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class);
   private BlockingQueue<Partition> queue;
@@ -61,11 +61,14 @@
     this.hiveConf = hiveConf;
     this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM);
     this.queue = new ArrayBlockingQueue<>(2 * nThreads);
+    this.callersSession = SessionState.get();
   }
 
   void write(final ReplicationSpec forReplicationSpec) throws InterruptedException {
-    ExecutorService producer = Executors.newFixedThreadPool(1);
+    ExecutorService producer = Executors.newFixedThreadPool(1,
+        new ThreadFactoryBuilder().setNameFormat("partition-submitter-thread-%d").build());
     producer.submit(() -> {
+      SessionState.setCurrentSessionState(callersSession);
       for (Partition partition : partitionIterable) {
         try {
           queue.put(partition);
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 60b63d4c87..27f0406853 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -41,6 +41,7 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.StringUtils;
@@ -117,10 +118,14 @@
   static final String LOCK_FILE_NAME = "inuse.lck";
   static final String INFO_FILE_NAME = "inuse.info";
 
-  private final Map<String, Map<String, Table>> tempTables = new HashMap<>();
+  /**
+   * Concurrent since SessionState is often propagated to workers in thread pools
+   */
+  private final Map<String, Map<String, Table>> tempTables = new ConcurrentHashMap<>();
   private final Map<String, Map<String, ColumnStatisticsObj>> tempTableColStats =
-      new HashMap<String, Map<String, ColumnStatisticsObj>>();
-  private final Map<String, SessionHiveMetaStoreClient.TempTable> tempPartitions = new HashMap<>();
+      new ConcurrentHashMap<>();
+  private final Map<String, SessionHiveMetaStoreClient.TempTable> tempPartitions =
+      new ConcurrentHashMap<>();
 
   protected ClassLoader parentLoader;
@@ -537,8 +542,7 @@ private void attach(HiveConf conf) {
    * Singleton Session object per thread.
    *
    **/
-  private static InheritableThreadLocal<SessionStates> tss =
-      new InheritableThreadLocal<SessionStates>() {
+  private static ThreadLocal<SessionStates> tss = new ThreadLocal<SessionStates>() {
     @Override
     protected SessionStates initialValue() {
       return new SessionStates();
diff --git ql/src/test/results/clientnegative/check_constraint_tbl_level.q.out ql/src/test/results/clientnegative/check_constraint_tbl_level.q.out
index c18f733b1f..3ed9c25625 100644
--- ql/src/test/results/clientnegative/check_constraint_tbl_level.q.out
+++ ql/src/test/results/clientnegative/check_constraint_tbl_level.q.out
@@ -11,5 +11,4 @@ PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
 PREHOOK: Output: default@tti
 #### A masked pattern was here ####
-Error during job, obtaining debugging information...
 FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
diff --git ql/src/test/results/clientnegative/check_constraint_violation.q.out ql/src/test/results/clientnegative/check_constraint_violation.q.out
index 05e3ff5a52..82f367c31d 100644
--- ql/src/test/results/clientnegative/check_constraint_violation.q.out
+++ ql/src/test/results/clientnegative/check_constraint_violation.q.out
@@ -11,5 +11,4 @@ PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
 PREHOOK: Output: default@tti
 #### A masked pattern was here ####
-Error during job, obtaining debugging information...
 FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
diff --git ql/src/test/results/clientnegative/merge_constraint_notnull.q.out ql/src/test/results/clientnegative/merge_constraint_notnull.q.out
index 0fd26ffb46..088e249a04 100644
--- ql/src/test/results/clientnegative/merge_constraint_notnull.q.out
+++ ql/src/test/results/clientnegative/merge_constraint_notnull.q.out
@@ -52,5 +52,4 @@ PREHOOK: Output: default@testt
 PREHOOK: Output: default@testt
 PREHOOK: Output: default@testt
 #### A masked pattern was here ####
-Error during job, obtaining debugging information...
 FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
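Finally, a sketch (not part of the patch, class name hypothetical) of the failure mode that motivates swapping `InheritableThreadLocal` for a plain `ThreadLocal` in `SessionState`: `InheritableThreadLocal` snapshots the parent's value only at thread *creation* time, and pool threads are created once and reused, so a worker born while session A is current keeps serving A's state to every later task.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Demonstrates the hazard: the pool worker is created during session "A" and
// inherits "A" permanently, even after the caller has moved on to session "B".
class InheritanceHazard {
  static final InheritableThreadLocal<String> SESSION = new InheritableThreadLocal<>();

  public static void main(String[] args) throws Exception {
    SESSION.set("A");
    ExecutorService pool = Executors.newFixedThreadPool(1); // worker created on first submit
    pool.submit(() -> System.out.println("task 1 sees " + SESSION.get())).get(); // prints A

    SESSION.set("B"); // caller switches sessions...
    pool.submit(() -> System.out.println("task 2 sees " + SESSION.get())).get(); // still prints A
    pool.shutdown();
  }
}
```

With a plain `ThreadLocal`, `SESSION.get()` in the worker would simply return null, which is why the PartitionExport change above has to propagate the caller's session explicitly.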