diff --git ql/src/java/org/apache/hadoop/hive/ql/Compiler.java ql/src/java/org/apache/hadoop/hive/ql/Compiler.java new file mode 100644 index 0000000000..638d80d1e4 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/Compiler.java @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveVariableSource; +import org.apache.hadoop.hive.conf.VariableSubstitution; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.metastore.api.TxnType; +import org.apache.hadoop.hive.ql.exec.ExplainTask; +import org.apache.hadoop.hive.ql.exec.FetchTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; +import org.apache.hadoop.hive.ql.hooks.HookUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.metadata.AuthorizationException; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext; +import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl; +import org.apache.hadoop.hive.ql.parse.ParseException; +import org.apache.hadoop.hive.ql.parse.ParseUtils; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory; +import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; +import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.processors.CommandProcessorException; +import org.apache.hadoop.hive.ql.security.authorization.command.CommandAuthorizer; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.wm.WmContext; +import org.apache.hadoop.util.StringUtils; +import 
org.apache.thrift.TException; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; + +/** + * The compiler compiles the command, by creating a QueryPlan from a String command. + * Also opens a transaction if necessary. + */ +public class Compiler { + private static final String CLASS_NAME = Compiler.class.getName(); + private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + private static final LogHelper CONSOLE = new LogHelper(LOG); + + private final Driver driver; + private final DriverContext driverContext; + private final DriverState driverState; + + private PerfLogger perfLogger; + private String command; + private String queryStr; + private String queryId; + + private ASTNode tree; + private BaseSemanticAnalyzer sem; + private QueryPlan plan; + + private boolean parseError; + private boolean compileError; + + public Compiler(Driver driver, DriverContext driverContext, DriverState driverState) { + this.driver = driver; + this.driverContext = driverContext; + this.driverState = driverState; + } + + // deferClose indicates if the close/destroy should be deferred when the process has been + // interrupted, it should be set to true if the compile is called within another method like + // runInternal, which defers the close to the called in that method. + public void compile(String rawCommand, boolean resetTaskIds, boolean deferClose) throws CommandProcessorException { + perfLogger = SessionState.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE); + driverState.compilingWithLocking(); + + initialize(rawCommand, resetTaskIds); + + try { + driver.checkInterrupted("before parsing and analysing the query", null, null); + + initializeContext(); + parse(); + analyze(); + + driver.checkInterrupted("after analyzing query.", null, null); + + createPlan(); + authorize(); + explainOutput(); + } catch (CommandProcessorException cpe) { + throw cpe; + } catch (Exception e) { + driver.checkInterrupted("during query compilation: " + e.getMessage(), null, null); + handleException(e); + } finally { + cleanUp(deferClose); + } + } + + private void initialize(String rawCommand, boolean resetTaskIds) throws CommandProcessorException { + this.command = new VariableSubstitution(new HiveVariableSource() { + @Override + public Map getHiveVariable() { + return SessionState.get().getHiveVariables(); + } + }).substitute(driverContext.getConf(), rawCommand); + + queryStr = command; + try { + // command should be redacted to avoid to logging sensitive data + queryStr = HookUtils.redactLogString(driverContext.getConf(), command); + } catch (Exception e) { + LOG.warn("WARNING! Query command could not be redacted." + e); + } + + driver.checkInterrupted("at beginning of compilation.", null, null); + + if (driverContext.getContext() != null && driverContext.getContext().getExplainAnalyze() != AnalyzeState.RUNNING) { + // close the existing ctx etc before compiling a new query, but does not destroy driver + driver.closeInProcess(false); + } + + if (resetTaskIds) { + TaskFactory.resetId(); + } + + DriverState.setDriverState(driverState); + + queryId = Strings.isNullOrEmpty(driverContext.getQueryState().getQueryId()) ? 
+ QueryPlan.makeQueryId() : driverContext.getQueryState().getQueryId(); + + SparkSession ss = SessionState.get().getSparkSession(); + if (ss != null) { + ss.onQuerySubmission(queryId); + } + if (driverContext.getContext() != null) { + setTriggerContext(queryId); + } + //save some info for webUI for use after plan is freed + driverContext.getQueryDisplay().setQueryStr(queryStr); + driverContext.getQueryDisplay().setQueryId(queryId); + + LOG.info("Compiling command(queryId=" + queryId + "): " + queryStr); + + driverContext.getConf().setQueryString(queryStr); + // FIXME: side effect will leave the last query set at the session level + if (SessionState.get() != null) { + SessionState.get().getConf().setQueryString(queryStr); + SessionState.get().setupQueryCurrentTimestamp(); + } + } + + private void initializeContext() throws CommandProcessorException, IOException { + if (driverContext.getContext() == null) { + driverContext.setContext(new Context(driverContext.getConf())); + setTriggerContext(queryId); + } + + driverContext.getContext().setHiveTxnManager(driverContext.getTxnManager()); + driverContext.getContext().setStatsSource(driverContext.getStatsSource()); + driverContext.getContext().setCmd(command); + driverContext.getContext().setHDFSCleanup(true); + } + + private void setTriggerContext(final String queryId) { + final long queryStartTime; + // query info is created by SQLOperation which will have start time of the operation. When JDBC Statement is not + // used queryInfo will be null, in which case we take creation of Driver instance as query start time (which is also + // the time when query display object is created) + if (driverContext.getQueryInfo() != null) { + queryStartTime = driverContext.getQueryInfo().getBeginTime(); + } else { + queryStartTime = driverContext.getQueryDisplay().getQueryStartTime(); + } + WmContext wmContext = new WmContext(queryStartTime, queryId); + driverContext.getContext().setWmContext(wmContext); + } + + private void parse() throws ParseException { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE); + + // Trigger query hook before compilation + driverContext.getHookRunner().runBeforeParseHook(command); + + try { + tree = ParseUtils.parse(command, driverContext.getContext()); + } catch (ParseException e) { + parseError = true; + throw e; + } finally { + driverContext.getHookRunner().runAfterParseHook(command, parseError); + } + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE); + } + + private void analyze() throws Exception { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE); + + driverContext.getHookRunner().runBeforeCompileHook(command); + + // clear CurrentFunctionsInUse set, to capture new set of functions + // that SemanticAnalyzer finds are in use + SessionState.get().getCurrentFunctionsInUse().clear(); + + // Flush the metastore cache. This assures that we don't pick up objects from a previous + // query running in this same thread. This has to be done after we get our semantic + // analyzer (this is when the connection to the metastore is made) but before we analyze, + // because at that point we need access to the objects. 
+ Hive.get().getMSC().flushCache(); + + driverContext.setBackupContext(new Context(driverContext.getContext())); + boolean executeHooks = driverContext.getHookRunner().hasPreAnalyzeHooks(); + + HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl(); + if (executeHooks) { + hookCtx.setConf(driverContext.getConf()); + hookCtx.setUserName(driverContext.getUserName()); + hookCtx.setIpAddress(SessionState.get().getUserIpAddress()); + hookCtx.setCommand(command); + hookCtx.setHiveOperation(driverContext.getQueryState().getHiveOperation()); + + tree = driverContext.getHookRunner().runPreAnalyzeHooks(hookCtx, tree); + } + + if (!driverContext.isRetrial()) { + if ((driverContext.getQueryState().getHiveOperation() != null) && + driverContext.getQueryState().getHiveOperation().equals(HiveOperation.REPLDUMP)) { + setLastReplIdForDump(driverContext.getQueryState().getConf()); + } + driverContext.setTxnType(AcidUtils.getTxnType(tree)); + openTransaction(driverContext.getTxnType()); + + generateValidTxnList(); + } + + // Do semantic analysis and plan generation + sem = SemanticAnalyzerFactory.get(driverContext.getQueryState(), tree); + sem.analyze(tree, driverContext.getContext()); + + if (executeHooks) { + hookCtx.update(sem); + driverContext.getHookRunner().runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks()); + } + + LOG.info("Semantic Analysis Completed (retrial = {})", driverContext.isRetrial()); + + // Retrieve information about cache usage for the query. + if (driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) { + driverContext.setCacheUsage(sem.getCacheUsage()); + } + + // validate the plan + sem.validate(); + + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE); + } + + /** + * Last repl id should be captured before opening txn by current REPL DUMP operation. + * This is needed to avoid losing data which are added/modified by concurrent txns when bootstrap + * dump in progress. + * @param conf Query configurations + * @throws HiveException + * @throws TException + */ + private void setLastReplIdForDump(HiveConf conf) throws HiveException, TException { + // Last logged notification event id would be the last repl Id for the current REPl DUMP. + Hive hiveDb = Hive.get(); + Long lastReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); + conf.setLong(ReplUtils.LAST_REPL_ID_KEY, lastReplId); + LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " + lastReplId); + } + + private void openTransaction(TxnType txnType) throws LockException, CommandProcessorException { + if (driver.checkConcurrency() && startImplicitTxn(driverContext.getTxnManager()) && + !driverContext.getTxnManager().isTxnOpen()) { + String userFromUGI = driver.getUserFromUGI(); + driverContext.getTxnManager().openTxn(driverContext.getContext(), userFromUGI, txnType); + } + } + + private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException { + //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443 + HiveOperation hiveOperation = driverContext.getQueryState().getHiveOperation(); + switch (hiveOperation == null ? HiveOperation.QUERY : hiveOperation) { + case COMMIT: + case ROLLBACK: + if (!txnManager.isTxnOpen()) { + throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, hiveOperation.getOperationName()); + } + case SWITCHDATABASE: + case SET_AUTOCOMMIT: + /** + * autocommit is here for completeness. TM doesn't use it. 
If we want to support JDBC + * semantics (or any other definition of autocommit) it should be done at session level. + */ + case SHOWDATABASES: + case SHOWTABLES: + case SHOWCOLUMNS: + case SHOWFUNCTIONS: + case SHOWPARTITIONS: + case SHOWLOCKS: + case SHOWVIEWS: + case SHOW_ROLES: + case SHOW_ROLE_PRINCIPALS: + case SHOW_COMPACTIONS: + case SHOW_TRANSACTIONS: + case ABORT_TRANSACTIONS: + case KILL_QUERY: + return false; + //this implies that no locks are needed for such a command + default: + return !driverContext.getContext().isExplainPlan(); + } + } + + private void generateValidTxnList() throws LockException { + // Record current valid txn list that will be used throughout the query + // compilation and processing. We only do this if 1) a transaction + // was already opened and 2) the list has not been recorded yet, + // e.g., by an explicit open transaction command. + driverContext.setValidTxnListsGenerated(false); + String currentTxnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY); + if (driverContext.getTxnManager().isTxnOpen() && (currentTxnString == null || currentTxnString.isEmpty())) { + try { + recordValidTxns(driverContext.getTxnManager()); + driverContext.setValidTxnListsGenerated(true); + } catch (LockException e) { + LOG.error("Exception while acquiring valid txn list", e); + throw e; + } + } + } + + // Write the current set of valid transactions into the conf file + private void recordValidTxns(HiveTxnManager txnMgr) throws LockException { + String oldTxnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY); + if ((oldTxnString != null) && (oldTxnString.length() > 0)) { + throw new IllegalStateException("calling recordValidTxn() more than once in the same " + + JavaUtils.txnIdToString(txnMgr.getCurrentTxnId())); + } + ValidTxnList txnList = txnMgr.getValidTxns(); + String txnStr = txnList.toString(); + driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY, txnStr); + LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId()); + } + + private void createPlan() { + // get the output schema + setSchema(); + plan = new QueryPlan(queryStr, sem, driverContext.getQueryDisplay().getQueryStartTime(), queryId, + driverContext.getQueryState().getHiveOperation(), driverContext.getSchema()); + // save the optimized plan and sql for the explain + plan.setOptimizedCBOPlan(driverContext.getContext().getCalcitePlan()); + plan.setOptimizedQueryString(driverContext.getContext().getOptimizedSql()); + driverContext.setPlan(plan); + + driverContext.getConf().set("mapreduce.workflow.id", "hive_" + queryId); + driverContext.getConf().set("mapreduce.workflow.name", queryStr); + + // initialize FetchTask right here + if (plan.getFetchTask() != null) { + plan.getFetchTask().initialize(driverContext.getQueryState(), plan, null, driverContext.getContext()); + } + } + + /** + * Get a Schema with fields represented with native Hive types. + */ + private void setSchema() { + Schema schema = new Schema(); + + // If we have a plan, prefer its logical result schema if it's available; otherwise, try digging out a fetch task; + // failing that, give up. + if (sem == null) { + LOG.info("No semantic analyzer, using empty schema."); + } else if (sem.getResultSchema() != null) { + List lst = sem.getResultSchema(); + schema = new Schema(lst, null); + } else if (sem.getFetchTask() != null) { + FetchTask ft = sem.getFetchTask(); + TableDesc td = ft.getTblDesc(); + // partitioned tables don't have tableDesc set on the FetchTask. 
Instead they have a list of PartitionDesc + // objects, each with a table desc. Let's try to fetch the desc for the first partition and use it's deserializer. + if (td == null && ft.getWork() != null && ft.getWork().getPartDesc() != null) { + if (ft.getWork().getPartDesc().size() > 0) { + td = ft.getWork().getPartDesc().get(0).getTableDesc(); + } + } + + if (td == null) { + LOG.info("No returning schema, using empty schema"); + } else { + String tableName = "result"; + List lst = null; + try { + lst = HiveMetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(driverContext.getConf())); + } catch (Exception e) { + LOG.warn("Error getting schema: " + StringUtils.stringifyException(e)); + } + if (lst != null) { + schema = new Schema(lst, null); + } + } + } + + LOG.info("Created Hive schema: " + schema); + driverContext.setSchema(schema); + } + + private void authorize() throws HiveException, CommandProcessorException { + // do the authorization check + if (!sem.skipAuthorization() && + HiveConf.getBoolVar(driverContext.getConf(), HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) { + + try { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION); + // Authorization check for kill query will be in KillQueryImpl + // As both admin or operation owner can perform the operation. + // Which is not directly supported in authorizer + if (driverContext.getQueryState().getHiveOperation() != HiveOperation.KILL_QUERY) { + CommandAuthorizer.doAuthorization(driverContext.getQueryState().getHiveOperation(), sem, command); + } + } catch (AuthorizationException authExp) { + CONSOLE.printError("Authorization failed:" + authExp.getMessage() + ". Use SHOW GRANT to get more details."); + throw driver.createProcessorException(403, authExp.getMessage(), "42000", null); + } finally { + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION); + } + } + } + + private void explainOutput() throws IOException { + if (driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT) + || driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) { + String explainOutput = getExplainOutput(sem, plan, tree); + if (explainOutput != null) { + if (driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) { + LOG.info("EXPLAIN output for queryid " + queryId + " : " + explainOutput); + } + if (driverContext.getConf().isWebUiQueryInfoCacheEnabled() + && driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) { + driverContext.getQueryDisplay().setExplainPlan(explainOutput); + } + } + } + } + + /** + * Returns EXPLAIN EXTENDED output for a semantically + * analyzed query. 
+ * + * @param sem semantic analyzer for analyzed query + * @param plan query plan + * @param astTree AST tree dump + * @throws java.io.IOException + */ + private String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan plan, + ASTNode astTree) throws IOException { + String ret = null; + ExplainTask task = new ExplainTask(); + task.initialize(driverContext.getQueryState(), plan, null, driverContext.getContext()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + try { + List> rootTasks = sem.getAllRootTasks(); + if (driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_SHOW_GRAPH)) { + JSONObject jsonPlan = task.getJSONPlan( + null, rootTasks, sem.getFetchTask(), true, true, true, sem.getCboInfo(), + plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString()); + if (jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES) != null && + jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES).length() <= + driverContext.getConf().getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_GRAPH_SIZE)) { + ret = jsonPlan.toString(); + } else { + ret = null; + } + } else { + task.getJSONPlan(ps, rootTasks, sem.getFetchTask(), false, true, true, sem.getCboInfo(), + plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString()); + ret = baos.toString(); + } + } catch (Exception e) { + LOG.warn("Exception generating explain output: " + e, e); + } + + return ret; + } + + private void handleException(Exception e) throws CommandProcessorException { + compileError = true; + ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage()); + String errorMessage = "FAILED: " + e.getClass().getSimpleName(); + if (error != ErrorMsg.GENERIC_ERROR) { + errorMessage += " [Error " + error.getErrorCode() + "]:"; + } + + // HIVE-4889 + if ((e instanceof IllegalArgumentException) && e.getMessage() == null && e.getCause() != null) { + errorMessage += " " + e.getCause().getMessage(); + } else { + errorMessage += " " + e.getMessage(); + } + + if (error == ErrorMsg.TXNMGR_NOT_ACID) { + errorMessage += ". Failed command: " + queryStr; + } + + CONSOLE.printError(errorMessage, "\n" + StringUtils.stringifyException(e)); + throw driver.createProcessorException(error.getErrorCode(), errorMessage, error.getSQLState(), e); + } + + private void cleanUp(boolean deferClose) { + // Trigger post compilation hook. Note that if the compilation fails here then + // before/after execution hook will never be executed. 
+ if (!parseError) { + try { + driverContext.getHookRunner().runAfterCompilationHook(command, compileError); + } catch (Exception e) { + LOG.warn("Failed when invoking query after-compilation hook.", e); + } + } + + double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE)/1000.00; + ImmutableMap compileHMSTimings = driver.dumpMetaCallTimingWithoutEx("compilation"); + driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings); + + boolean isInterrupted = driverState.isAborted(); + if (isInterrupted && !deferClose) { + driver.closeInProcess(true); + } + + if (isInterrupted) { + driverState.compilationInterruptedWithLocking(deferClose); + LOG.info("Compiling command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds"); + } else { + driverState.compilationFinishedWithLocking(compileError); + LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " seconds"); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index bb41c15bb4..7e1172ef7f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -18,10 +18,8 @@ package org.apache.hadoop.hive.ql; -import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.IOException; -import java.io.PrintStream; import java.net.InetAddress; import java.util.ArrayList; import java.util.HashMap; @@ -48,10 +46,6 @@ import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.conf.HiveVariableSource; -import org.apache.hadoop.hive.conf.VariableSubstitution; -import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.LockComponent; import org.apache.hadoop.hive.metastore.api.LockType; import org.apache.hadoop.hive.metastore.api.Schema; @@ -73,12 +67,10 @@ import org.apache.hadoop.hive.ql.exec.TaskResult; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.HookContext; -import org.apache.hadoop.hive.ql.hooks.HookUtils; import org.apache.hadoop.hive.ql.hooks.PrivateHookContext; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -89,22 +81,13 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.metadata.AuthorizationException; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.formatting.JsonMetaDataFormatter; import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils; import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatter; -import org.apache.hadoop.hive.ql.parse.ASTNode; -import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; -import 
org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext; -import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl; import org.apache.hadoop.hive.ql.parse.HiveTableName; -import org.apache.hadoop.hive.ql.parse.ParseException; -import org.apache.hadoop.hive.ql.parse.ParseUtils; -import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -112,18 +95,14 @@ import org.apache.hadoop.hive.ql.plan.mapper.StatsSource; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; -import org.apache.hadoop.hive.ql.security.authorization.command.CommandAuthorizer; import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.ShutdownHookManager; import org.apache.hive.common.util.TxnIdUtils; -import org.apache.thrift.TException; -import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -143,7 +122,6 @@ private ByteStream.Output bos = new ByteStream.Output(); private DataInput resStream; - private Context ctx; private final DriverContext driverContext; private TaskQueue taskQueue; private final List hiveLocks = new ArrayList(); @@ -153,7 +131,7 @@ private DriverState driverState = new DriverState(); - private boolean checkConcurrency() { + public boolean checkConcurrency() { return driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); } @@ -168,61 +146,11 @@ public Schema getExplainSchema() { @Override public Context getContext() { - return ctx; + return driverContext.getContext(); } public PlanMapper getPlanMapper() { - return ctx.getPlanMapper(); - } - - /** - * Get a Schema with fields represented with native Hive types - */ - private static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) { - Schema schema = null; - - // If we have a plan, prefer its logical result schema if it's - // available; otherwise, try digging out a fetch task; failing that, - // give up. - if (sem == null) { - // can't get any info without a plan - } else if (sem.getResultSchema() != null) { - List lst = sem.getResultSchema(); - schema = new Schema(lst, null); - } else if (sem.getFetchTask() != null) { - FetchTask ft = sem.getFetchTask(); - TableDesc td = ft.getTblDesc(); - // partitioned tables don't have tableDesc set on the FetchTask. Instead - // they have a list of PartitionDesc objects, each with a table desc. - // Let's - // try to fetch the desc for the first partition and use it's - // deserializer. 
- if (td == null && ft.getWork() != null && ft.getWork().getPartDesc() != null) { - if (ft.getWork().getPartDesc().size() > 0) { - td = ft.getWork().getPartDesc().get(0).getTableDesc(); - } - } - - if (td == null) { - LOG.info("No returning schema."); - } else { - String tableName = "result"; - List lst = null; - try { - lst = HiveMetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(conf)); - } catch (Exception e) { - LOG.warn("Error getting schema: " + StringUtils.stringifyException(e)); - } - if (lst != null) { - schema = new Schema(lst, null); - } - } - } - if (schema == null) { - schema = new Schema(); - } - LOG.info("Returning Hive schema: " + schema); - return schema; + return driverContext.getContext().getPlanMapper(); } /** @@ -242,7 +170,7 @@ public Driver(HiveConf conf) { // or compile another query public Driver(HiveConf conf, Context ctx, LineageState lineageState) { this(getNewQueryState(conf, lineageState), null, null); - this.ctx = ctx; + driverContext.setContext(ctx); } // Pass lineageState when a driver instantiates another Driver to run @@ -300,260 +228,8 @@ public int compile(String command, boolean resetTaskIds) { @VisibleForTesting public void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorException { createTransactionManager(); - - PerfLogger perfLogger = SessionState.getPerfLogger(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE); - driverState.compilingWithLocking(); - - command = new VariableSubstitution(new HiveVariableSource() { - @Override - public Map getHiveVariable() { - return SessionState.get().getHiveVariables(); - } - }).substitute(driverContext.getConf(), command); - - String queryStr = command; - - try { - // command should be redacted to avoid to logging sensitive data - queryStr = HookUtils.redactLogString(driverContext.getConf(), command); - } catch (Exception e) { - LOG.warn("WARNING! Query command could not be redacted." + e); - } - - checkInterrupted("at beginning of compilation.", null, null); - - if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) { - // close the existing ctx etc before compiling a new query, but does not destroy driver - closeInProcess(false); - } - - if (resetTaskIds) { - TaskFactory.resetId(); - } - - DriverState.setDriverState(driverState); - - final String queryId = Strings.isNullOrEmpty(driverContext.getQueryState().getQueryId()) ? - QueryPlan.makeQueryId() : driverContext.getQueryState().getQueryId(); - - SparkSession ss = SessionState.get().getSparkSession(); - if (ss != null) { - ss.onQuerySubmission(queryId); - } - - if (ctx != null) { - setTriggerContext(queryId); - } - //save some info for webUI for use after plan is freed - driverContext.getQueryDisplay().setQueryStr(queryStr); - driverContext.getQueryDisplay().setQueryId(queryId); - - LOG.info("Compiling command(queryId=" + queryId + "): " + queryStr); - - driverContext.getConf().setQueryString(queryStr); - // FIXME: sideeffect will leave the last query set at the session level - if (SessionState.get() != null) { - SessionState.get().getConf().setQueryString(queryStr); - SessionState.get().setupQueryCurrentTimestamp(); - } - - // Whether any error occurred during query compilation. Used for query lifetime hook. 
- boolean compileError = false; - boolean parseError = false; - - try { - checkInterrupted("before parsing and analysing the query", null, null); - - if (ctx == null) { - ctx = new Context(driverContext.getConf()); - setTriggerContext(queryId); - } - - ctx.setHiveTxnManager(driverContext.getTxnManager()); - ctx.setStatsSource(driverContext.getStatsSource()); - ctx.setCmd(command); - ctx.setHDFSCleanup(true); - - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE); - - // Trigger query hook before compilation - driverContext.getHookRunner().runBeforeParseHook(command); - - ASTNode tree; - try { - tree = ParseUtils.parse(command, ctx); - } catch (ParseException e) { - parseError = true; - throw e; - } finally { - driverContext.getHookRunner().runAfterParseHook(command, parseError); - } - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE); - - driverContext.getHookRunner().runBeforeCompileHook(command); - // clear CurrentFunctionsInUse set, to capture new set of functions - // that SemanticAnalyzer finds are in use - SessionState.get().getCurrentFunctionsInUse().clear(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE); - - // Flush the metastore cache. This assures that we don't pick up objects from a previous - // query running in this same thread. This has to be done after we get our semantic - // analyzer (this is when the connection to the metastore is made) but before we analyze, - // because at that point we need access to the objects. - Hive.get().getMSC().flushCache(); - - driverContext.setBackupContext(new Context(ctx)); - boolean executeHooks = driverContext.getHookRunner().hasPreAnalyzeHooks(); - - HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl(); - if (executeHooks) { - hookCtx.setConf(driverContext.getConf()); - hookCtx.setUserName(driverContext.getUserName()); - hookCtx.setIpAddress(SessionState.get().getUserIpAddress()); - hookCtx.setCommand(command); - hookCtx.setHiveOperation(driverContext.getQueryState().getHiveOperation()); - - tree = driverContext.getHookRunner().runPreAnalyzeHooks(hookCtx, tree); - } - - // Do semantic analysis and plan generation - BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(driverContext.getQueryState(), tree); - - if (!driverContext.isRetrial()) { - if ((driverContext.getQueryState().getHiveOperation() != null) && - driverContext.getQueryState().getHiveOperation().equals(HiveOperation.REPLDUMP)) { - setLastReplIdForDump(driverContext.getQueryState().getConf()); - } - driverContext.setTxnType(AcidUtils.getTxnType(tree)); - openTransaction(driverContext.getTxnType()); - - generateValidTxnList(); - } - - sem.analyze(tree, ctx); - - if (executeHooks) { - hookCtx.update(sem); - driverContext.getHookRunner().runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks()); - } - - LOG.info("Semantic Analysis Completed (retrial = {})", driverContext.isRetrial()); - - // Retrieve information about cache usage for the query. 
- if (driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) { - driverContext.setCacheUsage(sem.getCacheUsage()); - } - - // validate the plan - sem.validate(); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE); - - checkInterrupted("after analyzing query.", null, null); - - // get the output schema - driverContext.setSchema(getSchema(sem, driverContext.getConf())); - QueryPlan plan = new QueryPlan(queryStr, sem, driverContext.getQueryDisplay().getQueryStartTime(), queryId, - driverContext.getQueryState().getHiveOperation(), driverContext.getSchema()); - // save the optimized plan and sql for the explain - plan.setOptimizedCBOPlan(ctx.getCalcitePlan()); - plan.setOptimizedQueryString(ctx.getOptimizedSql()); - driverContext.setPlan(plan); - - driverContext.getConf().set("mapreduce.workflow.id", "hive_" + queryId); - driverContext.getConf().set("mapreduce.workflow.name", queryStr); - - // initialize FetchTask right here - if (plan.getFetchTask() != null) { - plan.getFetchTask().initialize(driverContext.getQueryState(), plan, null, ctx); - } - - //do the authorization check - if (!sem.skipAuthorization() && - HiveConf.getBoolVar(driverContext.getConf(), HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) { - - try { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION); - // Authorization check for kill query will be in KillQueryImpl - // As both admin or operation owner can perform the operation. - // Which is not directly supported in authorizer - if (driverContext.getQueryState().getHiveOperation() != HiveOperation.KILL_QUERY) { - CommandAuthorizer.doAuthorization(driverContext.getQueryState().getHiveOperation(), sem, command); - } - } catch (AuthorizationException authExp) { - CONSOLE.printError("Authorization failed:" + authExp.getMessage() + ". Use SHOW GRANT to get more details."); - throw createProcessorException(403, authExp.getMessage(), "42000", null); - } finally { - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION); - } - } - - if (driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT) - || driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) { - String explainOutput = getExplainOutput(sem, plan, tree); - if (explainOutput != null) { - if (driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) { - LOG.info("EXPLAIN output for queryid " + queryId + " : " + explainOutput); - } - if (driverContext.getConf().isWebUiQueryInfoCacheEnabled() - && driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) { - driverContext.getQueryDisplay().setExplainPlan(explainOutput); - } - } - } - } catch (CommandProcessorException cpe) { - throw cpe; - } catch (Exception e) { - checkInterrupted("during query compilation: " + e.getMessage(), null, null); - - compileError = true; - ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage()); - String errorMessage = "FAILED: " + e.getClass().getSimpleName(); - if (error != ErrorMsg.GENERIC_ERROR) { - errorMessage += " [Error " + error.getErrorCode() + "]:"; - } - - // HIVE-4889 - if ((e instanceof IllegalArgumentException) && e.getMessage() == null && e.getCause() != null) { - errorMessage += " " + e.getCause().getMessage(); - } else { - errorMessage += " " + e.getMessage(); - } - - if (error == ErrorMsg.TXNMGR_NOT_ACID) { - errorMessage += ". 
Failed command: " + queryStr; - } - - CONSOLE.printError(errorMessage, "\n" + StringUtils.stringifyException(e)); - throw createProcessorException(error.getErrorCode(), errorMessage, error.getSQLState(), e); - } finally { - // Trigger post compilation hook. Note that if the compilation fails here then - // before/after execution hook will never be executed. - if (!parseError) { - try { - driverContext.getHookRunner().runAfterCompilationHook(command, compileError); - } catch (Exception e) { - LOG.warn("Failed when invoking query after-compilation hook.", e); - } - } - - double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE)/1000.00; - ImmutableMap compileHMSTimings = dumpMetaCallTimingWithoutEx("compilation"); - driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings); - - boolean isInterrupted = driverState.isAborted(); - if (isInterrupted && !deferClose) { - closeInProcess(true); - } - - if (isInterrupted) { - driverState.compilationInterruptedWithLocking(deferClose); - LOG.info("Compiling command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds"); - } else { - driverState.compilationFinishedWithLocking(compileError); - LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " seconds"); - } - } + Compiler compiler = new Compiler(this, driverContext, driverState); + compiler.compile(command, resetTaskIds, deferClose); } private void createTransactionManager() throws CommandProcessorException { @@ -612,12 +288,12 @@ private boolean isValidTxnListState() throws LockException { // 2) Get locks that are relevant: // - Exclusive for INSERT OVERWRITE. // - Semi-shared for UPDATE/DELETE. - if (ctx.getHiveLocks() == null || ctx.getHiveLocks().isEmpty()) { + if (driverContext.getContext().getHiveLocks() == null || driverContext.getContext().getHiveLocks().isEmpty()) { // Nothing to check return true; } Set nonSharedLocks = new HashSet<>(); - for (HiveLock lock : ctx.getHiveLocks()) { + for (HiveLock lock : driverContext.getContext().getHiveLocks()) { if (lock.mayContainComponents()) { // The lock may have multiple components, e.g., DbHiveLock, hence we need // to check for each of them @@ -690,98 +366,7 @@ private boolean isValidTxnListState() throws LockException { return true; } - private void setTriggerContext(final String queryId) { - final long queryStartTime; - // query info is created by SQLOperation which will have start time of the operation. When JDBC Statement is not - // used queryInfo will be null, in which case we take creation of Driver instance as query start time (which is also - // the time when query display object is created) - if (driverContext.getQueryInfo() != null) { - queryStartTime = driverContext.getQueryInfo().getBeginTime(); - } else { - queryStartTime = driverContext.getQueryDisplay().getQueryStartTime(); - } - WmContext wmContext = new WmContext(queryStartTime, queryId); - ctx.setWmContext(wmContext); - } - - /** - * Last repl id should be captured before opening txn by current REPL DUMP operation. - * This is needed to avoid losing data which are added/modified by concurrent txns when bootstrap - * dump in progress. - * @param conf Query configurations - * @throws HiveException - * @throws TException - */ - private void setLastReplIdForDump(HiveConf conf) throws HiveException, TException { - // Last logged notification event id would be the last repl Id for the current REPl DUMP. 
- Hive hiveDb = Hive.get(); - Long lastReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); - conf.setLong(ReplUtils.LAST_REPL_ID_KEY, lastReplId); - LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " + lastReplId); - } - - private void openTransaction(TxnType txnType) throws LockException, CommandProcessorException { - if (checkConcurrency() && startImplicitTxn(driverContext.getTxnManager()) && - !driverContext.getTxnManager().isTxnOpen()) { - String userFromUGI = getUserFromUGI(); - driverContext.getTxnManager().openTxn(ctx, userFromUGI, txnType); - } - } - - private void generateValidTxnList() throws LockException { - // Record current valid txn list that will be used throughout the query - // compilation and processing. We only do this if 1) a transaction - // was already opened and 2) the list has not been recorded yet, - // e.g., by an explicit open transaction command. - driverContext.setValidTxnListsGenerated(false); - String currentTxnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY); - if (driverContext.getTxnManager().isTxnOpen() && (currentTxnString == null || currentTxnString.isEmpty())) { - try { - recordValidTxns(driverContext.getTxnManager()); - driverContext.setValidTxnListsGenerated(true); - } catch (LockException e) { - LOG.error("Exception while acquiring valid txn list", e); - throw e; - } - } - } - - private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException { - boolean shouldOpenImplicitTxn = !ctx.isExplainPlan(); - //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443 - HiveOperation hiveOperation = driverContext.getQueryState().getHiveOperation(); - switch (hiveOperation == null ? HiveOperation.QUERY : hiveOperation) { - case COMMIT: - case ROLLBACK: - if (!txnManager.isTxnOpen()) { - throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, hiveOperation.getOperationName()); - } - case SWITCHDATABASE: - case SET_AUTOCOMMIT: - /** - * autocommit is here for completeness. TM doesn't use it. If we want to support JDBC - * semantics (or any other definition of autocommit) it should be done at session level. - */ - case SHOWDATABASES: - case SHOWTABLES: - case SHOWCOLUMNS: - case SHOWFUNCTIONS: - case SHOWPARTITIONS: - case SHOWLOCKS: - case SHOWVIEWS: - case SHOW_ROLES: - case SHOW_ROLE_PRINCIPALS: - case SHOW_COMPACTIONS: - case SHOW_TRANSACTIONS: - case ABORT_TRANSACTIONS: - case KILL_QUERY: - shouldOpenImplicitTxn = false; - //this implies that no locks are needed for such a command - } - return shouldOpenImplicitTxn; - } - - private void checkInterrupted(String msg, HookContext hookContext, PerfLogger perfLogger) + public void checkInterrupted(String msg, HookContext hookContext, PerfLogger perfLogger) throws CommandProcessorException { if (driverState.isAborted()) { String errorMessage = "FAILED: command has been interrupted: " + msg; @@ -797,7 +382,7 @@ private void checkInterrupted(String msg, HookContext hookContext, PerfLogger pe } } - private ImmutableMap dumpMetaCallTimingWithoutEx(String phase) { + public ImmutableMap dumpMetaCallTimingWithoutEx(String phase) { try { return Hive.get().dumpAndClearMetaCallTiming(phase); } catch (HiveException he) { @@ -806,47 +391,6 @@ private void checkInterrupted(String msg, HookContext hookContext, PerfLogger pe return null; } - /** - * Returns EXPLAIN EXTENDED output for a semantically - * analyzed query. 
- * - * @param sem semantic analyzer for analyzed query - * @param plan query plan - * @param astTree AST tree dump - * @throws java.io.IOException - */ - private String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan plan, - ASTNode astTree) throws IOException { - String ret = null; - ExplainTask task = new ExplainTask(); - task.initialize(driverContext.getQueryState(), plan, null, ctx); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintStream ps = new PrintStream(baos); - try { - List> rootTasks = sem.getAllRootTasks(); - if (driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_SHOW_GRAPH)) { - JSONObject jsonPlan = task.getJSONPlan( - null, rootTasks, sem.getFetchTask(), true, true, true, sem.getCboInfo(), - plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString()); - if (jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES) != null && - jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES).length() <= - driverContext.getConf().getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_GRAPH_SIZE)) { - ret = jsonPlan.toString(); - } else { - ret = null; - } - } else { - task.getJSONPlan(ps, rootTasks, sem.getFetchTask(), false, true, true, sem.getCboInfo(), - plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString()); - ret = baos.toString(); - } - } catch (Exception e) { - LOG.warn("Exception generating explain output: " + e, e); - } - - return ret; - } - @Override public HiveConf getConf() { return driverContext.getConf(); @@ -868,19 +412,6 @@ public FetchTask getFetchTask() { return driverContext.getFetchTask(); } - // Write the current set of valid transactions into the conf file - private void recordValidTxns(HiveTxnManager txnMgr) throws LockException { - String oldTxnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY); - if ((oldTxnString != null) && (oldTxnString.length() > 0)) { - throw new IllegalStateException("calling recordValidTxn() more than once in the same " + - JavaUtils.txnIdToString(txnMgr.getCurrentTxnId())); - } - ValidTxnList txnList = txnMgr.getValidTxns(); - String txnStr = txnList.toString(); - driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY, txnStr); - LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId()); - } - // Write the current set of valid write ids for the operated acid tables into the conf file so // that it can be read by the input format. private ValidTxnWriteIdList recordValidWriteIds(HiveTxnManager txnMgr) throws LockException { @@ -985,7 +516,7 @@ private void addTableFromEntity(Entity entity, Map tables) { tables.put(fullTableName, tbl); } - private String getUserFromUGI() throws CommandProcessorException { + String getUserFromUGI() throws CommandProcessorException { // Don't use the userName member, as it may or may not have been set. Get the value from // conf, which calls into getUGI to figure out who the process is running as. 
try { @@ -1067,8 +598,9 @@ private void acquireLocks() throws CommandProcessorException { /*It's imperative that {@code acquireLocks()} is called for all commands so that HiveTxnManager can transition its state machine correctly*/ - driverContext.getTxnManager().acquireLocks(driverContext.getPlan(), ctx, userFromUGI, driverState); - final List locks = ctx.getHiveLocks(); + driverContext.getTxnManager().acquireLocks(driverContext.getPlan(), driverContext.getContext(), userFromUGI, + driverState); + final List locks = driverContext.getContext().getHiveLocks(); LOG.info("Operation {} obtained {} locks", driverContext.getPlan().getOperation(), ((locks == null) ? 0 : locks.size())); // This check is for controlling the correctness of the current state @@ -1135,14 +667,14 @@ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnMa } } else { //since there is no tx, we only have locks for current query (if any) - if (ctx != null && ctx.getHiveLocks() != null) { - hiveLocks.addAll(ctx.getHiveLocks()); + if (driverContext.getContext() != null && driverContext.getContext().getHiveLocks() != null) { + hiveLocks.addAll(driverContext.getContext().getHiveLocks()); } txnMgr.releaseLocks(hiveLocks); } hiveLocks.clear(); - if (ctx != null) { - ctx.setHiveLocks(null); + if (driverContext.getContext() != null) { + driverContext.getContext().setHiveLocks(null); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RELEASE_LOCKS); @@ -1321,7 +853,7 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command boolean isFinishedWithError = true; try { HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(driverContext.getConf(), - alreadyCompiled ? ctx.getCmd() : command); + alreadyCompiled ? driverContext.getContext().getCmd() : command); // Get all the driver run hooks and pre-execute them. try { driverContext.getHookRunner().runPreDriverHooks(hookContext); @@ -1346,7 +878,7 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command // the reason that we set the txn manager for the cxt here is because each // query has its own ctx object. The txn mgr is shared across the // same instance of Driver, which can run multiple queries. - ctx.setHiveTxnManager(driverContext.getTxnManager()); + driverContext.getContext().setHiveTxnManager(driverContext.getTxnManager()); checkInterrupted("at acquiring the lock.", null, null); @@ -1362,9 +894,9 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command // and then, we acquire locks. If snapshot is still valid, we continue as usual. // But if snapshot is not valid, we recompile the query. driverContext.setRetrial(true); - driverContext.getBackupContext().addRewrittenStatementContext(ctx); - driverContext.getBackupContext().setHiveLocks(ctx.getHiveLocks()); - ctx = driverContext.getBackupContext(); + driverContext.getBackupContext().addRewrittenStatementContext(driverContext.getContext()); + driverContext.getBackupContext().setHiveLocks(driverContext.getContext().getHiveLocks()); + driverContext.setContext(driverContext.getBackupContext()); driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY, driverContext.getTxnManager().getValidTxns().toString()); if (driverContext.getPlan().hasAcidResourcesInQuery()) { @@ -1390,7 +922,7 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command // the reason that we set the txn manager for the cxt here is because each // query has its own ctx object. 
The txn mgr is shared across the // same instance of Driver, which can run multiple queries. - ctx.setHiveTxnManager(driverContext.getTxnManager()); + driverContext.getContext().setHiveTxnManager(driverContext.getTxnManager()); } } catch (LockException e) { throw handleHiveException(e, 13); @@ -1532,7 +1064,7 @@ private boolean isExplicitLockOperation() { return false; } - private CommandProcessorException createProcessorException(int ret, String errorMessage, String sqlState, + CommandProcessorException createProcessorException(int ret, String errorMessage, String sqlState, Throwable downstreamError) { SessionState.getPerfLogger().cleanupPerfLogMetrics(); driverContext.getQueryDisplay().setErrorMessage(errorMessage); @@ -1548,7 +1080,8 @@ private CommandProcessorException createProcessorException(int ret, String error private void useFetchFromCache(CacheEntry cacheEntry) { // Change query FetchTask to use new location specified in results cache. FetchTask fetchTaskFromCache = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork()); - fetchTaskFromCache.initialize(driverContext.getQueryState(), driverContext.getPlan(), null, ctx); + fetchTaskFromCache.initialize(driverContext.getQueryState(), driverContext.getPlan(), null, + driverContext.getContext()); driverContext.getPlan().setFetchTask(fetchTaskFromCache); driverContext.setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry)); } @@ -1665,11 +1198,11 @@ private void execute() throws CommandProcessorException { SessionState ss = SessionState.get(); // TODO: should this use getUserFromAuthenticator? - hookContext = new PrivateHookContext(driverContext.getPlan(), driverContext.getQueryState(), ctx.getPathToCS(), - SessionState.get().getUserName(), + hookContext = new PrivateHookContext(driverContext.getPlan(), driverContext.getQueryState(), + driverContext.getContext().getPathToCS(), SessionState.get().getUserName(), ss.getUserIpAddress(), InetAddress.getLocalHost().getHostAddress(), operationId, ss.getSessionId(), Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger, - driverContext.getQueryInfo(), ctx); + driverContext.getQueryInfo(), driverContext.getContext()); hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK); driverContext.getHookRunner().runPreHooks(hookContext); @@ -1701,10 +1234,10 @@ private void execute() throws CommandProcessorException { checkInterrupted("before running tasks.", hookContext, perfLogger); - taskQueue = new TaskQueue(ctx); // for canceling the query (should be bound to session?) + taskQueue = new TaskQueue(driverContext.getContext()); // for canceling the query (should be bound to session?) 
taskQueue.prepare(driverContext.getPlan()); - ctx.setHDFSCleanup(true); + driverContext.getContext().setHDFSCleanup(true); SessionState.get().setMapRedStats(new LinkedHashMap<>()); SessionState.get().setStackTraces(new HashMap<>()); @@ -1800,7 +1333,7 @@ private void execute() throws CommandProcessorException { taskQueue.shutdown(); // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value - ctx.restoreOriginalTracker(); + driverContext.getContext().restoreOriginalTracker(); throw createProcessorException(exitVal, errorMessage, sqlState, result.getTaskError()); } } @@ -1827,7 +1360,7 @@ private void execute() throws CommandProcessorException { // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value - ctx.restoreOriginalTracker(); + driverContext.getContext().restoreOriginalTracker(); if (taskQueue.isShutdown()) { String errorMessage = "FAILED: Operation cancelled"; @@ -1869,7 +1402,7 @@ private void execute() throws CommandProcessorException { checkInterrupted("during query execution: \n" + e.getMessage(), hookContext, perfLogger); - ctx.restoreOriginalTracker(); + driverContext.getContext().restoreOriginalTracker(); if (SessionState.get() != null) { SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE, String.valueOf(12)); @@ -2061,7 +1594,7 @@ private TaskRunner launchTask(Task tsk, String queryId, boolean noName, taskQueue.incCurJobNo(1); CONSOLE.printInfo("Launching Job " + taskQueue.getCurJobNo() + " out of " + jobs); } - tsk.initialize(driverContext.getQueryState(), driverContext.getPlan(), taskQueue, ctx); + tsk.initialize(driverContext.getQueryState(), driverContext.getPlan(), taskQueue, driverContext.getContext()); TaskRunner tskRun = new TaskRunner(tsk, taskQueue); taskQueue.launching(tskRun); @@ -2107,7 +1640,7 @@ public boolean getResults(List res) throws IOException { } if (resStream == null) { - resStream = ctx.getStream(); + resStream = driverContext.getContext().getStream(); } if (resStream == null) { return false; @@ -2146,7 +1679,7 @@ public boolean getResults(List res) throws IOException { } if (ss == Utilities.StreamStatus.EOF) { - resStream = ctx.getStream(); + resStream = driverContext.getContext().getStream(); } } return true; @@ -2164,9 +1697,9 @@ public void resetFetch() throws IOException { throw new IOException("Error closing the current fetch task", e); } // FetchTask should not depend on the plan. 
- driverContext.getFetchTask().initialize(driverContext.getQueryState(), null, null, ctx); + driverContext.getFetchTask().initialize(driverContext.getQueryState(), null, null, driverContext.getContext()); } else { - ctx.resetStream(); + driverContext.getContext().resetStream(); resStream = null; } } @@ -2205,7 +1738,7 @@ private void releasePlan() { private void releaseContext() { try { - if (ctx != null) { + if (driverContext.getContext() != null) { boolean deleteResultDir = true; // don't let context delete result dirs and scratch dirs if result was cached if (driverContext.getCacheUsage() != null @@ -2213,12 +1746,12 @@ private void releaseContext() { deleteResultDir = false; } - ctx.clear(deleteResultDir); - if (ctx.getHiveLocks() != null) { - hiveLocks.addAll(ctx.getHiveLocks()); - ctx.setHiveLocks(null); + driverContext.getContext().clear(deleteResultDir); + if (driverContext.getContext().getHiveLocks() != null) { + hiveLocks.addAll(driverContext.getContext().getHiveLocks()); + driverContext.getContext().setHiveLocks(null); } - ctx = null; + driverContext.setContext(null); } } catch (Exception e) { LOG.debug("Exception while clearing the context ", e); @@ -2277,7 +1810,7 @@ private void releaseCachedResult() { // Close and release resources within a running query process. Since it runs under // driver state COMPILING, EXECUTING or INTERRUPT, it would not have race condition // with the releases probably running in the other closing thread. - private int closeInProcess(boolean destroyed) { + public int closeInProcess(boolean destroyed) { releaseTaskQueue(); releasePlan(); releaseCachedResult(); diff --git ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java index 1afcfc8969..c435792e37 100644 --- ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java @@ -50,6 +50,7 @@ // transaction manager. private final HiveTxnManager initTxnManager; + private Context context; private QueryPlan plan; private Schema schema; @@ -110,6 +111,14 @@ public HiveTxnManager getInitTxnManager() { return initTxnManager; } + public Context getContext() { + return context; + } + + public void setContext(Context context) { + this.context = context; + } + public QueryPlan getPlan() { return plan; }