diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 1921ea7ca8..9eda4ed501 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -291,6 +291,7 @@ public DestClausePrefix getDestNamePrefix(ASTNode curNode, QB queryBlock) {
   public DestClausePrefix addDestNamePrefix(int pos, DestClausePrefix prefix) {
     return insertBranchToNamePrefix.put(pos, prefix);
   }
+
   public Context(Configuration conf) throws IOException {
     this(conf, generateExecutionId());
   }
@@ -315,6 +316,48 @@ private Context(Configuration conf, String executionId) {
     viewsTokenRewriteStreams = new HashMap<>();
   }
 
+  protected Context(Context ctx) {
+    // This method creates a deep copy of context, but the copy is partial,
+    // hence it needs to be used carefully. In particular, following objects
+    // are ignored:
+    // opContext, pathToCS, cboInfo, cboSucceeded, tokenRewriteStream, viewsTokenRewriteStreams,
+    // rewrittenStatementContexts, cteTables, loadTableOutputMap, planMapper, insertBranchToNamePrefix
+    this.isHDFSCleanup = ctx.isHDFSCleanup;
+    this.resFile = ctx.resFile;
+    this.resDir = ctx.resDir;
+    this.resFs = ctx.resFs;
+    this.resDirPaths = ctx.resDirPaths;
+    this.resDirFilesNum = ctx.resDirFilesNum;
+    this.initialized = ctx.initialized;
+    this.originalTracker = ctx.originalTracker;
+    this.nonLocalScratchPath = ctx.nonLocalScratchPath;
+    this.localScratchDir = ctx.localScratchDir;
+    this.scratchDirPermission = ctx.scratchDirPermission;
+    this.fsScratchDirs.putAll(ctx.fsScratchDirs);
+    this.conf = ctx.conf;
+    this.pathid = ctx.pathid;
+    this.explainConfig = ctx.explainConfig;
+    this.cmd = ctx.cmd;
+    this.executionId = ctx.executionId;
+    this.hiveLocks = ctx.hiveLocks;
+    this.hiveTxnManager = ctx.hiveTxnManager;
+    this.needLockMgr = ctx.needLockMgr;
+    this.sequencer = ctx.sequencer;
+    this.outputLockObjects.putAll(ctx.outputLockObjects);
+    this.stagingDir = ctx.stagingDir;
+    this.heartbeater = ctx.heartbeater;
+    this.skipTableMasking = ctx.skipTableMasking;
+    this.isUpdateDeleteMerge = ctx.isUpdateDeleteMerge;
+    this.isLoadingMaterializedView = ctx.isLoadingMaterializedView;
+    this.operation = ctx.operation;
+    this.wmContext = ctx.wmContext;
+    this.isExplainPlan = ctx.isExplainPlan;
+    this.statsSource = ctx.statsSource;
+    this.executionIndex = ctx.executionIndex;
+    this.viewsTokenRewriteStreams = new HashMap<>();
+    this.rewrittenStatementContexts = new HashSet<>();
+  }
+
   public Map<String, Path> getFsScratchDirs() {
     return fsScratchDirs;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 52799b30c3..560328c05f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -39,15 +39,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
-import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -60,11 +61,11 @@ import org.apache.hadoop.hive.conf.VariableSubstitution;
 import org.apache.hadoop.hive.metastore.ColumnType;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
@@ -92,6 +93,7 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -141,6 +143,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.hive.common.util.TxnIdUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -624,53 +627,50 @@ public void run() {
       // because at that point we need access to the objects.
       Hive.get().getMSC().flushCache();
 
-      BaseSemanticAnalyzer sem;
       // Do semantic analysis and plan generation
+      Context backupContext = new Context(ctx);
+      BaseSemanticAnalyzer sem;
       if (hookRunner.hasPreAnalyzeHooks()) {
-        HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
-        hookCtx.setConf(conf);
-        hookCtx.setUserName(userName);
-        hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
-        hookCtx.setCommand(command);
-        hookCtx.setHiveOperation(queryState.getHiveOperation());
-
-        tree = hookRunner.runPreAnalyzeHooks(hookCtx, tree);
-        sem = SemanticAnalyzerFactory.get(queryState, tree);
-        openTransaction();
-        // TODO: Lock acquisition should be moved before this method call
-        // when we want to implement lock-based concurrency control
-        generateValidTxnList();
-        sem.analyze(tree, ctx);
-        hookCtx.update(sem);
-
-        hookRunner.runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks());
+        sem = semanticAnalyzeWithPreHooks(command, tree, false);
+        if (sem == null) {
+          // We could not generate the plan because the snapshot was outdated
+          // when locks were acquired, hence regenerate context, txn list and
+          // retry
+          backupContext.addRewrittenStatementContext(ctx);
+          backupContext.setHiveLocks(ctx.getHiveLocks());
+          ctx = backupContext;
+          conf.set(ValidTxnList.VALID_TXNS_KEY, txnMgr.getValidTxns().toString());
+          if (plan.hasAcidResourcesInQuery()) {
+            recordValidWriteIds(queryTxnMgr);
+          }
+          sem = semanticAnalyzeWithPreHooks(command, tree, true);
+          if (sem == null) {
+            // Throw exception
+            throw new HiveException("Operation could not be executed");
+          }
+        }
       } else {
-        sem = SemanticAnalyzerFactory.get(queryState, tree);
-        openTransaction();
-        // TODO: Lock acquisition should be moved before this method call
-        // when we want to implement lock-based concurrency control
-        generateValidTxnList();
-        sem.analyze(tree, ctx);
+        sem = semanticAnalyze(command, tree, false);
+        if (sem == null) {
+          // We could not generate the plan because the snapshot was outdated
+          // when locks were acquired, hence regenerate context, txn list and
+          // retry
+          backupContext.addRewrittenStatementContext(ctx);
+          backupContext.setHiveLocks(ctx.getHiveLocks());
+          ctx = backupContext;
+          conf.set(ValidTxnList.VALID_TXNS_KEY, txnMgr.getValidTxns().toString());
+          if (plan.hasAcidResourcesInQuery()) {
+            recordValidWriteIds(queryTxnMgr);
+          }
+          sem = semanticAnalyze(command, tree, true);
+          if (sem == null) {
+            // Throw exception
+            throw new HiveException("Operation could not be executed");
+          }
+        }
       }
       LOG.info("Semantic Analysis Completed");
-      // Retrieve information about cache usage for the query.
-      if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
-        cacheUsage = sem.getCacheUsage();
-      }
-
-      // validate the plan
-      sem.validate();
-      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
-
-      checkInterrupted("after analyzing query.", null, null);
-
-      // get the output schema
-      schema = getSchema(sem, conf);
-      plan = new QueryPlan(queryStr, sem, queryDisplay.getQueryStartTime(), queryId,
-          queryState.getHiveOperation(), schema);
-
       conf.set("mapreduce.workflow.id", "hive_" + queryId);
       conf.set("mapreduce.workflow.name", queryStr);
@@ -776,6 +776,180 @@ public void run() {
     }
   }
 
+  private BaseSemanticAnalyzer semanticAnalyzeWithPreHooks(String command, ASTNode tree, boolean retrial)
+      throws HiveException, CommandProcessorResponse {
+    // TODO: Lock acquisition should be moved before analyze, this is a bit hackish.
+    // Currently, we acquire a snapshot, we compile the query wrt that snapshot,
+    // and then, we acquire locks. If snapshot is still valid, we continue as usual.
+    // But if snapshot is not valid, we recompile the query.
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+
+    HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
+    hookCtx.setConf(conf);
+    hookCtx.setUserName(userName);
+    hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
+    hookCtx.setCommand(command);
+    hookCtx.setHiveOperation(queryState.getHiveOperation());
+
+    tree = hookRunner.runPreAnalyzeHooks(hookCtx, tree);
+    BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
+    if (!retrial) {
+      openTransaction();
+      generateValidTxnList();
+    }
+    sem.analyze(tree, ctx);
+    checkInterrupted("after analyzing query.", null, null);
+
+    // Retrieve information about cache usage for the query.
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
+      cacheUsage = sem.getCacheUsage();
+    }
+
+    // validate the plan
+    sem.validate();
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
+
+    schema = getSchema(sem, conf);
+    plan = new QueryPlan(conf.getQueryString(), sem, queryDisplay.getQueryStartTime(), queryDisplay.getQueryId(),
+        queryState.getHiveOperation(), schema);
+
+    // the reason that we set the txn manager for the cxt here is because each
+    // query has its own ctx object. The txn mgr is shared across the
+    // same instance of Driver, which can run multiple queries.
+    ctx.setHiveTxnManager(queryTxnMgr);
+    checkInterrupted("at acquiring the lock.", null, null);
+    if (!retrial) {
+      lockAndRespond();
+    }
+    if (!isValidTxnListState()) {
+      // We could not validate state, we return null
+      return null;
+    }
+    hookCtx.update(sem);
+    hookRunner.runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks());
+    return sem;
+  }
+
+  private BaseSemanticAnalyzer semanticAnalyze(String command, ASTNode tree, boolean retrial)
+      throws HiveException, CommandProcessorResponse {
+    // TODO: Lock acquisition should be moved before analyze, this is a bit hackish.
+    // Currently, we acquire a snapshot, we compile the query wrt that snapshot,
+    // and then, we acquire locks. If snapshot is still valid, we continue as usual.
+    // But if snapshot is not valid, we recompile the query.
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+
+    BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
+    if (!retrial) {
+      openTransaction();
+      generateValidTxnList();
+    }
+    sem.analyze(tree, ctx);
+    checkInterrupted("after analyzing query.", null, null);
+
+    // Retrieve information about cache usage for the query.
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
+      cacheUsage = sem.getCacheUsage();
+    }
+
+    // validate the plan
+    sem.validate();
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
+
+    schema = getSchema(sem, conf);
+    plan = new QueryPlan(conf.getQueryString(), sem, queryDisplay.getQueryStartTime(), queryDisplay.getQueryId(),
+        queryState.getHiveOperation(), schema);
+
+    // the reason that we set the txn manager for the cxt here is because each
+    // query has its own ctx object. The txn mgr is shared across the
+    // same instance of Driver, which can run multiple queries.
+    ctx.setHiveTxnManager(queryTxnMgr);
+    if (!retrial) {
+      checkInterrupted("at acquiring the lock.", null, null);
+      lockAndRespond();
+    }
+    if (!isValidTxnListState()) {
+      // We could not validate state, we return null
+      return null;
+    }
+    return sem;
+  }
+
+  // Checks whether txn list has been invalidated while planning the query.
+  // This would happen if query requires exclusive/semi-shared lock, and there
+  // has been a committed transaction on the table over which the lock is
+  // required.
+  private boolean isValidTxnListState() throws LockException, HiveException {
+    // 1) Get valid txn list.
+    String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    ValidTxnList currentTxnList = queryTxnMgr.getValidTxns();
+    String currentTxnString = currentTxnList.toString();
+    if (currentTxnString.equals(txnString)) {
+      // Still valid, nothing more to do
+      return true;
+    }
+    // 2) Get locks that are relevant:
+    // - Exclusive for INSERT OVERWRITE.
+    // - Semi-shared for UPDATE/DELETE.
+    if (ctx.getHiveLocks() == null || ctx.getHiveLocks().isEmpty()) {
+      // Nothing to check
+      return true;
+    }
+    Set<String> nonSharedLocks = new HashSet<>();
+    for (HiveLock lock : ctx.getHiveLocks()) {
+      if (lock.getHiveLockMode() == HiveLockMode.EXCLUSIVE ||
+          lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) {
+        if (lock.getHiveLockObject().getPaths().length == 2) {
+          // Pos 0 of lock paths array contains dbname, pos 1 contains tblname
+          nonSharedLocks.add(
+              Warehouse.getQualifiedName(
+                  lock.getHiveLockObject().getPaths()[0], lock.getHiveLockObject().getPaths()[1]));
+        }
+      }
+    }
+    // 3) Get txn tables that are being written
+    ValidTxnWriteIdList txnWriteIdList =
+        new ValidTxnWriteIdList(conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+    if (txnWriteIdList == null) {
+      // Nothing to check
+      return true;
+    }
+    List<Pair<String, Table>> writtenTables = getWrittenTableList(plan);
+    ValidTxnWriteIdList currentTxnWriteIds =
+        queryTxnMgr.getValidWriteIds(
+            writtenTables.stream()
+                .filter(e -> AcidUtils.isTransactionalTable(e.getRight()))
+                .map(e -> e.getLeft())
+                .collect(Collectors.toList()),
+            currentTxnString);
+    for (Pair<String, Table> tableInfo : writtenTables) {
+      String fullQNameForLock = Warehouse.getQualifiedName(
+          tableInfo.getRight().getDbName(),
+          MetaStoreUtils.encodeTableName(tableInfo.getRight().getTableName()));
+      if (nonSharedLocks.contains(fullQNameForLock)) {
+        // Check if table is transactional
+        if (AcidUtils.isTransactionalTable(tableInfo.getRight())) {
+          // Check that write id is still valid
+          if (!TxnIdUtils.checkEquivalentWriteIds(
+              txnWriteIdList.getTableValidWriteIdList(tableInfo.getLeft()),
+              currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getLeft()))) {
+            // Write id has changed, it is not valid anymore,
+            // we need to recompile
+            return false;
+          }
+        }
+        nonSharedLocks.remove(fullQNameForLock);
+      }
+    }
+    if (!nonSharedLocks.isEmpty()) {
+      throw new HiveException("Wrong state: non-shared locks contain information for tables that have not" +
+          " been visited when trying to validate the locks from query tables.\n" +
+          "Tables: " + writtenTables.stream().map(e -> e.getLeft()).collect(Collectors.toList()) + "\n" +
+          "Remaining locks after check: " + nonSharedLocks);
+    }
+    // It passes the test, it is valid
+    return true;
+  }
+
   private void setTriggerContext(final String queryId) {
     final long queryStartTime;
     // query info is created by SQLOperation which will have start time of the operation. When JDBC Statement is not
@@ -1394,6 +1568,34 @@ private void addTableFromEntity(Entity entity, Collection<String> tableList) {
     tableList.add(fullTableName);
   }
 
+  // Make the list of transactional tables list which are getting written by current txn
+  private List<Pair<String, Table>> getWrittenTableList(QueryPlan plan) {
+    List<Pair<String, Table>> result = new ArrayList<>();
+    Set<String> tableList = new HashSet<>();
+    for (WriteEntity output : plan.getOutputs()) {
+      Table tbl;
+      switch (output.getType()) {
+        case TABLE: {
+          tbl = output.getTable();
+          break;
+        }
+        case PARTITION:
+        case DUMMYPARTITION: {
+          tbl = output.getPartition().getTable();
+          break;
+        }
+        default: {
+          continue;
+        }
+      }
+      String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
+      if (tableList.add(fullTableName)) {
+        result.add(new ImmutablePair<>(fullTableName, tbl));
+      }
+    }
+    return result;
+  }
+
   private String getUserFromUGI() {
     // Don't use the userName member, as it may or may not have been set. Get the value from
     // conf, which calls into getUGI to figure out who the process is running as.
@@ -1464,7 +1666,6 @@ private void acquireLocks() throws CommandProcessorResponse {
       acidDdlDesc.setWriteId(writeId);
     }
 
-
     /*It's imperative that {@code acquireLocks()} is called for all commands so that HiveTxnManager can
     transition its state machine correctly*/
     queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
@@ -1813,15 +2014,6 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command
     // Any value from compilation phase can be obtained through the map set in queryDisplay during compilation.
    PerfLogger perfLogger = SessionState.getPerfLogger(true);
 
-    // the reason that we set the txn manager for the cxt here is because each
-    // query has its own ctx object. The txn mgr is shared across the
-    // same instance of Driver, which can run multiple queries.
-    ctx.setHiveTxnManager(queryTxnMgr);
-
-    checkInterrupted("at acquiring the lock.", null, null);
-
-    lockAndRespond();
-
     try {
       execute();
     } catch (CommandProcessorResponse cpr) {
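
Note (illustrative, outside the patch): the check in isValidTxnListState() boils down to comparing, for every table the query writes under an exclusive or semi-shared lock, the write-id state captured when the query was compiled against the state observed after the locks were granted. The self-contained sketch below models that idea with hypothetical stand-in types (SnapshotValidator, TableSnapshot); it does not use Hive classes, and the equivalence test is a simplification of what TxnIdUtils.checkEquivalentWriteIds decides.

    // Hypothetical stand-in types; not Hive APIs.
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class SnapshotValidator {

      /** Write-id state of one table as seen by a snapshot (simplified). */
      static final class TableSnapshot {
        final long highWatermark;

        TableSnapshot(long highWatermark) {
          this.highWatermark = highWatermark;
        }

        boolean equivalentTo(TableSnapshot other) {
          // Simplification of the write-id equivalence check used in Hive.
          return other != null && this.highWatermark == other.highWatermark;
        }
      }

      /**
       * Returns true if the snapshot captured at compile time is still usable, i.e. no table
       * written under an exclusive/semi-shared lock has advanced its write-id state since.
       */
      static boolean isSnapshotStillValid(Map<String, TableSnapshot> atCompileTime,
          Map<String, TableSnapshot> afterLocking, Iterable<String> tablesLockedNonShared) {
        for (String table : tablesLockedNonShared) {
          TableSnapshot before = atCompileTime.get(table);
          TableSnapshot now = afterLocking.get(table);
          if (before == null || !before.equivalentTo(now)) {
            return false; // another txn committed to this table after we compiled -> recompile
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Map<String, TableSnapshot> atCompileTime = new HashMap<>();
        atCompileTime.put("db.t1", new TableSnapshot(10));
        Map<String, TableSnapshot> afterLocking = new HashMap<>();
        afterLocking.put("db.t1", new TableSnapshot(12)); // a txn committed in between
        System.out.println(isSnapshotStillValid(atCompileTime, afterLocking,
            Collections.singleton("db.t1"))); // prints false -> plan must be regenerated
      }
    }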
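
Note (illustrative, outside the patch): the compile path now follows a compile, lock, validate, retry-once shape. The query is analyzed against a snapshot, locks are acquired, and if the snapshot went stale in the meantime the backup context is restored and the statement is analyzed exactly once more against a fresh snapshot, failing otherwise. The sketch below shows only that control flow; Planner, Plan and compileWithSingleRetry are hypothetical names, not the Driver's API.

    // Hypothetical names; only the control flow mirrors the patch.
    public class CompileWithRetry {

      interface Planner {
        Plan compile(String query, boolean retrial) throws Exception;
        boolean snapshotStillValid();
        void refreshSnapshot();
      }

      static final class Plan {
        final String query;
        Plan(String query) { this.query = query; }
      }

      /** Compile once; if the snapshot went stale while locking, refresh it and compile once more. */
      static Plan compileWithSingleRetry(Planner planner, String query) throws Exception {
        Plan plan = planner.compile(query, false);   // opens txn, captures snapshot, acquires locks
        if (planner.snapshotStillValid()) {
          return plan;
        }
        planner.refreshSnapshot();                   // pick up txns committed while we were compiling
        plan = planner.compile(query, true);         // retrial: reuses txn and locks, re-analyzes only
        if (!planner.snapshotStillValid()) {
          throw new IllegalStateException("Operation could not be executed");
        }
        return plan;
      }

      public static void main(String[] args) throws Exception {
        Planner toy = new Planner() {
          private int attempts = 0;
          public Plan compile(String query, boolean retrial) { attempts++; return new Plan(query); }
          public boolean snapshotStillValid() { return attempts > 1; } // first snapshot is stale
          public void refreshSnapshot() { }
        };
        System.out.println(compileWithSingleRetry(toy, "INSERT OVERWRITE TABLE t SELECT ...").query);
      }
    }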