diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
index d89d67c423..7ba0d3ee2a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
@@ -109,6 +109,7 @@ private void addReplChangeManagerConfigs() throws Exception {
     MetastoreConf.setBoolVar(conf, ConfVars.REPLCMENABLED, true);
     String cmroot = "hdfs://" + miniDFS.getNameNode().getHostAndPort() + "/cmroot";
     MetastoreConf.setVar(conf, ConfVars.REPLCMDIR, cmroot);
+    MetastoreConf.setVar(conf, ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR, cmroot);
     threadNames.put(ReplChangeManager.CM_THREAD_NAME_PREFIX, false);
   }
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java
index 51bb78733a..41a1ce9e1d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java
@@ -1364,8 +1364,10 @@ public void testCmrootEncrypted() throws Exception {
         "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort()
             + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal);
-    String cmrootdirEncrypted = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmroot";
+    String cmrootdirEncrypted = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmrootDirEncrypted";
     encryptedHiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmrootdirEncrypted);
+    FileSystem cmrootdirEncryptedFs = new Path(cmrootdirEncrypted).getFileSystem(hiveConf);
+    cmrootdirEncryptedFs.mkdirs(new Path(cmrootdirEncrypted));
     encryptedHiveConf.set(HiveConf.ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.varname, cmrootFallBack);
 
     //Create cm in encrypted zone
@@ -1410,10 +1412,89 @@ public void testCmrootEncrypted() throws Exception {
       exceptionThrown = true;
     }
     assertFalse(exceptionThrown);
+    cmrootdirEncryptedFs.delete(new Path(cmrootdirEncrypted), true);
     ReplChangeManager.resetReplChangeManagerInstance();
     initReplChangeManager();
   }
 
+  @Test
+  public void testCmrootFallbackEncrypted() throws Exception {
+    HiveConf encryptedHiveConf = new HiveConf(TestReplChangeManager.class);
+    encryptedHiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
+    encryptedHiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60);
+    encryptedHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+        "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort()
+            + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal);
+    String cmrootdirEncrypted = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmrootIsEncrypted";
+    String cmRootFallbackEncrypted = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort()
+        + "/cmrootFallbackEncrypted";
+    FileSystem cmrootdirEncryptedFs = new Path(cmrootdirEncrypted).getFileSystem(encryptedHiveConf);
+    try {
+      cmrootdirEncryptedFs.mkdirs(new Path(cmrootdirEncrypted));
+      cmrootdirEncryptedFs.mkdirs(new Path(cmRootFallbackEncrypted));
+      encryptedHiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmrootdirEncrypted);
+      encryptedHiveConf.set(HiveConf.ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.varname, cmRootFallbackEncrypted);
+
+      //Create cm in encrypted zone
+      HadoopShims.HdfsEncryptionShim shimCmEncrypted = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf);
+      shimCmEncrypted.createEncryptionZone(new Path(cmrootdirEncrypted), "test_key_db");
+      shimCmEncrypted.createEncryptionZone(new Path(cmRootFallbackEncrypted), "test_key_db");
+      ReplChangeManager.resetReplChangeManagerInstance();
+      boolean exceptionThrown = false;
+      try {
+        new Warehouse(encryptedHiveConf);
+      } catch (MetaException e) {
+        exceptionThrown = true;
+        assertTrue(e.getMessage().contains("should not be encrypted"));
+      }
+      assertTrue(exceptionThrown);
+    } finally {
+      cmrootdirEncryptedFs.delete(new Path(cmrootdirEncrypted), true);
+      cmrootdirEncryptedFs.delete(new Path(cmRootFallbackEncrypted), true);
+      ReplChangeManager.resetReplChangeManagerInstance();
+      initReplChangeManager();
+    }
+  }
+
+  @Test
+  public void testCmrootFallbackRelative() throws Exception {
+    HiveConf encryptedHiveConf = new HiveConf(TestReplChangeManager.class);
+    encryptedHiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
+    encryptedHiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60);
+    encryptedHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+        "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort()
+            + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal);
+    String cmrootdirEncrypted = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmrootIsEncrypted";
+    String cmRootFallbackEncrypted = "cmrootFallbackEncrypted";
+    FileSystem cmrootdirEncryptedFs = new Path(cmrootdirEncrypted).getFileSystem(encryptedHiveConf);
+    try {
+      cmrootdirEncryptedFs.mkdirs(new Path(cmrootdirEncrypted));
+      cmrootdirEncryptedFs.mkdirs(new Path(cmRootFallbackEncrypted));
+      encryptedHiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmrootdirEncrypted);
+      encryptedHiveConf.set(HiveConf.ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.varname, cmRootFallbackEncrypted);
+
+      //Create cm in encrypted zone
+      HadoopShims.HdfsEncryptionShim shimCmEncrypted = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf);
+      shimCmEncrypted.createEncryptionZone(new Path(cmrootdirEncrypted), "test_key_db");
+
+      ReplChangeManager.resetReplChangeManagerInstance();
+      boolean exceptionThrown = false;
+      try {
+        new Warehouse(encryptedHiveConf);
+      } catch (MetaException e) {
+        exceptionThrown = true;
+        assertTrue(e.getMessage().contains("should be absolute"));
+      }
+      assertTrue(exceptionThrown);
+    } finally {
+      cmrootdirEncryptedFs.delete(new Path(cmrootdirEncrypted), true);
+      cmrootdirEncryptedFs.delete(new Path(cmRootFallbackEncrypted), true);
+      ReplChangeManager.resetReplChangeManagerInstance();
+      initReplChangeManager();
+    }
+  }
+
+
   private void createFile(Path path, String content) throws IOException {
     FSDataOutputStream output = path.getFileSystem(hiveConf).create(path);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 1f8bc12fde..5191800f32 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -18,10 +18,14 @@
 package org.apache.hadoop.hive.ql;
 
+import java.io.DataInput;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import
java.util.List; import java.util.Map; @@ -49,18 +53,26 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.cache.results.CacheUsage; import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry; import org.apache.hadoop.hive.ql.ddl.DDLDesc.DDLDescWithWriteId; import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.DagUtils; import org.apache.hadoop.hive.ql.exec.ExplainTask; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.TaskResult; +import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; +import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.Entity; +import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.hooks.PrivateHookContext; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lock.CompileLock; import org.apache.hadoop.hive.ql.lock.CompileLockFactory; @@ -70,6 +82,7 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.formatting.JsonMetaDataFormatter; @@ -89,6 +102,7 @@ import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.ShutdownHookManager; import org.apache.hive.common.util.TxnIdUtils; @@ -97,6 +111,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; public class Driver implements IDriver { @@ -109,11 +124,15 @@ private int maxRows = 100; private ByteStream.Output bos = new ByteStream.Output(); + private DataInput resStream; private Context context; private final DriverContext driverContext; private TaskQueue taskQueue; private final List hiveLocks = new ArrayList(); + // HS2 operation handle guid string + private String operationId; + private DriverState driverState = new DriverState(); @Override @@ -926,9 +945,7 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command } try { - taskQueue = new TaskQueue(context); // for canceling the query (should be bound to session?) - Executor executor = new Executor(context, driverContext, driverState, taskQueue); - executor.execute(); + execute(); } catch (CommandProcessorException cpe) { rollback(cpe); throw cpe; @@ -1064,6 +1081,535 @@ private boolean isExplicitLockOperation() { return false; } + private void useFetchFromCache(CacheEntry cacheEntry) { + // Change query FetchTask to use new location specified in results cache. 
+ FetchTask fetchTaskFromCache = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork()); + fetchTaskFromCache.initialize(driverContext.getQueryState(), driverContext.getPlan(), null, context); + driverContext.getPlan().setFetchTask(fetchTaskFromCache); + driverContext.setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry)); + } + + private void preExecutionCacheActions() throws Exception { + if (driverContext.getCacheUsage() != null) { + if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS && + driverContext.getPlan().getFetchTask() != null) { + ValidTxnWriteIdList txnWriteIdList = null; + if (driverContext.getPlan().hasAcidResourcesInQuery()) { + txnWriteIdList = AcidUtils.getValidTxnWriteIdList(driverContext.getConf()); + } + // The results of this query execution might be cacheable. + // Add a placeholder entry in the cache so other queries know this result is pending. + CacheEntry pendingCacheEntry = + QueryResultsCache.getInstance().addToCache(driverContext.getCacheUsage().getQueryInfo(), txnWriteIdList); + if (pendingCacheEntry != null) { + // Update cacheUsage to reference the pending entry. + this.driverContext.getCacheUsage().setCacheEntry(pendingCacheEntry); + } + } + } + } + + private void postExecutionCacheActions() throws Exception { + if (driverContext.getCacheUsage() != null) { + if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) { + // Using a previously cached result. + CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry(); + + // Reader count already incremented during cache lookup. + // Save to usedCacheEntry to ensure reader is released after query. + driverContext.setUsedCacheEntry(cacheEntry); + } else if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS && + driverContext.getCacheUsage().getCacheEntry() != null && + driverContext.getPlan().getFetchTask() != null) { + // Save results to the cache for future queries to use. + PerfLogger perfLogger = SessionState.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE); + + ValidTxnWriteIdList txnWriteIdList = null; + if (driverContext.getPlan().hasAcidResourcesInQuery()) { + txnWriteIdList = AcidUtils.getValidTxnWriteIdList(driverContext.getConf()); + } + CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry(); + boolean savedToCache = QueryResultsCache.getInstance().setEntryValid( + cacheEntry, + driverContext.getPlan().getFetchTask().getWork()); + LOG.info("savedToCache: {} ({})", savedToCache, cacheEntry); + if (savedToCache) { + useFetchFromCache(driverContext.getCacheUsage().getCacheEntry()); + // setEntryValid() already increments the reader count. Set usedCacheEntry so it gets released. 
+ driverContext.setUsedCacheEntry(driverContext.getCacheUsage().getCacheEntry()); + } + + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE); + } + } + } + + private void execute() throws CommandProcessorException { + PerfLogger perfLogger = SessionState.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE); + + boolean noName = Strings.isNullOrEmpty(driverContext.getConf().get(MRJobConfig.JOB_NAME)); + + int maxlen; + if ("spark".equals(driverContext.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE))) { + maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVESPARKJOBNAMELENGTH); + } else { + maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH); + } + Metrics metrics = MetricsFactory.getInstance(); + + String queryId = driverContext.getPlan().getQueryId(); + // Get the query string from the conf file as the compileInternal() method might + // hide sensitive information during query redaction. + String queryStr = driverContext.getConf().getQueryString(); + + driverState.lock(); + try { + // if query is not in compiled state, or executing state which is carried over from + // a combined compile/execute in runInternal, throws the error + if (!driverState.isCompiled() && !driverState.isExecuting()) { + String errorMessage = "FAILED: unexpected driverstate: " + driverState + ", for query " + queryStr; + CONSOLE.printError(errorMessage); + throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null); + } else { + driverState.executing(); + } + } finally { + driverState.unlock(); + } + + HookContext hookContext = null; + + // Whether there's any error occurred during query execution. Used for query lifetime hook. + boolean executionError = false; + + try { + LOG.info("Executing command(queryId=" + queryId + "): " + queryStr); + // compile and execute can get called from different threads in case of HS2 + // so clear timing in this thread's Hive object before proceeding. + Hive.get().clearMetaCallTiming(); + + driverContext.getPlan().setStarted(); + + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().startQuery(queryStr, queryId); + SessionState.get().getHiveHistory().logPlanProgress(driverContext.getPlan()); + } + resStream = null; + + SessionState ss = SessionState.get(); + + // TODO: should this use getUserFromAuthenticator? + hookContext = new PrivateHookContext(driverContext.getPlan(), driverContext.getQueryState(), + context.getPathToCS(), SessionState.get().getUserName(), ss.getUserIpAddress(), + InetAddress.getLocalHost().getHostAddress(), operationId, ss.getSessionId(), Thread.currentThread().getName(), + ss.isHiveServerQuery(), perfLogger, driverContext.getQueryInfo(), context); + hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK); + + driverContext.getHookRunner().runPreHooks(hookContext); + + // Trigger query hooks before query execution. 
+ driverContext.getHookRunner().runBeforeExecutionHook(queryStr, hookContext); + + setQueryDisplays(driverContext.getPlan().getRootTasks()); + int mrJobs = Utilities.getMRTasks(driverContext.getPlan().getRootTasks()).size(); + int jobs = mrJobs + Utilities.getTezTasks(driverContext.getPlan().getRootTasks()).size() + + Utilities.getSparkTasks(driverContext.getPlan().getRootTasks()).size(); + if (jobs > 0) { + logMrWarning(mrJobs); + CONSOLE.printInfo("Query ID = " + queryId); + CONSOLE.printInfo("Total jobs = " + jobs); + } + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_NUM_TASKS, + String.valueOf(jobs)); + SessionState.get().getHiveHistory().setIdToTableMap(driverContext.getPlan().getIdToTableNameMap()); + } + String jobname = Utilities.abbreviate(queryStr, maxlen - 6); + + // A runtime that launches runnable tasks as separate Threads through + // TaskRunners + // As soon as a task isRunnable, it is put in a queue + // At any time, at most maxthreads tasks can be running + // The main thread polls the TaskRunners to check if they have finished. + + DriverUtils.checkInterrupted(driverState, driverContext, "before running tasks.", hookContext, perfLogger); + + taskQueue = new TaskQueue(context); // for canceling the query (should be bound to session?) + taskQueue.prepare(driverContext.getPlan()); + + context.setHDFSCleanup(true); + + SessionState.get().setMapRedStats(new LinkedHashMap<>()); + SessionState.get().setStackTraces(new HashMap<>()); + SessionState.get().setLocalMapRedErrors(new HashMap<>()); + + // Add root Tasks to runnable + for (Task tsk : driverContext.getPlan().getRootTasks()) { + // This should never happen, if it does, it's a bug with the potential to produce + // incorrect results. + assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty(); + taskQueue.addToRunnable(tsk); + + if (metrics != null) { + tsk.updateTaskMetrics(metrics); + } + } + + preExecutionCacheActions(); + + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS); + // Loop while you either have tasks running, or tasks queued up + while (taskQueue.isRunning()) { + // Launch upto maxthreads tasks + Task task; + int maxthreads = HiveConf.getIntVar(driverContext.getConf(), HiveConf.ConfVars.EXECPARALLETHREADNUMBER); + while ((task = taskQueue.getRunnable(maxthreads)) != null) { + TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, taskQueue); + if (!runner.isRunning()) { + break; + } + } + + // poll the Tasks to see which one completed + TaskRunner tskRun = taskQueue.pollFinished(); + if (tskRun == null) { + continue; + } + /* + This should be removed eventually. HIVE-17814 gives more detail + explanation of whats happening and HIVE-17815 as to why this is done. + Briefly for replication the graph is huge and so memory pressure is going to be huge if + we keep a lot of references around. 
+ */ + String opName = driverContext.getPlan().getOperationName(); + boolean isReplicationOperation = opName.equals(HiveOperation.REPLDUMP.getOperationName()) + || opName.equals(HiveOperation.REPLLOAD.getOperationName()); + if (!isReplicationOperation) { + hookContext.addCompleteTask(tskRun); + } + + driverContext.getQueryDisplay().setTaskResult(tskRun.getTask().getId(), tskRun.getTaskResult()); + + Task tsk = tskRun.getTask(); + TaskResult result = tskRun.getTaskResult(); + + int exitVal = result.getExitVal(); + DriverUtils.checkInterrupted(driverState, driverContext, "when checking the execution result.", hookContext, + perfLogger); + + if (exitVal != 0) { + Task backupTask = tsk.getAndInitBackupTask(); + if (backupTask != null) { + String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); + CONSOLE.printError(errorMessage); + errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName(); + CONSOLE.printError(errorMessage); + + // add backup task to runnable + if (TaskQueue.isLaunchable(backupTask)) { + taskQueue.addToRunnable(backupTask); + } + continue; + + } else { + String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); + if (taskQueue.isShutdown()) { + errorMessage = "FAILED: Operation cancelled. " + errorMessage; + } + DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, + errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), result.getTaskError()); + String sqlState = "08S01"; + + // 08S01 (Communication error) is the default sql state. Override the sqlstate + // based on the ErrorMsg set in HiveException. + if (result.getTaskError() instanceof HiveException) { + ErrorMsg errorMsg = ((HiveException) result.getTaskError()). + getCanonicalErrorMsg(); + if (errorMsg != ErrorMsg.GENERIC_ERROR) { + sqlState = errorMsg.getSQLState(); + } + } + + CONSOLE.printError(errorMessage); + taskQueue.shutdown(); + // in case we decided to run everything in local mode, restore the + // the jobtracker setting to its initial value + context.restoreOriginalTracker(); + throw DriverUtils.createProcessorException(driverContext, exitVal, errorMessage, sqlState, + result.getTaskError()); + } + } + + taskQueue.finished(tskRun); + + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().setTaskProperty(queryId, tsk.getId(), + Keys.TASK_RET_CODE, String.valueOf(exitVal)); + SessionState.get().getHiveHistory().endTask(queryId, tsk); + } + + if (tsk.getChildTasks() != null) { + for (Task child : tsk.getChildTasks()) { + if (TaskQueue.isLaunchable(child)) { + taskQueue.addToRunnable(child); + } + } + } + } + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS); + + postExecutionCacheActions(); + + // in case we decided to run everything in local mode, restore the + // the jobtracker setting to its initial value + context.restoreOriginalTracker(); + + if (taskQueue.isShutdown()) { + String errorMessage = "FAILED: Operation cancelled"; + DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, errorMessage, null); + CONSOLE.printError(errorMessage); + throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null); + } + + // remove incomplete outputs. + // Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions. 
+ // remove them + HashSet remOutputs = new LinkedHashSet(); + for (WriteEntity output : driverContext.getPlan().getOutputs()) { + if (!output.isComplete()) { + remOutputs.add(output); + } + } + + for (WriteEntity output : remOutputs) { + driverContext.getPlan().getOutputs().remove(output); + } + + + hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK); + + driverContext.getHookRunner().runPostExecHooks(hookContext); + + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE, + String.valueOf(0)); + SessionState.get().getHiveHistory().printRowCount(queryId); + } + releasePlan(driverContext.getPlan()); + } catch (CommandProcessorException cpe) { + executionError = true; + throw cpe; + } catch (Throwable e) { + executionError = true; + + DriverUtils.checkInterrupted(driverState, driverContext, "during query execution: \n" + e.getMessage(), + hookContext, perfLogger); + + context.restoreOriginalTracker(); + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE, + String.valueOf(12)); + } + // TODO: do better with handling types of Exception here + String errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); + if (hookContext != null) { + try { + DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, errorMessage, e); + } catch (Exception t) { + LOG.warn("Failed to invoke failure hook", t); + } + } + CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e)); + throw DriverUtils.createProcessorException(driverContext, 12, errorMessage, "08S01", e); + } finally { + // Trigger query hooks after query completes its execution. + try { + driverContext.getHookRunner().runAfterExecutionHook(queryStr, hookContext, executionError); + } catch (Exception e) { + LOG.warn("Failed when invoking query after execution hook", e); + } + + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().endQuery(queryId); + } + if (noName) { + driverContext.getConf().set(MRJobConfig.JOB_NAME, ""); + } + double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE)/1000.00; + + ImmutableMap executionHMSTimings = Hive.dumpMetaCallTimingWithoutEx("execution"); + driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.EXECUTION, executionHMSTimings); + + Map stats = SessionState.get().getMapRedStats(); + if (stats != null && !stats.isEmpty()) { + long totalCpu = 0; + long numModifiedRows = 0; + CONSOLE.printInfo("MapReduce Jobs Launched: "); + for (Map.Entry entry : stats.entrySet()) { + CONSOLE.printInfo("Stage-" + entry.getKey() + ": " + entry.getValue()); + totalCpu += entry.getValue().getCpuMSec(); + + if (numModifiedRows > -1) { + //if overflow, then numModifiedRows is set as -1. Else update numModifiedRows with the sum. 
+ numModifiedRows = addWithOverflowCheck(numModifiedRows, entry.getValue().getNumModifiedRows()); + } + } + driverContext.getQueryState().setNumModifiedRows(numModifiedRows); + CONSOLE.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu)); + } + SparkSession ss = SessionState.get().getSparkSession(); + if (ss != null) { + ss.onQueryCompletion(queryId); + } + driverState.lock(); + try { + driverState.executionFinished(executionError); + } finally { + driverState.unlock(); + } + if (driverState.isAborted()) { + LOG.info("Executing command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds"); + } else { + LOG.info("Completed executing command(queryId=" + queryId + "); Time taken: " + duration + " seconds"); + } + } + } + + private long addWithOverflowCheck(long val1, long val2) { + try { + return Math.addExact(val1, val2); + } catch (ArithmeticException e) { + return -1; + } + } + + private void releasePlan(QueryPlan plan) { + // Plan maybe null if Driver.close is called in another thread for the same Driver object + driverState.lock(); + try { + if (plan != null) { + plan.setDone(); + if (SessionState.get() != null) { + try { + SessionState.get().getHiveHistory().logPlanProgress(plan); + } catch (Exception e) { + // Log and ignore + LOG.warn("Could not log query plan progress", e); + } + } + } + } finally { + driverState.unlock(); + } + } + + private void setQueryDisplays(List> tasks) { + if (tasks != null) { + Set> visited = new HashSet>(); + while (!tasks.isEmpty()) { + tasks = setQueryDisplays(tasks, visited); + } + } + } + + private List> setQueryDisplays( + List> tasks, + Set> visited) { + List> childTasks = new ArrayList<>(); + for (Task task : tasks) { + if (visited.contains(task)) { + continue; + } + task.setQueryDisplay(driverContext.getQueryDisplay()); + if (task.getDependentTasks() != null) { + childTasks.addAll(task.getDependentTasks()); + } + visited.add(task); + } + return childTasks; + } + + private void logMrWarning(int mrJobs) { + if (mrJobs <= 0 || !("mr".equals(HiveConf.getVar(driverContext.getConf(), ConfVars.HIVE_EXECUTION_ENGINE)))) { + return; + } + String warning = HiveConf.generateMrDeprecationWarning(); + LOG.warn(warning); + } + + private String getErrorMsgAndDetail(int exitVal, Throwable downstreamError, Task tsk) { + String errorMessage = "FAILED: Execution Error, return code " + exitVal + " from " + tsk.getClass().getName(); + if (downstreamError != null) { + //here we assume that upstream code may have parametrized the msg from ErrorMsg + //so we want to keep it + if (downstreamError.getMessage() != null) { + errorMessage += ". " + downstreamError.getMessage(); + } else { + errorMessage += ". " + StringUtils.stringifyException(downstreamError); + } + } + else { + ErrorMsg em = ErrorMsg.getErrorMsg(exitVal); + if (em != null) { + errorMessage += ". 
" + em.getMsg(); + } + } + + return errorMessage; + } + + /** + * Launches a new task + * + * @param tsk + * task being launched + * @param queryId + * Id of the query containing the task + * @param noName + * whether the task has a name set + * @param jobname + * name of the task, if it is a map-reduce job + * @param jobs + * number of map-reduce jobs + * @param taskQueue + * the task queue + */ + private TaskRunner launchTask(Task tsk, String queryId, boolean noName, + String jobname, int jobs, TaskQueue taskQueue) throws HiveException { + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName()); + } + if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) { + if (noName) { + driverContext.getConf().set(MRJobConfig.JOB_NAME, jobname + " (" + tsk.getId() + ")"); + } + driverContext.getConf().set(DagUtils.MAPREDUCE_WORKFLOW_NODE_NAME, tsk.getId()); + Utilities.setWorkflowAdjacencies(driverContext.getConf(), driverContext.getPlan()); + taskQueue.incCurJobNo(1); + CONSOLE.printInfo("Launching Job " + taskQueue.getCurJobNo() + " out of " + jobs); + } + tsk.initialize(driverContext.getQueryState(), driverContext.getPlan(), taskQueue, context); + TaskRunner tskRun = new TaskRunner(tsk, taskQueue); + + taskQueue.launching(tskRun); + // Launch Task + if (HiveConf.getBoolVar(tsk.getConf(), HiveConf.ConfVars.EXECPARALLEL) && tsk.canExecuteInParallel()) { + // Launch it in the parallel mode, as a separate thread only for MR tasks + if (LOG.isInfoEnabled()){ + LOG.info("Starting task [" + tsk + "] in parallel"); + } + tskRun.start(); + } else { + if (LOG.isInfoEnabled()){ + LOG.info("Starting task [" + tsk + "] in serial mode"); + } + tskRun.runSequential(); + } + return tskRun; + } + @Override public boolean isFetchingTable() { return driverContext.getFetchTask() != null; @@ -1089,10 +1635,10 @@ public boolean getResults(List res) throws IOException { return driverContext.getFetchTask().fetch(res); } - if (driverContext.getResStream() == null) { - driverContext.setResStream(context.getStream()); + if (resStream == null) { + resStream = context.getStream(); } - if (driverContext.getResStream() == null) { + if (resStream == null) { return false; } @@ -1100,7 +1646,7 @@ public boolean getResults(List res) throws IOException { String row = null; while (numRows < maxRows) { - if (driverContext.getResStream() == null) { + if (resStream == null) { if (numRows > 0) { return true; } else { @@ -1111,7 +1657,7 @@ public boolean getResults(List res) throws IOException { bos.reset(); Utilities.StreamStatus ss; try { - ss = Utilities.readColumn(driverContext.getResStream(), bos); + ss = Utilities.readColumn(resStream, bos); if (bos.getLength() > 0) { row = new String(bos.getData(), 0, bos.getLength(), "UTF-8"); } else if (ss == Utilities.StreamStatus.TERMINATED) { @@ -1129,7 +1675,7 @@ public boolean getResults(List res) throws IOException { } if (ss == Utilities.StreamStatus.EOF) { - driverContext.setResStream(context.getStream()); + resStream = context.getStream(); } } return true; @@ -1150,7 +1696,7 @@ public void resetFetch() throws IOException { driverContext.getFetchTask().initialize(driverContext.getQueryState(), null, null, context); } else { context.resetStream(); - driverContext.setResStream(null); + resStream = null; } } @@ -1210,9 +1756,9 @@ private void releaseContext() { private void releaseResStream() { try { - if (driverContext.getResStream() != null) { - ((FSDataInputStream) driverContext.getResStream()).close(); - 
driverContext.setResStream(null); + if (resStream != null) { + ((FSDataInputStream) resStream).close(); + resStream = null; } } catch (Exception e) { LOG.debug(" Exception while closing the resStream ", e); @@ -1337,11 +1883,11 @@ public QueryDisplay getQueryDisplay() { /** * Set the HS2 operation handle's guid string - * @param operationId base64 encoded guid string + * @param opId base64 encoded guid string */ @Override - public void setOperationId(String operationId) { - driverContext.setOperationId(operationId); + public void setOperationId(String opId) { + this.operationId = opId; } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java index a8c83fc504..bbf7fe5379 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql; -import java.io.DataInput; - import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Schema; @@ -73,11 +71,6 @@ private Context backupContext = null; private boolean retrial = false; - private DataInput resStream; - - // HS2 operation handle guid string - private String operationId; - public DriverContext(QueryState queryState, QueryInfo queryInfo, HookRunner hookRunner, HiveTxnManager initTxnManager) { this.queryState = queryState; @@ -222,20 +215,4 @@ public boolean isRetrial() { public void setRetrial(boolean retrial) { this.retrial = retrial; } - - public DataInput getResStream() { - return resStream; - } - - public void setResStream(DataInput resStream) { - this.resStream = resStream; - } - - public String getOperationId() { - return operationId; - } - - public void setOperationId(String operationId) { - this.operationId = operationId; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Executor.java b/ql/src/java/org/apache/hadoop/hive/ql/Executor.java deleted file mode 100644 index e9909a9943..0000000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/Executor.java +++ /dev/null @@ -1,593 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql; - -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.hive.common.ValidTxnWriteIdList; -import org.apache.hadoop.hive.common.metrics.common.Metrics; -import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.cache.results.CacheUsage; -import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; -import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry; -import org.apache.hadoop.hive.ql.exec.ConditionalTask; -import org.apache.hadoop.hive.ql.exec.DagUtils; -import org.apache.hadoop.hive.ql.exec.FetchTask; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.exec.TaskResult; -import org.apache.hadoop.hive.ql.exec.TaskRunner; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; -import org.apache.hadoop.hive.ql.hooks.HookContext; -import org.apache.hadoop.hive.ql.hooks.PrivateHookContext; -import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.HiveOperation; -import org.apache.hadoop.hive.ql.processors.CommandProcessorException; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableMap; - -/** - * Executes the Query Plan. - */ -public class Executor { - private static final String CLASS_NAME = Driver.class.getName(); - private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); - private static final LogHelper CONSOLE = new LogHelper(LOG); - - private final Context context; - private final DriverContext driverContext; - private final DriverState driverState; - private final TaskQueue taskQueue; - - private HookContext hookContext; - - public Executor(Context context, DriverContext driverContext, DriverState driverState, TaskQueue taskQueue) { - this.context = context; - this.driverContext = driverContext; - this.driverState = driverState; - this.taskQueue = taskQueue; - } - - public void execute() throws CommandProcessorException { - SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE); - - boolean noName = Strings.isNullOrEmpty(driverContext.getConf().get(MRJobConfig.JOB_NAME)); - - checkState(); - - // Whether there's any error occurred during query execution. Used for query lifetime hook. - boolean executionError = false; - - try { - LOG.info("Executing command(queryId=" + driverContext.getQueryId() + "): " + driverContext.getQueryString()); - - // TODO: should this use getUserFromAuthenticator? 
- hookContext = new PrivateHookContext(driverContext.getPlan(), driverContext.getQueryState(), - context.getPathToCS(), SessionState.get().getUserName(), SessionState.get().getUserIpAddress(), - InetAddress.getLocalHost().getHostAddress(), driverContext.getOperationId(), - SessionState.get().getSessionId(), Thread.currentThread().getName(), SessionState.get().isHiveServerQuery(), - SessionState.getPerfLogger(), driverContext.getQueryInfo(), context); - - preExecutionActions(); - preExecutionCacheActions(); - runTasks(noName); - postExecutionCacheActions(); - postExecutionActions(); - } catch (CommandProcessorException cpe) { - executionError = true; - throw cpe; - } catch (Throwable e) { - executionError = true; - DriverUtils.checkInterrupted(driverState, driverContext, "during query execution: \n" + e.getMessage(), - hookContext, SessionState.getPerfLogger()); - handleException(hookContext, e); - } finally { - cleanUp(noName, hookContext, executionError); - } - } - - private void checkState() throws CommandProcessorException { - driverState.lock(); - try { - // if query is not in compiled state, or executing state which is carried over from - // a combined compile/execute in runInternal, throws the error - if (!driverState.isCompiled() && !driverState.isExecuting()) { - String errorMessage = "FAILED: unexpected driverstate: " + driverState + ", for query " + - driverContext.getQueryString(); - CONSOLE.printError(errorMessage); - throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null); - } else { - driverState.executing(); - } - } finally { - driverState.unlock(); - } - } - - private void preExecutionActions() throws Exception { - // compile and execute can get called from different threads in case of HS2 - // so clear timing in this thread's Hive object before proceeding. - Hive.get().clearMetaCallTiming(); - - driverContext.getPlan().setStarted(); - - SessionState.get().getHiveHistory().startQuery(driverContext.getQueryString(), driverContext.getQueryId()); - SessionState.get().getHiveHistory().logPlanProgress(driverContext.getPlan()); - driverContext.setResStream(null); - - hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK); - driverContext.getHookRunner().runPreHooks(hookContext); - - // Trigger query hooks before query execution. - driverContext.getHookRunner().runBeforeExecutionHook(driverContext.getQueryString(), hookContext); - - setQueryDisplays(driverContext.getPlan().getRootTasks()); - - // A runtime that launches runnable tasks as separate Threads through TaskRunners - // As soon as a task isRunnable, it is put in a queue - // At any time, at most maxthreads tasks can be running - // The main thread polls the TaskRunners to check if they have finished. - - DriverUtils.checkInterrupted(driverState, driverContext, "before running tasks.", hookContext, - SessionState.getPerfLogger()); - - taskQueue.prepare(driverContext.getPlan()); - - context.setHDFSCleanup(true); - - SessionState.get().setMapRedStats(new LinkedHashMap<>()); - SessionState.get().setStackTraces(new HashMap<>()); - SessionState.get().setLocalMapRedErrors(new HashMap<>()); - - // Add root Tasks to runnable - Metrics metrics = MetricsFactory.getInstance(); - for (Task task : driverContext.getPlan().getRootTasks()) { - // This should never happen, if it does, it's a bug with the potential to produce - // incorrect results. 
- assert task.getParentTasks() == null || task.getParentTasks().isEmpty(); - taskQueue.addToRunnable(task); - - if (metrics != null) { - task.updateTaskMetrics(metrics); - } - } - } - - private void setQueryDisplays(List> tasks) { - if (tasks != null) { - Set> visited = new HashSet>(); - while (!tasks.isEmpty()) { - tasks = setQueryDisplays(tasks, visited); - } - } - } - - private List> setQueryDisplays(List> tasks, Set> visited) { - List> childTasks = new ArrayList<>(); - for (Task task : tasks) { - if (visited.contains(task)) { - continue; - } - task.setQueryDisplay(driverContext.getQueryDisplay()); - if (task.getDependentTasks() != null) { - childTasks.addAll(task.getDependentTasks()); - } - visited.add(task); - } - return childTasks; - } - - private void preExecutionCacheActions() throws Exception { - if (driverContext.getCacheUsage() == null) { - return; - } - - if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS && - driverContext.getPlan().getFetchTask() != null) { - ValidTxnWriteIdList txnWriteIdList = null; - if (driverContext.getPlan().hasAcidResourcesInQuery()) { - txnWriteIdList = AcidUtils.getValidTxnWriteIdList(driverContext.getConf()); - } - // The results of this query execution might be cacheable. - // Add a placeholder entry in the cache so other queries know this result is pending. - CacheEntry pendingCacheEntry = - QueryResultsCache.getInstance().addToCache(driverContext.getCacheUsage().getQueryInfo(), txnWriteIdList); - if (pendingCacheEntry != null) { - // Update cacheUsage to reference the pending entry. - this.driverContext.getCacheUsage().setCacheEntry(pendingCacheEntry); - } - } - } - - private void runTasks(boolean noName) throws Exception { - SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS); - - int jobCount = getJobCount(); - String jobName = getJobName(); - - // Loop while you either have tasks running, or tasks queued up - while (taskQueue.isRunning()) { - launchTasks(noName, jobCount, jobName); - handleFinished(); - } - - SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS); - } - - private void handleFinished() throws Exception { - // poll the Tasks to see which one completed - TaskRunner taskRun = taskQueue.pollFinished(); - if (taskRun == null) { - return; - } - /* - This should be removed eventually. HIVE-17814 gives more detail - explanation of whats happening and HIVE-17815 as to why this is done. - Briefly for replication the graph is huge and so memory pressure is going to be huge if - we keep a lot of references around. 
- */ - String opName = driverContext.getPlan().getOperationName(); - boolean isReplicationOperation = opName.equals(HiveOperation.REPLDUMP.getOperationName()) - || opName.equals(HiveOperation.REPLLOAD.getOperationName()); - if (!isReplicationOperation) { - hookContext.addCompleteTask(taskRun); - } - - driverContext.getQueryDisplay().setTaskResult(taskRun.getTask().getId(), taskRun.getTaskResult()); - - Task task = taskRun.getTask(); - TaskResult result = taskRun.getTaskResult(); - - int exitVal = result.getExitVal(); - DriverUtils.checkInterrupted(driverState, driverContext, "when checking the execution result.", hookContext, - SessionState.getPerfLogger()); - - if (exitVal != 0) { - handleTaskFailure(task, result, exitVal); - return; - } - - taskQueue.finished(taskRun); - - SessionState.get().getHiveHistory().setTaskProperty(driverContext.getQueryId(), task.getId(), - Keys.TASK_RET_CODE, String.valueOf(exitVal)); - SessionState.get().getHiveHistory().endTask(driverContext.getQueryId(), task); - - if (task.getChildTasks() != null) { - for (Task child : task.getChildTasks()) { - if (TaskQueue.isLaunchable(child)) { - taskQueue.addToRunnable(child); - } - } - } - } - - private String getJobName() { - int maxlen; - if ("spark".equals(driverContext.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE))) { - maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVESPARKJOBNAMELENGTH); - } else { - maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH); - } - return Utilities.abbreviate(driverContext.getQueryString(), maxlen - 6); - } - - private int getJobCount() { - int mrJobCount = Utilities.getMRTasks(driverContext.getPlan().getRootTasks()).size(); - int jobCount = mrJobCount + Utilities.getTezTasks(driverContext.getPlan().getRootTasks()).size() - + Utilities.getSparkTasks(driverContext.getPlan().getRootTasks()).size(); - if (jobCount > 0) { - if (mrJobCount > 0 && "mr".equals(HiveConf.getVar(driverContext.getConf(), ConfVars.HIVE_EXECUTION_ENGINE))) { - LOG.warn(HiveConf.generateMrDeprecationWarning()); - } - CONSOLE.printInfo("Query ID = " + driverContext.getPlan().getQueryId()); - CONSOLE.printInfo("Total jobs = " + jobCount); - } - if (SessionState.get() != null) { - SessionState.get().getHiveHistory().setQueryProperty(driverContext.getPlan().getQueryId(), Keys.QUERY_NUM_TASKS, - String.valueOf(jobCount)); - SessionState.get().getHiveHistory().setIdToTableMap(driverContext.getPlan().getIdToTableNameMap()); - } - return jobCount; - } - - private void launchTasks(boolean noName, int jobCount, String jobName) throws HiveException { - // Launch upto maxthreads tasks - Task task; - int maxthreads = HiveConf.getIntVar(driverContext.getConf(), HiveConf.ConfVars.EXECPARALLETHREADNUMBER); - while ((task = taskQueue.getRunnable(maxthreads)) != null) { - TaskRunner runner = launchTask(task, noName, jobName, jobCount); - if (!runner.isRunning()) { - break; - } - } - } - - private TaskRunner launchTask(Task task, boolean noName, String jobName, int jobCount) throws HiveException { - SessionState.get().getHiveHistory().startTask(driverContext.getQueryId(), task, task.getClass().getName()); - - if (task.isMapRedTask() && !(task instanceof ConditionalTask)) { - if (noName) { - driverContext.getConf().set(MRJobConfig.JOB_NAME, jobName + " (" + task.getId() + ")"); - } - driverContext.getConf().set(DagUtils.MAPREDUCE_WORKFLOW_NODE_NAME, task.getId()); - Utilities.setWorkflowAdjacencies(driverContext.getConf(), driverContext.getPlan()); - taskQueue.incCurJobNo(1); - 
CONSOLE.printInfo("Launching Job " + taskQueue.getCurJobNo() + " out of " + jobCount); - } - - task.initialize(driverContext.getQueryState(), driverContext.getPlan(), taskQueue, context); - TaskRunner taskRun = new TaskRunner(task, taskQueue); - taskQueue.launching(taskRun); - - if (HiveConf.getBoolVar(task.getConf(), HiveConf.ConfVars.EXECPARALLEL) && task.canExecuteInParallel()) { - LOG.info("Starting task [" + task + "] in parallel"); - taskRun.start(); - } else { - LOG.info("Starting task [" + task + "] in serial mode"); - taskRun.runSequential(); - } - return taskRun; - } - - private void handleTaskFailure(Task task, TaskResult result, int exitVal) - throws HiveException, Exception, CommandProcessorException { - Task backupTask = task.getAndInitBackupTask(); - if (backupTask != null) { - String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), task); - CONSOLE.printError(errorMessage); - errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName(); - CONSOLE.printError(errorMessage); - - // add backup task to runnable - if (TaskQueue.isLaunchable(backupTask)) { - taskQueue.addToRunnable(backupTask); - } - } else { - String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), task); - if (taskQueue.isShutdown()) { - errorMessage = "FAILED: Operation cancelled. " + errorMessage; - } - DriverUtils.invokeFailureHooks(driverContext, SessionState.getPerfLogger(), hookContext, - errorMessage + Strings.nullToEmpty(task.getDiagnosticsMessage()), result.getTaskError()); - String sqlState = "08S01"; - - // 08S01 (Communication error) is the default sql state. Override the sqlstate - // based on the ErrorMsg set in HiveException. - if (result.getTaskError() instanceof HiveException) { - ErrorMsg errorMsg = ((HiveException) result.getTaskError()). - getCanonicalErrorMsg(); - if (errorMsg != ErrorMsg.GENERIC_ERROR) { - sqlState = errorMsg.getSQLState(); - } - } - - CONSOLE.printError(errorMessage); - taskQueue.shutdown(); - // in case we decided to run everything in local mode, restore the - // the jobtracker setting to its initial value - context.restoreOriginalTracker(); - throw DriverUtils.createProcessorException(driverContext, exitVal, errorMessage, sqlState, - result.getTaskError()); - } - } - - private String getErrorMsgAndDetail(int exitVal, Throwable downstreamError, Task task) { - String errorMessage = "FAILED: Execution Error, return code " + exitVal + " from " + task.getClass().getName(); - if (downstreamError != null) { - //here we assume that upstream code may have parametrized the msg from ErrorMsg so we want to keep it - if (downstreamError.getMessage() != null) { - errorMessage += ". " + downstreamError.getMessage(); - } else { - errorMessage += ". " + StringUtils.stringifyException(downstreamError); - } - } else { - ErrorMsg em = ErrorMsg.getErrorMsg(exitVal); - if (em != null) { - errorMessage += ". " + em.getMsg(); - } - } - - return errorMessage; - } - - private void postExecutionCacheActions() throws Exception { - if (driverContext.getCacheUsage() == null) { - return; - } - - if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) { - // Using a previously cached result. - CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry(); - - // Reader count already incremented during cache lookup. - // Save to usedCacheEntry to ensure reader is released after query. 
- driverContext.setUsedCacheEntry(cacheEntry); - } else if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS && - driverContext.getCacheUsage().getCacheEntry() != null && driverContext.getPlan().getFetchTask() != null) { - // Save results to the cache for future queries to use. - SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE); - - CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry(); - boolean savedToCache = QueryResultsCache.getInstance().setEntryValid(cacheEntry, - driverContext.getPlan().getFetchTask().getWork()); - LOG.info("savedToCache: {} ({})", savedToCache, cacheEntry); - if (savedToCache) { - useFetchFromCache(driverContext.getCacheUsage().getCacheEntry()); - // setEntryValid() already increments the reader count. Set usedCacheEntry so it gets released. - driverContext.setUsedCacheEntry(driverContext.getCacheUsage().getCacheEntry()); - } - - SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE); - } - } - - private void useFetchFromCache(CacheEntry cacheEntry) { - // Change query FetchTask to use new location specified in results cache. - FetchTask fetchTaskFromCache = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork()); - fetchTaskFromCache.initialize(driverContext.getQueryState(), driverContext.getPlan(), null, context); - driverContext.getPlan().setFetchTask(fetchTaskFromCache); - driverContext.setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry)); - } - - private void postExecutionActions() throws Exception { - // in case we decided to run everything in local mode, restore the the jobtracker setting to its initial value - context.restoreOriginalTracker(); - - if (taskQueue.isShutdown()) { - String errorMessage = "FAILED: Operation cancelled"; - DriverUtils.invokeFailureHooks(driverContext, SessionState.getPerfLogger(), hookContext, errorMessage, null); - CONSOLE.printError(errorMessage); - throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null); - } - - // Remove incomplete outputs. 
- // Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions, remove them - driverContext.getPlan().getOutputs().removeIf(x -> !x.isComplete()); - - hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK); - driverContext.getHookRunner().runPostExecHooks(hookContext); - - SessionState.get().getHiveHistory().setQueryProperty(driverContext.getQueryId(), Keys.QUERY_RET_CODE, - String.valueOf(0)); - SessionState.get().getHiveHistory().printRowCount(driverContext.getQueryId()); - releasePlan(driverContext.getPlan()); - } - - private void releasePlan(QueryPlan plan) { - // Plan maybe null if Driver.close is called in another thread for the same Driver object - driverState.lock(); - try { - if (plan != null) { - plan.setDone(); - if (SessionState.get() != null) { - try { - SessionState.get().getHiveHistory().logPlanProgress(plan); - } catch (Exception e) { - // Log and ignore - LOG.warn("Could not log query plan progress", e); - } - } - } - } finally { - driverState.unlock(); - } - } - - private void handleException(HookContext hookContext, Throwable e) throws CommandProcessorException { - context.restoreOriginalTracker(); - if (SessionState.get() != null) { - SessionState.get().getHiveHistory().setQueryProperty(driverContext.getQueryId(), Keys.QUERY_RET_CODE, - String.valueOf(12)); - } - // TODO: do better with handling types of Exception here - String errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); - if (hookContext != null) { - try { - DriverUtils.invokeFailureHooks(driverContext, SessionState.getPerfLogger(), hookContext, errorMessage, e); - } catch (Exception t) { - LOG.warn("Failed to invoke failure hook", t); - } - } - CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e)); - throw DriverUtils.createProcessorException(driverContext, 12, errorMessage, "08S01", e); - } - - private void cleanUp(boolean noName, HookContext hookContext, boolean executionError) { - // Trigger query hooks after query completes its execution. 
- try { - driverContext.getHookRunner().runAfterExecutionHook(driverContext.getQueryString(), hookContext, executionError); - } catch (Exception e) { - LOG.warn("Failed when invoking query after execution hook", e); - } - - SessionState.get().getHiveHistory().endQuery(driverContext.getQueryId()); - if (noName) { - driverContext.getConf().set(MRJobConfig.JOB_NAME, ""); - } - double duration = SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE) / 1000.00; - - ImmutableMap executionHMSTimings = Hive.dumpMetaCallTimingWithoutEx("execution"); - driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.EXECUTION, executionHMSTimings); - - logExecutionResourceUsage(); - - if (SessionState.get().getSparkSession() != null) { - SessionState.get().getSparkSession().onQueryCompletion(driverContext.getQueryId()); - } - - driverState.lock(); - try { - driverState.executionFinished(executionError); - } finally { - driverState.unlock(); - } - - if (driverState.isAborted()) { - LOG.info("Executing command(queryId={}) has been interrupted after {} seconds", driverContext.getQueryId(), - duration); - } else { - LOG.info("Completed executing command(queryId={}); Time taken: {} seconds", driverContext.getQueryId(), duration); - } - } - - private void logExecutionResourceUsage() { - Map stats = SessionState.get().getMapRedStats(); - if (stats != null && !stats.isEmpty()) { - long totalCpu = 0; - long numModifiedRows = 0; - CONSOLE.printInfo("MapReduce Jobs Launched: "); - for (Map.Entry entry : stats.entrySet()) { - CONSOLE.printInfo("Stage-" + entry.getKey() + ": " + entry.getValue()); - totalCpu += entry.getValue().getCpuMSec(); - - if (numModifiedRows > -1) { - //if overflow, then numModifiedRows is set as -1. Else update numModifiedRows with the sum. 
@@ -500,7 +508,7 @@ static void scheduleCMClearer(Configuration conf) {
               .namingPattern(CM_THREAD_NAME_PREFIX + "%d")
               .daemon(true)
               .build());
-      executor.scheduleAtFixedRate(new CMClearer(encryptionZones,
+      executor.scheduleAtFixedRate(new CMClearer(encryptionZoneToCmrootMapping,
           MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf),
           0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS);
     }
@@ -553,14 +561,14 @@ static Path getCmRoot(Path path) throws IOException {
           //at the root of the encryption zone
           cmrootDir = encryptionZonePath + Path.SEPARATOR + encryptedCmRootDir;
         }
-        if (encryptionZones.containsKey(encryptionZonePath)) {
-          cmroot = new Path(encryptionZones.get(encryptionZonePath));
+        if (encryptionZoneToCmrootMapping.containsKey(encryptionZonePath)) {
+          cmroot = new Path(encryptionZoneToCmrootMapping.get(encryptionZonePath));
         } else {
           cmroot = new Path(cmrootDir);
           synchronized (instance) {
-            if (!encryptionZones.containsKey(encryptionZonePath)) {
+            if (!encryptionZoneToCmrootMapping.containsKey(encryptionZonePath)) {
               createCmRoot(cmroot);
-              encryptionZones.put(encryptionZonePath, cmrootDir);
+              encryptionZoneToCmrootMapping.put(encryptionZonePath, cmrootDir);
             }
           }
         }
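getCmRoot() above resolves the cmroot for a file by its encryption zone: the renamed encryptionZoneToCmrootMapping cache is consulted first, and only on a miss is the per-zone cmroot created and registered, with a second containsKey check inside the synchronized block so concurrent callers do not create it twice. A condensed sketch of that check-then-act pattern; the class, names, and example paths are illustrative, and the sketch uses a ConcurrentHashMap so the unsynchronized first lookup is safe, whereas the patch itself guards a plain HashMap by synchronizing on the ReplChangeManager singleton:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class EncryptionZoneCmRoots {

      // Encryption zone path -> cmroot chosen for that zone.
      private static final Map<String, String> zoneToCmRoot = new ConcurrentHashMap<>();
      private static final Object lock = new Object();

      /** Returns the cmroot for the given encryption zone, creating and caching it on first use. */
      static String cmRootFor(String encryptionZonePath, String candidateCmRoot) {
        String cached = zoneToCmRoot.get(encryptionZonePath);
        if (cached != null) {
          return cached;
        }
        synchronized (lock) {
          // Re-check under the lock so concurrent callers do not create the directory twice.
          if (!zoneToCmRoot.containsKey(encryptionZonePath)) {
            createCmRoot(candidateCmRoot);   // in the patch: idempotent mkdirs + chmod 700, with retry
            zoneToCmRoot.put(encryptionZonePath, candidateCmRoot);
          }
          return zoneToCmRoot.get(encryptionZonePath);
        }
      }

      private static void createCmRoot(String dir) {
        // placeholder for the FileSystem.mkdirs(...) / setPermission(...) calls
      }

      public static void main(String[] args) {
        System.out.println(cmRootFor("hdfs://nn:8020/ez1", "hdfs://nn:8020/ez1/.cmroot"));
        System.out.println(cmRootFor("hdfs://nn:8020/ez1", "hdfs://nn:8020/ez1/other"));  // first value is kept
      }
    }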
@@ -569,11 +577,22 @@ static Path getCmRoot(Path path) throws IOException {
   }
 
   private static void createCmRoot(Path cmroot) throws IOException {
-    FileSystem cmFs = cmroot.getFileSystem(conf);
-    // Create cmroot with permission 700 if not exist
-    if (!cmFs.exists(cmroot)) {
-      cmFs.mkdirs(cmroot);
-      cmFs.setPermission(cmroot, new FsPermission("700"));
+    Retry<Void> retriable = new Retry<Void>(IOException.class) {
+      @Override
+      public Void execute() throws IOException {
+        FileSystem cmFs = cmroot.getFileSystem(conf);
+        // Create cmroot with permission 700 if not exist
+        if (!cmFs.exists(cmroot)) {
+          cmFs.mkdirs(cmroot);
+          cmFs.setPermission(cmroot, new FsPermission("700"));
+        }
+        return null;
+      }
+    };
+    try {
+      retriable.run();
+    } catch (Exception e) {
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
     }
   }
 
@@ -582,7 +601,7 @@ static void resetReplChangeManagerInstance() {
     inited = false;
     enabled = false;
     instance = null;
-    encryptionZones.clear();
+    encryptionZoneToCmrootMapping.clear();
   }
 
   public static final PathFilter CMROOT_PATH_FILTER = new PathFilter() {
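The createCmRoot() change above routes the mkdirs/setPermission sequence through Hive's Retry helper so that a transient filesystem error does not permanently fail change-manager initialization; because the body re-checks exists() before creating, each attempt stays idempotent. A generic, self-contained sketch of the same idea, where the RetryingTask interface and the fixed-delay loop are illustrative stand-ins and not Hive's Retry class:

    import java.io.IOException;

    public class RetryExample {

      interface RetryingTask<T> {
        T execute() throws IOException;
      }

      /** Runs the task, retrying on IOException up to maxAttempts times with a fixed delay. */
      static <T> T runWithRetry(RetryingTask<T> task, int maxAttempts, long delayMillis) throws IOException {
        if (maxAttempts < 1) {
          throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
          try {
            return task.execute();
          } catch (IOException e) {
            last = e;
            try {
              Thread.sleep(delayMillis);
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
              break;
            }
          }
        }
        throw last;
      }

      public static void main(String[] args) throws IOException {
        // The retried body must be idempotent, like the exists()/mkdirs()/setPermission() sequence in the patch.
        String result = runWithRetry(() -> "created", 3, 100);
        System.out.println(result);
      }
    }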
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index ee9f988294..06625de67c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -2840,27 +2840,7 @@ private boolean checkTableDataShouldBeDeleted(Table tbl, boolean deleteData) {
      */
     private void deleteTableData(Path tablePath, boolean ifPurge, boolean shouldEnableCm) {
       if (tablePath != null) {
-        try {
-          if (shouldEnableCm) {
-            //Don't delete cmdir if its inside the table path
-            FileStatus[] statuses = tablePath.getFileSystem(conf).listStatus(tablePath,
-                ReplChangeManager.CMROOT_PATH_FILTER);
-            for (final FileStatus status : statuses) {
-              wh.deleteDir(status.getPath(), true, ifPurge, shouldEnableCm);
-            }
-            //Check if table directory is empty, delete it
-            FileStatus[] statusWithoutFilter = tablePath.getFileSystem(conf).listStatus(tablePath);
-            if (statusWithoutFilter.length == 0) {
-              wh.deleteDir(tablePath, true, ifPurge, shouldEnableCm);
-            }
-          } else {
-            //If no cm delete the complete table directory
-            wh.deleteDir(tablePath, true, ifPurge, shouldEnableCm);
-          }
-        } catch (Exception e) {
-          LOG.error("Failed to delete table directory: " + tablePath +
-              " " + e.getMessage());
-        }
+        deleteDataExcludeCmroot(tablePath, ifPurge, shouldEnableCm);
       }
     }
 
@@ -2895,27 +2875,7 @@ private void deleteTableData(Path tablePath, boolean ifPurge, Database db) {
     private void deletePartitionData(List<Path> partPaths, boolean ifPurge, boolean shouldEnableCm) {
       if (partPaths != null && !partPaths.isEmpty()) {
         for (Path partPath : partPaths) {
-          try {
-            if (shouldEnableCm) {
-              //Don't delete cmdir if its inside the partition path
-              FileStatus[] statuses = partPath.getFileSystem(conf).listStatus(partPath,
-                  ReplChangeManager.CMROOT_PATH_FILTER);
-              for (final FileStatus status : statuses) {
-                wh.deleteDir(status.getPath(), true, ifPurge, shouldEnableCm);
-              }
-              //Check if table directory is empty, delete it
-              FileStatus[] statusWithoutFilter = partPath.getFileSystem(conf).listStatus(partPath);
-              if (statusWithoutFilter.length == 0) {
-                wh.deleteDir(partPath, true, ifPurge, shouldEnableCm);
-              }
-            } else {
-              //If no cm delete the complete table directory
-              wh.deleteDir(partPath, true, ifPurge, shouldEnableCm);
-            }
-          } catch (Exception e) {
-            LOG.error("Failed to delete partition directory: " + partPath +
-                " " + e.getMessage());
-          }
+          deleteDataExcludeCmroot(partPath, ifPurge, shouldEnableCm);
         }
       }
     }
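Both deletion paths above now funnel into a single deleteDataExcludeCmroot() helper (added in the next hunk): when the change manager is enabled, children of the directory are listed through ReplChangeManager.CMROOT_PATH_FILTER so a cmroot that happens to live inside the table or partition directory is skipped, and the directory itself is removed only if nothing is left behind. A simplified standalone version of that walk using the plain Hadoop FileSystem API; the local-filesystem setup, directory names, and the hard-coded "cmroot" filter are illustrative only:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class DeleteExcludingCmRoot {

      // Skip any child literally named "cmroot" (stand-in for ReplChangeManager.CMROOT_PATH_FILTER).
      private static final PathFilter EXCLUDE_CMROOT = p -> !p.getName().equals("cmroot");

      /** Deletes everything under dir except a nested cmroot; drops dir itself only if it ends up empty. */
      static void deleteExcludingCmRoot(FileSystem fs, Path dir) throws IOException {
        for (FileStatus child : fs.listStatus(dir, EXCLUDE_CMROOT)) {
          fs.delete(child.getPath(), true);
        }
        if (fs.listStatus(dir).length == 0) {
          fs.delete(dir, true);
        }
      }

      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path table = new Path("/tmp/demo_table");
        fs.mkdirs(new Path(table, "part=1"));
        fs.mkdirs(new Path(table, "cmroot"));

        deleteExcludingCmRoot(fs, table);
        // "part=1" is gone, but /tmp/demo_table and its nested cmroot survive.
        System.out.println(fs.exists(new Path(table, "cmroot")));
      }
    }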
@@ -2942,6 +2902,39 @@ private void deletePartitionData(List<Path> partPaths, boolean ifPurge, Database
     }
   }
 
+    /**
+     * Deletes the data at the given path, skipping any cm root dir nested inside it;
+     * failures are logged as errors rather than rethrown.
+     *
+     * @param path directory whose data should be deleted
+     * @param ifPurge completely purge the data (skipping trash) while
+     *                removing it from the warehouse
+     * @param shouldEnableCm whether the change manager is enabled
+     */
+    private void deleteDataExcludeCmroot(Path path, boolean ifPurge, boolean shouldEnableCm) {
+      try {
+        if (shouldEnableCm) {
+          //Don't delete the cm root dir if it is inside the given path
+          FileStatus[] statuses = path.getFileSystem(conf).listStatus(path,
+              ReplChangeManager.CMROOT_PATH_FILTER);
+          for (final FileStatus status : statuses) {
+            wh.deleteDir(status.getPath(), true, ifPurge, shouldEnableCm);
+          }
+          //If the directory is empty after excluding the cm root dir, delete it as well
+          FileStatus[] statusWithoutFilter = path.getFileSystem(conf).listStatus(path);
+          if (statusWithoutFilter.length == 0) {
+            wh.deleteDir(path, true, ifPurge, shouldEnableCm);
+          }
+        } else {
+          //If cm is not enabled, delete the complete directory
+          wh.deleteDir(path, true, ifPurge, shouldEnableCm);
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to delete directory: " + path +
+            " " + e.getMessage());
+      }
+    }
+
     /**
      * Deletes the partitions specified by catName, dbName, tableName. If checkLocation is true, for
      * locations of partitions which may not be subdirectories of tablePath checks to make sure the