diff --git ql/src/java/org/apache/hadoop/hive/ql/Compiler.java ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
new file mode 100644
index 0000000000..096fbdc85e
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveVariableSource;
+import org.apache.hadoop.hive.conf.VariableSubstitution;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.ql.exec.ExplainTask;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.hooks.HookUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
+import org.apache.hadoop.hive.ql.parse.ParseException;
+import org.apache.hadoop.hive.ql.parse.ParseUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
+import org.apache.hadoop.hive.ql.security.authorization.command.CommandAuthorizer;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * The compiler compiles the command, by creating a QueryPlan from a String command.
+ * Also opens a transaction if necessary.
+ */
+public class Compiler {
+  private static final String CLASS_NAME = Driver.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+  private static final LogHelper CONSOLE = new LogHelper(LOG);
+
+  private final Context context;
+  private final DriverContext driverContext;
+  private final DriverState driverState;
+
+  private ASTNode tree;
+
+  public Compiler(Context context, DriverContext driverContext, DriverState driverState) {
+    this.context = context;
+    this.driverContext = driverContext;
+    this.driverState = driverState;
+  }
+
+  /**
+   * @param deferClose indicates if the close/destroy should be deferred when the process has been interrupted;
+   *          it should be set to true if the compile is called within another method like runInternal,
+   *          which defers the close to the caller in that method.
+   */
+  public QueryPlan compile(String rawCommand, boolean deferClose) throws CommandProcessorException {
+    initialize(rawCommand);
+
+    boolean compileError = false;
+    boolean parsed = false;
+    QueryPlan plan = null;
+    try {
+      DriverUtils.checkInterrupted(driverState, driverContext, "before parsing and analysing the query", null, null);
+
+      parse();
+      parsed = true;
+      BaseSemanticAnalyzer sem = analyze();
+
+      DriverUtils.checkInterrupted(driverState, driverContext, "after analyzing query.", null, null);
+
+      plan = createPlan(sem);
+      authorize(sem);
+      explainOutput(sem, plan);
+    } catch (CommandProcessorException cpe) {
+      compileError = true;
+      throw cpe;
+    } catch (Exception e) {
+      compileError = true;
+      DriverUtils.checkInterrupted(driverState, driverContext, "during query compilation: " + e.getMessage(), null,
+          null);
+      handleException(e);
+    } finally {
+      cleanUp(compileError, parsed, deferClose);
+    }
+
+    return plan;
+  }
+
+  private void initialize(String rawCommand) throws CommandProcessorException {
+    SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
+    driverState.compilingWithLocking();
+
+    VariableSubstitution variableSubstitution = new VariableSubstitution(new HiveVariableSource() {
+      @Override
+      public Map<String, String> getHiveVariable() {
+        return SessionState.get().getHiveVariables();
+      }
+    });
+    String command = variableSubstitution.substitute(driverContext.getConf(), rawCommand);
+
+    String queryStr = command;
+    try {
+      // command should be redacted to avoid logging sensitive data
+      queryStr = HookUtils.redactLogString(driverContext.getConf(), command);
+    } catch (Exception e) {
+      LOG.warn("WARNING! Query command could not be redacted."
+ e); + } + + DriverUtils.checkInterrupted(driverState, driverContext, "at beginning of compilation.", null, null); + + context.setCmd(command); + driverContext.getQueryDisplay().setQueryStr(queryStr); + LOG.info("Compiling command(queryId=" + driverContext.getQueryId() + "): " + queryStr); + + driverContext.getConf().setQueryString(queryStr); + // FIXME: side effect will leave the last query set at the session level + if (SessionState.get() != null) { + SessionState.get().getConf().setQueryString(queryStr); + SessionState.get().setupQueryCurrentTimestamp(); + } + } + + private void parse() throws ParseException { + SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.PARSE); + + // Trigger query hook before compilation + driverContext.getHookRunner().runBeforeParseHook(context.getCmd()); + + boolean success = false; + try { + tree = ParseUtils.parse(context.getCmd(), context); + success = true; + } finally { + driverContext.getHookRunner().runAfterParseHook(context.getCmd(), !success); + } + SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.PARSE); + } + + private BaseSemanticAnalyzer analyze() throws Exception { + SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE); + + driverContext.getHookRunner().runBeforeCompileHook(context.getCmd()); + + // clear CurrentFunctionsInUse set, to capture new set of functions + // that SemanticAnalyzer finds are in use + SessionState.get().getCurrentFunctionsInUse().clear(); + + // Flush the metastore cache. This assures that we don't pick up objects from a previous + // query running in this same thread. This has to be done after we get our semantic + // analyzer (this is when the connection to the metastore is made) but before we analyze, + // because at that point we need access to the objects. + Hive.get().getMSC().flushCache(); + + driverContext.setBackupContext(new Context(context)); + boolean executeHooks = driverContext.getHookRunner().hasPreAnalyzeHooks(); + + HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl(); + if (executeHooks) { + hookCtx.setConf(driverContext.getConf()); + hookCtx.setUserName(driverContext.getUserName()); + hookCtx.setIpAddress(SessionState.get().getUserIpAddress()); + hookCtx.setCommand(context.getCmd()); + hookCtx.setHiveOperation(driverContext.getQueryState().getHiveOperation()); + + tree = driverContext.getHookRunner().runPreAnalyzeHooks(hookCtx, tree); + } + + // SemanticAnalyzerFactory also sets the hive operation in query state + BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(driverContext.getQueryState(), tree); + + if (!driverContext.isRetrial()) { + if ((driverContext.getQueryState().getHiveOperation() != null) && + driverContext.getQueryState().getHiveOperation().equals(HiveOperation.REPLDUMP)) { + setLastReplIdForDump(driverContext.getQueryState().getConf()); + } + driverContext.setTxnType(AcidUtils.getTxnType(driverContext.getConf(), tree)); + openTransaction(driverContext.getTxnType()); + + generateValidTxnList(); + } + + // Do semantic analysis and plan generation + sem.analyze(tree, context); + + if (executeHooks) { + hookCtx.update(sem); + driverContext.getHookRunner().runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks()); + } + + LOG.info("Semantic Analysis Completed (retrial = {})", driverContext.isRetrial()); + + // Retrieve information about cache usage for the query. 
+ if (driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) { + driverContext.setCacheUsage(sem.getCacheUsage()); + } + + // validate the plan + sem.validate(); + + SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE); + + return sem; + } + + /** + * Last repl id should be captured before opening txn by current REPL DUMP operation. + * This is needed to avoid losing data which are added/modified by concurrent txns when bootstrap + * dump in progress. + * @param conf Query configurations + * @throws HiveException + * @throws TException + */ + private void setLastReplIdForDump(HiveConf conf) throws HiveException, TException { + // Last logged notification event id would be the last repl Id for the current REPl DUMP. + Hive hiveDb = Hive.get(); + Long lastReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); + conf.setLong(ReplUtils.LAST_REPL_ID_KEY, lastReplId); + LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " + lastReplId); + } + + private void openTransaction(TxnType txnType) throws LockException, CommandProcessorException { + if (DriverUtils.checkConcurrency(driverContext) && startImplicitTxn(driverContext.getTxnManager()) && + !driverContext.getTxnManager().isTxnOpen()) { + String userFromUGI = DriverUtils.getUserFromUGI(driverContext); + driverContext.getTxnManager().openTxn(context, userFromUGI, txnType); + } + } + + private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException { + //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443 + HiveOperation hiveOperation = driverContext.getQueryState().getHiveOperation(); + switch (hiveOperation == null ? HiveOperation.QUERY : hiveOperation) { + case COMMIT: + case ROLLBACK: + if (!txnManager.isTxnOpen()) { + throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, hiveOperation.getOperationName()); + } + case SWITCHDATABASE: + case SET_AUTOCOMMIT: + /** + * autocommit is here for completeness. TM doesn't use it. If we want to support JDBC + * semantics (or any other definition of autocommit) it should be done at session level. + */ + case SHOWDATABASES: + case SHOWTABLES: + case SHOWCOLUMNS: + case SHOWFUNCTIONS: + case SHOWPARTITIONS: + case SHOWLOCKS: + case SHOWVIEWS: + case SHOW_ROLES: + case SHOW_ROLE_PRINCIPALS: + case SHOW_COMPACTIONS: + case SHOW_TRANSACTIONS: + case ABORT_TRANSACTIONS: + case KILL_QUERY: + return false; + //this implies that no locks are needed for such a command + default: + return !context.isExplainPlan(); + } + } + + private void generateValidTxnList() throws LockException { + // Record current valid txn list that will be used throughout the query + // compilation and processing. We only do this if 1) a transaction + // was already opened and 2) the list has not been recorded yet, + // e.g., by an explicit open transaction command. 
+ driverContext.setValidTxnListsGenerated(false); + String currentTxnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY); + if (driverContext.getTxnManager().isTxnOpen() && (currentTxnString == null || currentTxnString.isEmpty())) { + try { + recordValidTxns(driverContext.getTxnManager()); + driverContext.setValidTxnListsGenerated(true); + } catch (LockException e) { + LOG.error("Exception while acquiring valid txn list", e); + throw e; + } + } + } + + // Write the current set of valid transactions into the conf file + private void recordValidTxns(HiveTxnManager txnMgr) throws LockException { + String oldTxnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY); + if ((oldTxnString != null) && (oldTxnString.length() > 0)) { + throw new IllegalStateException("calling recordValidTxn() more than once in the same " + + JavaUtils.txnIdToString(txnMgr.getCurrentTxnId())); + } + ValidTxnList txnList = txnMgr.getValidTxns(); + String txnStr = txnList.toString(); + driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY, txnStr); + LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId()); + } + + private QueryPlan createPlan(BaseSemanticAnalyzer sem) { + // get the output schema + setSchema(sem); + QueryPlan plan = new QueryPlan(driverContext.getQueryString(), sem, + driverContext.getQueryDisplay().getQueryStartTime(), driverContext.getQueryId(), + driverContext.getQueryState().getHiveOperation(), driverContext.getSchema()); + // save the optimized plan and sql for the explain + plan.setOptimizedCBOPlan(context.getCalcitePlan()); + plan.setOptimizedQueryString(context.getOptimizedSql()); + + driverContext.getConf().set("mapreduce.workflow.id", "hive_" + driverContext.getQueryId()); + driverContext.getConf().set("mapreduce.workflow.name", driverContext.getQueryString()); + + // initialize FetchTask right here + if (plan.getFetchTask() != null) { + plan.getFetchTask().initialize(driverContext.getQueryState(), plan, null, context); + } + + return plan; + } + + /** + * Get a Schema with fields represented with native Hive types. + */ + private void setSchema(BaseSemanticAnalyzer sem) { + Schema schema = new Schema(); + + // If we have a plan, prefer its logical result schema if it's available; otherwise, try digging out a fetch task; + // failing that, give up. + if (sem == null) { + LOG.info("No semantic analyzer, using empty schema."); + } else if (sem.getResultSchema() != null) { + List lst = sem.getResultSchema(); + schema = new Schema(lst, null); + } else if (sem.getFetchTask() != null) { + FetchTask ft = sem.getFetchTask(); + TableDesc td = ft.getTblDesc(); + // partitioned tables don't have tableDesc set on the FetchTask. Instead they have a list of PartitionDesc + // objects, each with a table desc. Let's try to fetch the desc for the first partition and use it's deserializer. 
+ if (td == null && ft.getWork() != null && ft.getWork().getPartDesc() != null) { + if (ft.getWork().getPartDesc().size() > 0) { + td = ft.getWork().getPartDesc().get(0).getTableDesc(); + } + } + + if (td == null) { + LOG.info("No returning schema, using empty schema"); + } else { + String tableName = "result"; + List lst = null; + try { + lst = HiveMetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(driverContext.getConf())); + } catch (Exception e) { + LOG.warn("Error getting schema: " + StringUtils.stringifyException(e)); + } + if (lst != null) { + schema = new Schema(lst, null); + } + } + } + + LOG.info("Created Hive schema: " + schema); + driverContext.setSchema(schema); + } + + private void authorize(BaseSemanticAnalyzer sem) throws HiveException, CommandProcessorException { + // do the authorization check + if (!sem.skipAuthorization() && + HiveConf.getBoolVar(driverContext.getConf(), HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) { + + try { + SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION); + // Authorization check for kill query will be in KillQueryImpl + // As both admin or operation owner can perform the operation. + // Which is not directly supported in authorizer + if (driverContext.getQueryState().getHiveOperation() != HiveOperation.KILL_QUERY) { + CommandAuthorizer.doAuthorization(driverContext.getQueryState().getHiveOperation(), sem, context.getCmd()); + } + } catch (AuthorizationException authExp) { + CONSOLE.printError("Authorization failed:" + authExp.getMessage() + ". Use SHOW GRANT to get more details."); + throw DriverUtils.createProcessorException(driverContext, 403, authExp.getMessage(), "42000", null); + } finally { + SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION); + } + } + } + + private void explainOutput(BaseSemanticAnalyzer sem, QueryPlan plan) throws IOException { + if (driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT) || + driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) { + String explainOutput = ExplainTask.getExplainOutput(sem, plan, tree, driverContext.getQueryState(), + context, driverContext.getConf()); + if (explainOutput != null) { + if (driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) { + LOG.info("EXPLAIN output for queryid " + driverContext.getQueryId() + " : " + explainOutput); + } + if (driverContext.getConf().isWebUiQueryInfoCacheEnabled() && + driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) { + driverContext.getQueryDisplay().setExplainPlan(explainOutput); + } + } + } + } + + private void handleException(Exception e) throws CommandProcessorException { + ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage()); + String errorMessage = "FAILED: " + e.getClass().getSimpleName(); + if (error != ErrorMsg.GENERIC_ERROR) { + errorMessage += " [Error " + error.getErrorCode() + "]:"; + } + + // HIVE-4889 + if ((e instanceof IllegalArgumentException) && e.getMessage() == null && e.getCause() != null) { + errorMessage += " " + e.getCause().getMessage(); + } else { + errorMessage += " " + e.getMessage(); + } + + if (error == ErrorMsg.TXNMGR_NOT_ACID) { + errorMessage += ". 
Failed command: " + driverContext.getQueryString(); + } + + CONSOLE.printError(errorMessage, "\n" + StringUtils.stringifyException(e)); + throw DriverUtils.createProcessorException(driverContext, error.getErrorCode(), errorMessage, error.getSQLState(), + e); + } + + private void cleanUp(boolean compileError, boolean parsed, boolean deferClose) { + // Trigger post compilation hook. Note that if the compilation fails here then + // before/after execution hook will never be executed. + if (parsed) { + try { + driverContext.getHookRunner().runAfterCompilationHook(context.getCmd(), compileError); + } catch (Exception e) { + LOG.warn("Failed when invoking query after-compilation hook.", e); + } + } + + double duration = SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE) / 1000.00; + ImmutableMap compileHMSTimings = Hive.dumpMetaCallTimingWithoutEx("compilation"); + driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings); + + if (driverState.isAborted()) { + driverState.compilationInterruptedWithLocking(deferClose); + LOG.info("Compiling command(queryId={}) has been interrupted after {} seconds", driverContext.getQueryId(), + duration); + } else { + driverState.compilationFinishedWithLocking(compileError); + LOG.info("Completed compiling command(queryId={}); Time taken: {} seconds", driverContext.getQueryId(), + duration); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 54d12baa96..b9c076e399 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -18,10 +18,8 @@ package org.apache.hadoop.hive.ql; -import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.IOException; -import java.io.PrintStream; import java.net.InetAddress; import java.util.ArrayList; import java.util.HashMap; @@ -48,10 +46,6 @@ import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.conf.HiveVariableSource; -import org.apache.hadoop.hive.conf.VariableSubstitution; -import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.LockComponent; import org.apache.hadoop.hive.metastore.api.LockType; import org.apache.hadoop.hive.metastore.api.Schema; @@ -73,12 +67,10 @@ import org.apache.hadoop.hive.ql.exec.TaskResult; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.HookContext; -import org.apache.hadoop.hive.ql.hooks.HookUtils; import org.apache.hadoop.hive.ql.hooks.PrivateHookContext; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -89,22 +81,14 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.metadata.AuthorizationException; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; 
import org.apache.hadoop.hive.ql.metadata.formatting.JsonMetaDataFormatter; import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils; import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatter; -import org.apache.hadoop.hive.ql.parse.ASTNode; -import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; -import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext; -import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl; import org.apache.hadoop.hive.ql.parse.HiveTableName; -import org.apache.hadoop.hive.ql.parse.ParseException; -import org.apache.hadoop.hive.ql.parse.ParseUtils; -import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory; +import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -112,7 +96,6 @@ import org.apache.hadoop.hive.ql.plan.mapper.StatsSource; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; -import org.apache.hadoop.hive.ql.security.authorization.command.CommandAuthorizer; import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; @@ -122,8 +105,6 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.ShutdownHookManager; import org.apache.hive.common.util.TxnIdUtils; -import org.apache.thrift.TException; -import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -143,7 +124,7 @@ private ByteStream.Output bos = new ByteStream.Output(); private DataInput resStream; - private Context ctx; + private Context context; private final DriverContext driverContext; private TaskQueue taskQueue; private final List hiveLocks = new ArrayList(); @@ -153,10 +134,6 @@ private DriverState driverState = new DriverState(); - private boolean checkConcurrency() { - return driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); - } - @Override public Schema getSchema() { return driverContext.getSchema(); @@ -168,61 +145,11 @@ public Schema getExplainSchema() { @Override public Context getContext() { - return ctx; + return context; } public PlanMapper getPlanMapper() { - return ctx.getPlanMapper(); - } - - /** - * Get a Schema with fields represented with native Hive types - */ - private static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) { - Schema schema = null; - - // If we have a plan, prefer its logical result schema if it's - // available; otherwise, try digging out a fetch task; failing that, - // give up. - if (sem == null) { - // can't get any info without a plan - } else if (sem.getResultSchema() != null) { - List lst = sem.getResultSchema(); - schema = new Schema(lst, null); - } else if (sem.getFetchTask() != null) { - FetchTask ft = sem.getFetchTask(); - TableDesc td = ft.getTblDesc(); - // partitioned tables don't have tableDesc set on the FetchTask. Instead - // they have a list of PartitionDesc objects, each with a table desc. - // Let's - // try to fetch the desc for the first partition and use it's - // deserializer. 
- if (td == null && ft.getWork() != null && ft.getWork().getPartDesc() != null) { - if (ft.getWork().getPartDesc().size() > 0) { - td = ft.getWork().getPartDesc().get(0).getTableDesc(); - } - } - - if (td == null) { - LOG.info("No returning schema."); - } else { - String tableName = "result"; - List lst = null; - try { - lst = HiveMetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(conf)); - } catch (Exception e) { - LOG.warn("Error getting schema: " + StringUtils.stringifyException(e)); - } - if (lst != null) { - schema = new Schema(lst, null); - } - } - } - if (schema == null) { - schema = new Schema(); - } - LOG.info("Returning Hive schema: " + schema); - return schema; + return context.getPlanMapper(); } /** @@ -242,7 +169,7 @@ public Driver(HiveConf conf) { // or compile another query public Driver(HiveConf conf, Context ctx, LineageState lineageState) { this(getNewQueryState(conf, lineageState), null, null); - this.ctx = ctx; + context = ctx; } // Pass lineageState when a driver instantiates another Driver to run @@ -299,261 +226,30 @@ public int compile(String command, boolean resetTaskIds) { // runInternal, which defers the close to the called in that method. @VisibleForTesting public void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorException { - createTransactionManager(); + preparForCompile(resetTaskIds); - PerfLogger perfLogger = SessionState.getPerfLogger(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE); - driverState.compilingWithLocking(); + Compiler compiler = new Compiler(context, driverContext, driverState); + QueryPlan plan = compiler.compile(command, deferClose); + driverContext.setPlan(plan); - command = new VariableSubstitution(new HiveVariableSource() { - @Override - public Map getHiveVariable() { - return SessionState.get().getHiveVariables(); - } - }).substitute(driverContext.getConf(), command); - - String queryStr = command; + compileFinished(deferClose); + } - try { - // command should be redacted to avoid to logging sensitive data - queryStr = HookUtils.redactLogString(driverContext.getConf(), command); - } catch (Exception e) { - LOG.warn("WARNING! Query command could not be redacted." + e); + private void compileFinished(boolean deferClose) { + if (DriverState.getDriverState().isAborted() && !deferClose) { + closeInProcess(true); } + } - checkInterrupted("at beginning of compilation.", null, null); - - if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) { - // close the existing ctx etc before compiling a new query, but does not destroy driver - closeInProcess(false); - } + private void preparForCompile(boolean resetTaskIds) throws CommandProcessorException { + createTransactionManager(); + DriverState.setDriverState(driverState); + prepareContext(); + setQueryId(); if (resetTaskIds) { TaskFactory.resetId(); } - - DriverState.setDriverState(driverState); - - final String queryId = Strings.isNullOrEmpty(driverContext.getQueryState().getQueryId()) ? 
- QueryPlan.makeQueryId() : driverContext.getQueryState().getQueryId(); - - SparkSession ss = SessionState.get().getSparkSession(); - if (ss != null) { - ss.onQuerySubmission(queryId); - } - - if (ctx != null) { - setTriggerContext(queryId); - } - //save some info for webUI for use after plan is freed - driverContext.getQueryDisplay().setQueryStr(queryStr); - driverContext.getQueryDisplay().setQueryId(queryId); - - LOG.info("Compiling command(queryId=" + queryId + "): " + queryStr); - - driverContext.getConf().setQueryString(queryStr); - // FIXME: sideeffect will leave the last query set at the session level - if (SessionState.get() != null) { - SessionState.get().getConf().setQueryString(queryStr); - SessionState.get().setupQueryCurrentTimestamp(); - } - - // Whether any error occurred during query compilation. Used for query lifetime hook. - boolean compileError = false; - boolean parseError = false; - - try { - checkInterrupted("before parsing and analysing the query", null, null); - - if (ctx == null) { - ctx = new Context(driverContext.getConf()); - setTriggerContext(queryId); - } - - ctx.setHiveTxnManager(driverContext.getTxnManager()); - ctx.setStatsSource(driverContext.getStatsSource()); - ctx.setCmd(command); - ctx.setHDFSCleanup(true); - - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE); - - // Trigger query hook before compilation - driverContext.getHookRunner().runBeforeParseHook(command); - - ASTNode tree; - try { - tree = ParseUtils.parse(command, ctx); - } catch (ParseException e) { - parseError = true; - throw e; - } finally { - driverContext.getHookRunner().runAfterParseHook(command, parseError); - } - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE); - - driverContext.getHookRunner().runBeforeCompileHook(command); - // clear CurrentFunctionsInUse set, to capture new set of functions - // that SemanticAnalyzer finds are in use - SessionState.get().getCurrentFunctionsInUse().clear(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE); - - // Flush the metastore cache. This assures that we don't pick up objects from a previous - // query running in this same thread. This has to be done after we get our semantic - // analyzer (this is when the connection to the metastore is made) but before we analyze, - // because at that point we need access to the objects. 
- Hive.get().getMSC().flushCache(); - - driverContext.setBackupContext(new Context(ctx)); - boolean executeHooks = driverContext.getHookRunner().hasPreAnalyzeHooks(); - - HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl(); - if (executeHooks) { - hookCtx.setConf(driverContext.getConf()); - hookCtx.setUserName(driverContext.getUserName()); - hookCtx.setIpAddress(SessionState.get().getUserIpAddress()); - hookCtx.setCommand(command); - hookCtx.setHiveOperation(driverContext.getQueryState().getHiveOperation()); - - tree = driverContext.getHookRunner().runPreAnalyzeHooks(hookCtx, tree); - } - - // Do semantic analysis and plan generation - BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(driverContext.getQueryState(), tree); - - if (!driverContext.isRetrial()) { - if ((driverContext.getQueryState().getHiveOperation() != null) && - driverContext.getQueryState().getHiveOperation().equals(HiveOperation.REPLDUMP)) { - setLastReplIdForDump(driverContext.getQueryState().getConf()); - } - driverContext.setTxnType(AcidUtils.getTxnType(driverContext.getConf(), tree)); - openTransaction(driverContext.getTxnType()); - - generateValidTxnList(); - } - - sem.analyze(tree, ctx); - - if (executeHooks) { - hookCtx.update(sem); - driverContext.getHookRunner().runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks()); - } - - LOG.info("Semantic Analysis Completed (retrial = {})", driverContext.isRetrial()); - - // Retrieve information about cache usage for the query. - if (driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) { - driverContext.setCacheUsage(sem.getCacheUsage()); - } - - // validate the plan - sem.validate(); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE); - - checkInterrupted("after analyzing query.", null, null); - - // get the output schema - driverContext.setSchema(getSchema(sem, driverContext.getConf())); - QueryPlan plan = new QueryPlan(queryStr, sem, driverContext.getQueryDisplay().getQueryStartTime(), queryId, - driverContext.getQueryState().getHiveOperation(), driverContext.getSchema()); - // save the optimized plan and sql for the explain - plan.setOptimizedCBOPlan(ctx.getCalcitePlan()); - plan.setOptimizedQueryString(ctx.getOptimizedSql()); - driverContext.setPlan(plan); - - driverContext.getConf().set("mapreduce.workflow.id", "hive_" + queryId); - driverContext.getConf().set("mapreduce.workflow.name", queryStr); - - // initialize FetchTask right here - if (plan.getFetchTask() != null) { - plan.getFetchTask().initialize(driverContext.getQueryState(), plan, null, ctx); - } - - //do the authorization check - if (!sem.skipAuthorization() && - HiveConf.getBoolVar(driverContext.getConf(), HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) { - - try { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION); - // Authorization check for kill query will be in KillQueryImpl - // As both admin or operation owner can perform the operation. - // Which is not directly supported in authorizer - if (driverContext.getQueryState().getHiveOperation() != HiveOperation.KILL_QUERY) { - CommandAuthorizer.doAuthorization(driverContext.getQueryState().getHiveOperation(), sem, command); - } - } catch (AuthorizationException authExp) { - CONSOLE.printError("Authorization failed:" + authExp.getMessage() + ". 
Use SHOW GRANT to get more details."); - throw createProcessorException(403, authExp.getMessage(), "42000", null); - } finally { - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION); - } - } - - if (driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT) - || driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) { - String explainOutput = getExplainOutput(sem, plan, tree); - if (explainOutput != null) { - if (driverContext.getConf().getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) { - LOG.info("EXPLAIN output for queryid " + queryId + " : " + explainOutput); - } - if (driverContext.getConf().isWebUiQueryInfoCacheEnabled() - && driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_EXPLAIN_OUTPUT)) { - driverContext.getQueryDisplay().setExplainPlan(explainOutput); - } - } - } - } catch (CommandProcessorException cpe) { - throw cpe; - } catch (Exception e) { - checkInterrupted("during query compilation: " + e.getMessage(), null, null); - - compileError = true; - ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage()); - String errorMessage = "FAILED: " + e.getClass().getSimpleName(); - if (error != ErrorMsg.GENERIC_ERROR) { - errorMessage += " [Error " + error.getErrorCode() + "]:"; - } - - // HIVE-4889 - if ((e instanceof IllegalArgumentException) && e.getMessage() == null && e.getCause() != null) { - errorMessage += " " + e.getCause().getMessage(); - } else { - errorMessage += " " + e.getMessage(); - } - - if (error == ErrorMsg.TXNMGR_NOT_ACID) { - errorMessage += ". Failed command: " + queryStr; - } - - CONSOLE.printError(errorMessage, "\n" + StringUtils.stringifyException(e)); - throw createProcessorException(error.getErrorCode(), errorMessage, error.getSQLState(), e); - } finally { - // Trigger post compilation hook. Note that if the compilation fails here then - // before/after execution hook will never be executed. 
- if (!parseError) { - try { - driverContext.getHookRunner().runAfterCompilationHook(command, compileError); - } catch (Exception e) { - LOG.warn("Failed when invoking query after-compilation hook.", e); - } - } - - double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE)/1000.00; - ImmutableMap compileHMSTimings = dumpMetaCallTimingWithoutEx("compilation"); - driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings); - - boolean isInterrupted = driverState.isAborted(); - if (isInterrupted && !deferClose) { - closeInProcess(true); - } - - if (isInterrupted) { - driverState.compilationInterruptedWithLocking(deferClose); - LOG.info("Compiling command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds"); - } else { - driverState.compilationFinishedWithLocking(compileError); - LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " seconds"); - } - } } private void createTransactionManager() throws CommandProcessorException { @@ -588,8 +284,55 @@ public void run() { String errorMessage = "FAILED: " + e.getClass().getSimpleName() + " [Error " + error.getErrorCode() + "]:"; CONSOLE.printError(errorMessage, "\n" + StringUtils.stringifyException(e)); - throw createProcessorException(error.getErrorCode(), errorMessage, error.getSQLState(), e); + throw DriverUtils.createProcessorException(driverContext, error.getErrorCode(), errorMessage, error.getSQLState(), + e); + } + } + + private void prepareContext() throws CommandProcessorException { + if (context != null && context.getExplainAnalyze() != AnalyzeState.RUNNING) { + // close the existing ctx etc before compiling a new query, but does not destroy driver + closeInProcess(false); + } + + try { + if (context == null) { + context = new Context(driverContext.getConf()); + } + } catch (IOException e) { + throw new CommandProcessorException(e); + } + + context.setHiveTxnManager(driverContext.getTxnManager()); + context.setStatsSource(driverContext.getStatsSource()); + context.setHDFSCleanup(true); + } + + private void setQueryId() { + String queryId = Strings.isNullOrEmpty(driverContext.getQueryState().getQueryId()) ? + QueryPlan.makeQueryId() : driverContext.getQueryState().getQueryId(); + + SparkSession ss = SessionState.get().getSparkSession(); + if (ss != null) { + ss.onQuerySubmission(queryId); } + driverContext.getQueryDisplay().setQueryId(queryId); + + setTriggerContext(queryId); + } + + private void setTriggerContext(String queryId) { + long queryStartTime; + // query info is created by SQLOperation which will have start time of the operation. When JDBC Statement is not + // used queryInfo will be null, in which case we take creation of Driver instance as query start time (which is also + // the time when query display object is created) + if (driverContext.getQueryInfo() != null) { + queryStartTime = driverContext.getQueryInfo().getBeginTime(); + } else { + queryStartTime = driverContext.getQueryDisplay().getQueryStartTime(); + } + WmContext wmContext = new WmContext(queryStartTime, queryId); + context.setWmContext(wmContext); } // Checks whether txn list has been invalidated while planning the query. @@ -612,12 +355,12 @@ private boolean isValidTxnListState() throws LockException { // 2) Get locks that are relevant: // - Exclusive for INSERT OVERWRITE. // - Semi-shared for UPDATE/DELETE. 
- if (ctx.getHiveLocks() == null || ctx.getHiveLocks().isEmpty()) { + if (context.getHiveLocks() == null || context.getHiveLocks().isEmpty()) { // Nothing to check return true; } Set nonSharedLocks = new HashSet<>(); - for (HiveLock lock : ctx.getHiveLocks()) { + for (HiveLock lock : context.getHiveLocks()) { if (lock.mayContainComponents()) { // The lock may have multiple components, e.g., DbHiveLock, hence we need // to check for each of them @@ -690,163 +433,6 @@ private boolean isValidTxnListState() throws LockException { return true; } - private void setTriggerContext(final String queryId) { - final long queryStartTime; - // query info is created by SQLOperation which will have start time of the operation. When JDBC Statement is not - // used queryInfo will be null, in which case we take creation of Driver instance as query start time (which is also - // the time when query display object is created) - if (driverContext.getQueryInfo() != null) { - queryStartTime = driverContext.getQueryInfo().getBeginTime(); - } else { - queryStartTime = driverContext.getQueryDisplay().getQueryStartTime(); - } - WmContext wmContext = new WmContext(queryStartTime, queryId); - ctx.setWmContext(wmContext); - } - - /** - * Last repl id should be captured before opening txn by current REPL DUMP operation. - * This is needed to avoid losing data which are added/modified by concurrent txns when bootstrap - * dump in progress. - * @param conf Query configurations - * @throws HiveException - * @throws TException - */ - private void setLastReplIdForDump(HiveConf conf) throws HiveException, TException { - // Last logged notification event id would be the last repl Id for the current REPl DUMP. - Hive hiveDb = Hive.get(); - Long lastReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId(); - conf.setLong(ReplUtils.LAST_REPL_ID_KEY, lastReplId); - LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " + lastReplId); - } - - private void openTransaction(TxnType txnType) throws LockException, CommandProcessorException { - if (checkConcurrency() && startImplicitTxn(driverContext.getTxnManager()) && - !driverContext.getTxnManager().isTxnOpen()) { - String userFromUGI = getUserFromUGI(); - driverContext.getTxnManager().openTxn(ctx, userFromUGI, txnType); - } - } - - private void generateValidTxnList() throws LockException { - // Record current valid txn list that will be used throughout the query - // compilation and processing. We only do this if 1) a transaction - // was already opened and 2) the list has not been recorded yet, - // e.g., by an explicit open transaction command. - driverContext.setValidTxnListsGenerated(false); - String currentTxnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY); - if (driverContext.getTxnManager().isTxnOpen() && (currentTxnString == null || currentTxnString.isEmpty())) { - try { - recordValidTxns(driverContext.getTxnManager()); - driverContext.setValidTxnListsGenerated(true); - } catch (LockException e) { - LOG.error("Exception while acquiring valid txn list", e); - throw e; - } - } - } - - private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException { - boolean shouldOpenImplicitTxn = !ctx.isExplainPlan(); - //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443 - HiveOperation hiveOperation = driverContext.getQueryState().getHiveOperation(); - switch (hiveOperation == null ? 
HiveOperation.QUERY : hiveOperation) { - case COMMIT: - case ROLLBACK: - if (!txnManager.isTxnOpen()) { - throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, hiveOperation.getOperationName()); - } - case SWITCHDATABASE: - case SET_AUTOCOMMIT: - /** - * autocommit is here for completeness. TM doesn't use it. If we want to support JDBC - * semantics (or any other definition of autocommit) it should be done at session level. - */ - case SHOWDATABASES: - case SHOWTABLES: - case SHOWCOLUMNS: - case SHOWFUNCTIONS: - case SHOWPARTITIONS: - case SHOWLOCKS: - case SHOWVIEWS: - case SHOW_ROLES: - case SHOW_ROLE_PRINCIPALS: - case SHOW_COMPACTIONS: - case SHOW_TRANSACTIONS: - case ABORT_TRANSACTIONS: - case KILL_QUERY: - shouldOpenImplicitTxn = false; - //this implies that no locks are needed for such a command - } - return shouldOpenImplicitTxn; - } - - private void checkInterrupted(String msg, HookContext hookContext, PerfLogger perfLogger) - throws CommandProcessorException { - if (driverState.isAborted()) { - String errorMessage = "FAILED: command has been interrupted: " + msg; - CONSOLE.printError(errorMessage); - if (hookContext != null) { - try { - invokeFailureHooks(perfLogger, hookContext, errorMessage, null); - } catch (Exception e) { - LOG.warn("Caught exception attempting to invoke Failure Hooks", e); - } - } - throw createProcessorException(1000, errorMessage, "HY008", null); - } - } - - private ImmutableMap dumpMetaCallTimingWithoutEx(String phase) { - try { - return Hive.get().dumpAndClearMetaCallTiming(phase); - } catch (HiveException he) { - LOG.warn("Caught exception attempting to write metadata call information " + he, he); - } - return null; - } - - /** - * Returns EXPLAIN EXTENDED output for a semantically - * analyzed query. 
- * - * @param sem semantic analyzer for analyzed query - * @param plan query plan - * @param astTree AST tree dump - * @throws java.io.IOException - */ - private String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan plan, - ASTNode astTree) throws IOException { - String ret = null; - ExplainTask task = new ExplainTask(); - task.initialize(driverContext.getQueryState(), plan, null, ctx); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintStream ps = new PrintStream(baos); - try { - List> rootTasks = sem.getAllRootTasks(); - if (driverContext.getConf().getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_SHOW_GRAPH)) { - JSONObject jsonPlan = task.getJSONPlan( - null, rootTasks, sem.getFetchTask(), true, true, true, sem.getCboInfo(), - plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString()); - if (jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES) != null && - jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES).length() <= - driverContext.getConf().getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_GRAPH_SIZE)) { - ret = jsonPlan.toString(); - } else { - ret = null; - } - } else { - task.getJSONPlan(ps, rootTasks, sem.getFetchTask(), false, true, true, sem.getCboInfo(), - plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString()); - ret = baos.toString(); - } - } catch (Exception e) { - LOG.warn("Exception generating explain output: " + e, e); - } - - return ret; - } - @Override public HiveConf getConf() { return driverContext.getConf(); @@ -868,19 +454,6 @@ public FetchTask getFetchTask() { return driverContext.getFetchTask(); } - // Write the current set of valid transactions into the conf file - private void recordValidTxns(HiveTxnManager txnMgr) throws LockException { - String oldTxnString = driverContext.getConf().get(ValidTxnList.VALID_TXNS_KEY); - if ((oldTxnString != null) && (oldTxnString.length() > 0)) { - throw new IllegalStateException("calling recordValidTxn() more than once in the same " + - JavaUtils.txnIdToString(txnMgr.getCurrentTxnId())); - } - ValidTxnList txnList = txnMgr.getValidTxns(); - String txnStr = txnList.toString(); - driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY, txnStr); - LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId()); - } - // Write the current set of valid write ids for the operated acid tables into the conf file so // that it can be read by the input format. private ValidTxnWriteIdList recordValidWriteIds(HiveTxnManager txnMgr) throws LockException { @@ -985,18 +558,6 @@ private void addTableFromEntity(Entity entity, Map tables) { tables.put(fullTableName, tbl); } - private String getUserFromUGI() throws CommandProcessorException { - // Don't use the userName member, as it may or may not have been set. Get the value from - // conf, which calls into getUGI to figure out who the process is running as. - try { - return driverContext.getConf().getUser(); - } catch (IOException e) { - String errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage(); - CONSOLE.printError(errorMessage, "\n" + StringUtils.stringifyException(e)); - throw createProcessorException(10, errorMessage, ErrorMsg.findSQLState(e.getMessage()), e); - } - } - /** * Acquire read and write locks needed by the statement. The list of objects to be locked are * obtained from the inputs and outputs populated by the compiler. 
Locking strategy depends on @@ -1018,7 +579,7 @@ private void acquireLocks() throws CommandProcessorException { return; } try { - String userFromUGI = getUserFromUGI(); + String userFromUGI = DriverUtils.getUserFromUGI(driverContext); // Set the table write id in all of the acid file sinks if (!driverContext.getPlan().getAcidSinks().isEmpty()) { @@ -1067,8 +628,8 @@ private void acquireLocks() throws CommandProcessorException { /*It's imperative that {@code acquireLocks()} is called for all commands so that HiveTxnManager can transition its state machine correctly*/ - driverContext.getTxnManager().acquireLocks(driverContext.getPlan(), ctx, userFromUGI, driverState); - final List locks = ctx.getHiveLocks(); + driverContext.getTxnManager().acquireLocks(driverContext.getPlan(), context, userFromUGI, driverState); + final List locks = context.getHiveLocks(); LOG.info("Operation {} obtained {} locks", driverContext.getPlan().getOperation(), ((locks == null) ? 0 : locks.size())); // This check is for controlling the correctness of the current state @@ -1087,7 +648,8 @@ private void acquireLocks() throws CommandProcessorException { } catch (Exception e) { String errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); CONSOLE.printError(errorMessage, "\n" + StringUtils.stringifyException(e)); - throw createProcessorException(10, errorMessage, ErrorMsg.findSQLState(e.getMessage()), e); + throw DriverUtils.createProcessorException(driverContext, 10, errorMessage, ErrorMsg.findSQLState(e.getMessage()), + e); } finally { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS); } @@ -1118,7 +680,7 @@ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnMa // releasing the locks. driverContext.getConf().unset(ValidTxnList.VALID_TXNS_KEY); driverContext.getConf().unset(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY); - if(!checkConcurrency()) { + if (!DriverUtils.checkConcurrency(driverContext)) { return; } if (txnMgr.isTxnOpen()) { @@ -1135,14 +697,14 @@ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnMa } } else { //since there is no tx, we only have locks for current query (if any) - if (ctx != null && ctx.getHiveLocks() != null) { - hiveLocks.addAll(ctx.getHiveLocks()); + if (context != null && context.getHiveLocks() != null) { + hiveLocks.addAll(context.getHiveLocks()); } txnMgr.releaseLocks(hiveLocks); } hiveLocks.clear(); - if (ctx != null) { - ctx.setHiveLocks(null); + if (context != null) { + context.setHiveLocks(null); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RELEASE_LOCKS); @@ -1275,7 +837,8 @@ private void compileInternal(String command, boolean deferClose) throws CommandP } if (!success) { String errorMessage = ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCodedMsg(); - throw createProcessorException(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(), errorMessage, null, null); + throw DriverUtils.createProcessorException(driverContext, ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(), + errorMessage, null, null); } try { @@ -1307,7 +870,7 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command } else { String errorMessage = "FAILED: Precompiled query has been cancelled or closed."; CONSOLE.printError(errorMessage); - throw createProcessorException(12, errorMessage, null, null); + throw DriverUtils.createProcessorException(driverContext, 12, errorMessage, null, null); } } else { driverState.compiling(); @@ -1321,14 +884,15 @@ private void runInternal(String command, boolean 
alreadyCompiled) throws Command boolean isFinishedWithError = true; try { HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(driverContext.getConf(), - alreadyCompiled ? ctx.getCmd() : command); + alreadyCompiled ? context.getCmd() : command); // Get all the driver run hooks and pre-execute them. try { driverContext.getHookRunner().runPreDriverHooks(hookContext); } catch (Exception e) { String errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e)); - throw createProcessorException(12, errorMessage, ErrorMsg.findSQLState(e.getMessage()), e); + throw DriverUtils.createProcessorException(driverContext, 12, errorMessage, + ErrorMsg.findSQLState(e.getMessage()), e); } if (!alreadyCompiled) { @@ -1346,9 +910,9 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command // the reason that we set the txn manager for the cxt here is because each // query has its own ctx object. The txn mgr is shared across the // same instance of Driver, which can run multiple queries. - ctx.setHiveTxnManager(driverContext.getTxnManager()); + context.setHiveTxnManager(driverContext.getTxnManager()); - checkInterrupted("at acquiring the lock.", null, null); + DriverUtils.checkInterrupted(driverState, driverContext, "at acquiring the lock.", null, null); lockAndRespond(); @@ -1362,9 +926,9 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command // and then, we acquire locks. If snapshot is still valid, we continue as usual. // But if snapshot is not valid, we recompile the query. driverContext.setRetrial(true); - driverContext.getBackupContext().addRewrittenStatementContext(ctx); - driverContext.getBackupContext().setHiveLocks(ctx.getHiveLocks()); - ctx = driverContext.getBackupContext(); + driverContext.getBackupContext().addRewrittenStatementContext(context); + driverContext.getBackupContext().setHiveLocks(context.getHiveLocks()); + context = driverContext.getBackupContext(); driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY, driverContext.getTxnManager().getValidTxns().toString()); if (driverContext.getPlan().hasAcidResourcesInQuery()) { @@ -1390,7 +954,7 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command // the reason that we set the txn manager for the cxt here is because each // query has its own ctx object. The txn mgr is shared across the // same instance of Driver, which can run multiple queries. - ctx.setHiveTxnManager(driverContext.getTxnManager()); + context.setHiveTxnManager(driverContext.getTxnManager()); } } catch (LockException e) { throw handleHiveException(e, 13); @@ -1433,7 +997,8 @@ else if(driverContext.getPlan().getOperation() == HiveOperation.ROLLBACK) { } catch (Exception e) { String errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e)); - throw createProcessorException(12, errorMessage, ErrorMsg.findSQLState(e.getMessage()), e); + throw DriverUtils.createProcessorException(driverContext, 12, errorMessage, + ErrorMsg.findSQLState(e.getMessage()), e); } isFinishedWithError = false; } finally { @@ -1479,11 +1044,11 @@ private CommandProcessorException handleHiveException(HiveException e, int ret, String sqlState = e.getCanonicalErrorMsg() != null ? 
e.getCanonicalErrorMsg().getSQLState() : ErrorMsg.findSQLState(e.getMessage()); CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e)); - throw createProcessorException(ret, errorMessage, sqlState, e); + throw DriverUtils.createProcessorException(driverContext, ret, errorMessage, sqlState, e); } private boolean requiresLock() { - if (!checkConcurrency()) { + if (!DriverUtils.checkConcurrency(driverContext)) { LOG.info("Concurrency mode is disabled, not creating a lock manager"); return false; } @@ -1532,23 +1097,10 @@ private boolean isExplicitLockOperation() { return false; } - private CommandProcessorException createProcessorException(int ret, String errorMessage, String sqlState, - Throwable downstreamError) { - SessionState.getPerfLogger().cleanupPerfLogMetrics(); - driverContext.getQueryDisplay().setErrorMessage(errorMessage); - if (downstreamError != null && downstreamError instanceof HiveException) { - ErrorMsg em = ((HiveException)downstreamError).getCanonicalErrorMsg(); - if (em != null) { - return new CommandProcessorException(ret, em.getErrorCode(), errorMessage, sqlState, downstreamError); - } - } - return new CommandProcessorException(ret, -1, errorMessage, sqlState, downstreamError); - } - private void useFetchFromCache(CacheEntry cacheEntry) { // Change query FetchTask to use new location specified in results cache. FetchTask fetchTaskFromCache = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork()); - fetchTaskFromCache.initialize(driverContext.getQueryState(), driverContext.getPlan(), null, ctx); + fetchTaskFromCache.initialize(driverContext.getQueryState(), driverContext.getPlan(), null, context); driverContext.getPlan().setFetchTask(fetchTaskFromCache); driverContext.setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry)); } @@ -1635,7 +1187,7 @@ private void execute() throws CommandProcessorException { if (!driverState.isCompiled() && !driverState.isExecuting()) { String errorMessage = "FAILED: unexpected driverstate: " + driverState + ", for query " + queryStr; CONSOLE.printError(errorMessage); - throw createProcessorException(1000, errorMessage, "HY008", null); + throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null); } else { driverState.executing(); } @@ -1665,11 +1217,10 @@ private void execute() throws CommandProcessorException { SessionState ss = SessionState.get(); // TODO: should this use getUserFromAuthenticator? - hookContext = new PrivateHookContext(driverContext.getPlan(), driverContext.getQueryState(), ctx.getPathToCS(), - SessionState.get().getUserName(), - ss.getUserIpAddress(), InetAddress.getLocalHost().getHostAddress(), operationId, - ss.getSessionId(), Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger, - driverContext.getQueryInfo(), ctx); + hookContext = new PrivateHookContext(driverContext.getPlan(), driverContext.getQueryState(), + context.getPathToCS(), SessionState.get().getUserName(), ss.getUserIpAddress(), + InetAddress.getLocalHost().getHostAddress(), operationId, ss.getSessionId(), Thread.currentThread().getName(), + ss.isHiveServerQuery(), perfLogger, driverContext.getQueryInfo(), context); hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK); driverContext.getHookRunner().runPreHooks(hookContext); @@ -1699,12 +1250,12 @@ private void execute() throws CommandProcessorException { // At any time, at most maxthreads tasks can be running // The main thread polls the TaskRunners to check if they have finished. 
-      checkInterrupted("before running tasks.", hookContext, perfLogger);
+      DriverUtils.checkInterrupted(driverState, driverContext, "before running tasks.", hookContext, perfLogger);

-      taskQueue = new TaskQueue(ctx); // for canceling the query (should be bound to session?)
+      taskQueue = new TaskQueue(context); // for canceling the query (should be bound to session?)
      taskQueue.prepare(driverContext.getPlan());

-      ctx.setHDFSCleanup(true);
+      context.setHDFSCleanup(true);

      SessionState.get().setMapRedStats(new LinkedHashMap<>());
      SessionState.get().setStackTraces(new HashMap<>());
@@ -1761,7 +1312,8 @@ private void execute() throws CommandProcessorException {
        TaskResult result = tskRun.getTaskResult();

        int exitVal = result.getExitVal();
-        checkInterrupted("when checking the execution result.", hookContext, perfLogger);
+        DriverUtils.checkInterrupted(driverState, driverContext, "when checking the execution result.", hookContext,
+            perfLogger);

        if (exitVal != 0) {
          Task backupTask = tsk.getAndInitBackupTask();
@@ -1782,7 +1334,7 @@ private void execute() throws CommandProcessorException {
            if (taskQueue.isShutdown()) {
              errorMessage = "FAILED: Operation cancelled. " + errorMessage;
            }
-            invokeFailureHooks(perfLogger, hookContext,
+            DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext,
                errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), result.getTaskError());
            String sqlState = "08S01";
@@ -1800,8 +1352,9 @@ private void execute() throws CommandProcessorException {
            taskQueue.shutdown();
            // in case we decided to run everything in local mode, restore the
            // the jobtracker setting to its initial value
-            ctx.restoreOriginalTracker();
-            throw createProcessorException(exitVal, errorMessage, sqlState, result.getTaskError());
+            context.restoreOriginalTracker();
+            throw DriverUtils.createProcessorException(driverContext, exitVal, errorMessage, sqlState,
+                result.getTaskError());
          }
        }
@@ -1827,13 +1380,13 @@ private void execute() throws CommandProcessorException {

      // in case we decided to run everything in local mode, restore the
      // the jobtracker setting to its initial value
-      ctx.restoreOriginalTracker();
+      context.restoreOriginalTracker();

      if (taskQueue.isShutdown()) {
        String errorMessage = "FAILED: Operation cancelled";
-        invokeFailureHooks(perfLogger, hookContext, errorMessage, null);
+        DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, errorMessage, null);
        CONSOLE.printError(errorMessage);
-        throw createProcessorException(1000, errorMessage, "HY008", null);
+        throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null);
      }

      // remove incomplete outputs.
@@ -1867,9 +1420,10 @@ private void execute() throws CommandProcessorException {
    } catch (Throwable e) {
      executionError = true;

-      checkInterrupted("during query execution: \n" + e.getMessage(), hookContext, perfLogger);
+      DriverUtils.checkInterrupted(driverState, driverContext, "during query execution: \n" + e.getMessage(),
+          hookContext, perfLogger);

-      ctx.restoreOriginalTracker();
+      context.restoreOriginalTracker();
      if (SessionState.get() != null) {
        SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE, String.valueOf(12));
@@ -1878,13 +1432,13 @@ private void execute() throws CommandProcessorException {
      String errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
      if (hookContext != null) {
        try {
-          invokeFailureHooks(perfLogger, hookContext, errorMessage, e);
+          DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, errorMessage, e);
        } catch (Exception t) {
          LOG.warn("Failed to invoke failure hook", t);
        }
      }
      CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e));
-      throw createProcessorException(12, errorMessage, "08S01", e);
+      throw DriverUtils.createProcessorException(driverContext, 12, errorMessage, "08S01", e);
    } finally {
      // Trigger query hooks after query completes its execution.
      try {
@@ -1901,7 +1455,7 @@ private void execute() throws CommandProcessorException {
    }

    double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE)/1000.00;
-    ImmutableMap executionHMSTimings = dumpMetaCallTimingWithoutEx("execution");
+    ImmutableMap executionHMSTimings = Hive.dumpMetaCallTimingWithoutEx("execution");
    driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.EXECUTION, executionHMSTimings);

    Map stats = SessionState.get().getMapRedStats();
@@ -2022,15 +1576,6 @@ private String getErrorMsgAndDetail(int exitVal, Throwable downstreamError, Task
    return errorMessage;
  }

-  private void invokeFailureHooks(PerfLogger perfLogger,
-      HookContext hookContext, String errorMessage, Throwable exception) throws Exception {
-    hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
-    hookContext.setErrorMessage(errorMessage);
-    hookContext.setException(exception);
-    // Get all the failure execution hooks and execute them.
-    driverContext.getHookRunner().runFailureHooks(hookContext);
-  }
-
  /**
   * Launches a new task
   *
@@ -2061,7 +1606,7 @@ private TaskRunner launchTask(Task tsk, String queryId, boolean noName,
      taskQueue.incCurJobNo(1);
      CONSOLE.printInfo("Launching Job " + taskQueue.getCurJobNo() + " out of " + jobs);
    }
-    tsk.initialize(driverContext.getQueryState(), driverContext.getPlan(), taskQueue, ctx);
+    tsk.initialize(driverContext.getQueryState(), driverContext.getPlan(), taskQueue, context);

    TaskRunner tskRun = new TaskRunner(tsk, taskQueue);
    taskQueue.launching(tskRun);
@@ -2107,7 +1652,7 @@ public boolean getResults(List res) throws IOException {
    }

    if (resStream == null) {
-      resStream = ctx.getStream();
+      resStream = context.getStream();
    }
    if (resStream == null) {
      return false;
    }
@@ -2146,7 +1691,7 @@ public boolean getResults(List res) throws IOException {
      }

      if (ss == Utilities.StreamStatus.EOF) {
-        resStream = ctx.getStream();
+        resStream = context.getStream();
      }
    }
    return true;
@@ -2164,9 +1709,9 @@ public void resetFetch() throws IOException {
        throw new IOException("Error closing the current fetch task", e);
      }
      // FetchTask should not depend on the plan.
-      driverContext.getFetchTask().initialize(driverContext.getQueryState(), null, null, ctx);
+      driverContext.getFetchTask().initialize(driverContext.getQueryState(), null, null, context);
    } else {
-      ctx.resetStream();
+      context.resetStream();
      resStream = null;
    }
  }
@@ -2205,7 +1750,7 @@ private void releasePlan() {

  private void releaseContext() {
    try {
-      if (ctx != null) {
+      if (context != null) {
        boolean deleteResultDir = true;
        // don't let context delete result dirs and scratch dirs if result was cached
        if (driverContext.getCacheUsage() != null
@@ -2213,12 +1758,12 @@ private void releaseContext() {
          deleteResultDir = false;
        }

-        ctx.clear(deleteResultDir);
-        if (ctx.getHiveLocks() != null) {
-          hiveLocks.addAll(ctx.getHiveLocks());
-          ctx.setHiveLocks(null);
+        context.clear(deleteResultDir);
+        if (context.getHiveLocks() != null) {
+          hiveLocks.addAll(context.getHiveLocks());
+          context.setHiveLocks(null);
        }
-        ctx = null;
+        context = null;
      }
    } catch (Exception e) {
      LOG.debug("Exception while clearing the context ", e);
@@ -2277,14 +1822,14 @@ private void releaseCachedResult() {
  // Close and release resources within a running query process. Since it runs under
  // driver state COMPILING, EXECUTING or INTERRUPT, it would not have race condition
  // with the releases probably running in the other closing thread.
-  private int closeInProcess(boolean destroyed) {
+  public int closeInProcess(boolean destroyed) {
    releaseTaskQueue();
    releasePlan();
    releaseCachedResult();
    releaseFetchTask();
    releaseResStream();
    releaseContext();
-    if(destroyed) {
+    if (destroyed) {
      if (!hiveLocks.isEmpty()) {
        try {
          releaseLocksAndCommitOrRollback(false);
diff --git ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index 1afcfc8969..f483f1a9a2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -86,6 +86,14 @@ public QueryDisplay getQueryDisplay() {
    return queryDisplay;
  }

+  public String getQueryId() {
+    return queryDisplay.getQueryId();
+  }
+
+  public String getQueryString() {
+    return queryDisplay.getQueryString();
+  }
+
  public QueryState getQueryState() {
    return queryState;
  }
diff --git ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
index 26e904af0b..7c6e63cc3f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
@@ -15,19 +15,35 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
+
package org.apache.hadoop.hive.ql;

+import java.io.IOException;
+
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public class DriverUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(DriverUtils.class);
+/**
+ * Utility functions for the Driver.
+ */
+public final class DriverUtils {
+  private static final String CLASS_NAME = Driver.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+  private static final LogHelper CONSOLE = new LogHelper(LOG);
+
+  private DriverUtils() {
+    throw new UnsupportedOperationException("DriverUtils should not be instantiated!");
+  }

  public static void runOnDriver(HiveConf conf, String user, SessionState sessionState, String query)
      throws HiveException {
@@ -89,4 +105,58 @@ public static SessionState setUpSessionState(HiveConf conf, String user, boolean
    }
    return sessionState;
  }
+
+  public static void checkInterrupted(DriverState driverState, DriverContext driverContext, String msg,
+      HookContext hookContext, PerfLogger perfLogger) throws CommandProcessorException {
+    if (driverState.isAborted()) {
+      String errorMessage = "FAILED: command has been interrupted: " + msg;
+      CONSOLE.printError(errorMessage);
+      if (hookContext != null) {
+        try {
+          invokeFailureHooks(driverContext, perfLogger, hookContext, errorMessage, null);
+        } catch (Exception e) {
+          LOG.warn("Caught exception attempting to invoke Failure Hooks", e);
+        }
+      }
+      throw createProcessorException(driverContext, 1000, errorMessage, "HY008", null);
+    }
+  }
+
+  public static void invokeFailureHooks(DriverContext driverContext, PerfLogger perfLogger, HookContext hookContext,
+      String errorMessage, Throwable exception) throws Exception {
+    hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
+    hookContext.setErrorMessage(errorMessage);
+    hookContext.setException(exception);
+    // Get all the failure execution hooks and execute them.
+    driverContext.getHookRunner().runFailureHooks(hookContext);
+  }
+
+  public static CommandProcessorException createProcessorException(DriverContext driverContext, int ret,
+      String errorMessage, String sqlState, Throwable downstreamError) {
+    SessionState.getPerfLogger().cleanupPerfLogMetrics();
+    driverContext.getQueryDisplay().setErrorMessage(errorMessage);
+    if (downstreamError != null && downstreamError instanceof HiveException) {
+      ErrorMsg em = ((HiveException)downstreamError).getCanonicalErrorMsg();
+      if (em != null) {
+        return new CommandProcessorException(ret, em.getErrorCode(), errorMessage, sqlState, downstreamError);
+      }
+    }
+    return new CommandProcessorException(ret, -1, errorMessage, sqlState, downstreamError);
+  }
+
+  public static boolean checkConcurrency(DriverContext driverContext) {
+    return driverContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+  }
+
+  public static String getUserFromUGI(DriverContext driverContext) throws CommandProcessorException {
+    // Don't use the userName member, as it may or may not have been set. Get the value from
+    // conf, which calls into getUGI to figure out who the process is running as.
+    try {
+      return driverContext.getConf().getUser();
+    } catch (IOException e) {
+      String errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage();
+      CONSOLE.printError(errorMessage, "\n" + StringUtils.stringifyException(e));
+      throw createProcessorException(driverContext, 10, errorMessage, ErrorMsg.findSQLState(e.getMessage()), e);
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index 4d79ebc933..c1f94d165b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -20,6 +20,8 @@
import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;

+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
@@ -46,10 +48,14 @@
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.VectorizationDetailLevel;
import org.apache.hadoop.hive.ql.plan.Explain;
@@ -81,17 +87,13 @@
 *
 **/
public class ExplainTask extends Task implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(ExplainTask.class.getName());
+  public static final String STAGE_DEPENDENCIES = "STAGE DEPENDENCIES";
  private static final long serialVersionUID = 1L;
  public static final String EXPL_COLUMN_NAME = "Explain";
  private final Set> visitedOps = new HashSet>();
  private boolean isLogical = false;
-  protected final Logger LOG;
-
-  public ExplainTask() {
-    super();
-    LOG = LoggerFactory.getLogger(this.getClass().getName());
-  }

  /*
   * Below method returns the dependencies for the passed in query to EXPLAIN.
@@ -1292,4 +1294,44 @@ public String getName() {
  public boolean canExecuteInParallel() {
    return false;
  }
+
+
+  /**
+   * Returns EXPLAIN EXTENDED output for a semantically analyzed query.
+   *
+   * @param sem semantic analyzer for analyzed query
+   * @param plan query plan
+   * @param astTree AST tree dump
+   */
+  public static String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan plan, ASTNode astTree,
+      QueryState queryState, Context context, HiveConf conf) throws IOException {
+    String ret = null;
+    ExplainTask task = new ExplainTask();
+    task.initialize(queryState, plan, null, context);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintStream ps = new PrintStream(baos);
+    try {
+      List> rootTasks = sem.getAllRootTasks();
+      if (conf.getBoolVar(ConfVars.HIVE_SERVER2_WEBUI_SHOW_GRAPH)) {
+        JSONObject jsonPlan = task.getJSONPlan(
+            null, rootTasks, sem.getFetchTask(), true, true, true, sem.getCboInfo(),
+            plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString());
+        if (jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES) != null &&
+            jsonPlan.getJSONObject(ExplainTask.STAGE_DEPENDENCIES).length() <=
+                conf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_GRAPH_SIZE)) {
+          ret = jsonPlan.toString();
+        } else {
+          ret = null;
+        }
+      } else {
+        task.getJSONPlan(ps, rootTasks, sem.getFetchTask(), false, true, true, sem.getCboInfo(),
+            plan.getOptimizedCBOPlan(), plan.getOptimizedQueryString());
+        ret = baos.toString();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception generating explain output: " + e, e);
+    }
+
+    return ret;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index fa6c9d03ec..f4bd0f9399 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -5389,6 +5389,15 @@ public void clearMetaCallTiming() {
    metaCallTimeMap.clear();
  }

+  public static ImmutableMap dumpMetaCallTimingWithoutEx(String phase) {
+    try {
+      return get().dumpAndClearMetaCallTiming(phase);
+    } catch (HiveException he) {
+      LOG.warn("Caught exception attempting to write metadata call information " + he, he);
+    }
+    return null;
+  }
+
  public ImmutableMap dumpAndClearMetaCallTiming(String phase) {
    boolean phaseInfoLogged = false;
    if (LOG.isDebugEnabled()) {
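
The patch above turns several Driver-private helpers into static methods on DriverUtils that take the DriverState and DriverContext explicitly. The following is an illustrative usage sketch only, not part of the patch: the class and method names below are hypothetical stand-ins, while the DriverUtils calls mirror the signatures introduced above.

// Illustrative usage sketch -- not part of the patch. Assumes a caller that
// already holds the DriverState and DriverContext of a running query,
// mirroring how Driver.runInternal()/execute() use the extracted helpers.
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.DriverState;
import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;

public class DriverUtilsUsageSketch {
  public static String prepareForLocking(DriverState driverState, DriverContext driverContext)
      throws CommandProcessorException {
    // Abort check: replaces the old private Driver.checkInterrupted(); the hook
    // context and perf logger may be null when no failure hooks should run.
    DriverUtils.checkInterrupted(driverState, driverContext, "before acquiring locks", null, null);

    // Concurrency check: replaces the old private Driver.checkConcurrency().
    if (!DriverUtils.checkConcurrency(driverContext)) {
      // Concurrency support is disabled, so no lock manager (and no user) is needed.
      return null;
    }

    // User lookup for lock acquisition: on failure this reports the error through
    // DriverUtils.createProcessorException(), which also records it on QueryDisplay.
    return DriverUtils.getUserFromUGI(driverContext);
  }
}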
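The EXPLAIN output generation and the HMS call-timing dump are likewise reachable statically after this patch, via ExplainTask.getExplainOutput and Hive.dumpMetaCallTimingWithoutEx. The sketch below is a hedged illustration of how a compilation phase might call them; the class, method, and variable names are assumptions for illustration, not part of the patch.

// Illustrative sketch only. Assumes the analyzer, plan, AST, query state,
// context and configuration produced during compilation are already in hand.
import java.io.IOException;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;

public class ExplainOutputSketch {
  public static String explainAfterCompile(BaseSemanticAnalyzer sem, QueryPlan plan, ASTNode tree,
      QueryState queryState, Context context, HiveConf conf) throws IOException {
    // EXPLAIN EXTENDED text (or JSON for the HS2 web UI), formerly assembled inside Driver;
    // may be null if the stage-dependency graph exceeds HIVE_SERVER2_WEBUI_MAX_GRAPH_SIZE.
    String explainOutput = ExplainTask.getExplainOutput(sem, plan, tree, queryState, context, conf);

    // Dump and clear the metastore call timings accumulated during this phase; returns null
    // (after logging) if the Hive client cannot be obtained. In Driver this result is
    // recorded on QueryDisplay via setHmsTimings(); it is not used further in this sketch.
    Object compilationHmsTimings = Hive.dumpMetaCallTimingWithoutEx("compilation");

    return explainOutput;
  }
}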