diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a120b4573d..605bd9997f 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -455,7 +455,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal REPLCMENCRYPTEDDIR("hive.repl.cm.encryptionzone.rootdir", ".cmroot", "Root dir for ChangeManager if encryption zones are enabled, used for deleted files."), REPLCMFALLBACKNONENCRYPTEDDIR("hive.repl.cm.nonencryptionzone.rootdir", - "/user/${system:user.name}/cmroot/", + "", "Root dir for ChangeManager for non encrypted paths if hive.repl.cmrootdir is encrypted."), REPLCMINTERVAL("hive.repl.cm.interval","3600s", new TimeValidator(TimeUnit.SECONDS), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java index d89d67c423..7ba0d3ee2a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java @@ -109,6 +109,7 @@ private void addReplChangeManagerConfigs() throws Exception { MetastoreConf.setBoolVar(conf, ConfVars.REPLCMENABLED, true); String cmroot = "hdfs://" + miniDFS.getNameNode().getHostAndPort() + "/cmroot"; MetastoreConf.setVar(conf, ConfVars.REPLCMDIR, cmroot); + MetastoreConf.setVar(conf, ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR, cmroot); threadNames.put(ReplChangeManager.CM_THREAD_NAME_PREFIX, false); } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java index 51bb78733a..41a1ce9e1d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java @@ -1364,8 +1364,10 @@ public void testCmrootEncrypted() throws Exception { "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal); - String cmrootdirEncrypted = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmroot"; + String cmrootdirEncrypted = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmrootDirEncrypted"; encryptedHiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmrootdirEncrypted); + FileSystem cmrootdirEncryptedFs = new Path(cmrootdirEncrypted).getFileSystem(hiveConf); + cmrootdirEncryptedFs.mkdirs(new Path(cmrootdirEncrypted)); encryptedHiveConf.set(HiveConf.ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.varname, cmrootFallBack); //Create cm in encrypted zone @@ -1410,10 +1412,89 @@ public void testCmrootEncrypted() throws Exception { exceptionThrown = true; } assertFalse(exceptionThrown); + cmrootdirEncryptedFs.delete(new Path(cmrootdirEncrypted), true); ReplChangeManager.resetReplChangeManagerInstance(); initReplChangeManager(); } + @Test + public void testCmrootFallbackEncrypted() throws Exception { + HiveConf encryptedHiveConf = new HiveConf(TestReplChangeManager.class); + encryptedHiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true); + 
encryptedHiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60); + encryptedHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, + "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal); + String cmrootdirEncrypted = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmrootIsEncrypted"; + String cmRootFallbackEncrypted = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + + "/cmrootFallbackEncrypted"; + FileSystem cmrootdirEncryptedFs = new Path(cmrootdirEncrypted).getFileSystem(encryptedHiveConf); + try { + cmrootdirEncryptedFs.mkdirs(new Path(cmrootdirEncrypted)); + cmrootdirEncryptedFs.mkdirs(new Path(cmRootFallbackEncrypted)); + encryptedHiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmrootdirEncrypted); + encryptedHiveConf.set(HiveConf.ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.varname, cmRootFallbackEncrypted); + + //Create cm in encrypted zone + HadoopShims.HdfsEncryptionShim shimCmEncrypted = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf); + shimCmEncrypted.createEncryptionZone(new Path(cmrootdirEncrypted), "test_key_db"); + shimCmEncrypted.createEncryptionZone(new Path(cmRootFallbackEncrypted), "test_key_db"); + ReplChangeManager.resetReplChangeManagerInstance(); + boolean exceptionThrown = false; + try { + new Warehouse(encryptedHiveConf); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("should not be encrypted")); + } + assertTrue(exceptionThrown); + } finally { + cmrootdirEncryptedFs.delete(new Path(cmrootdirEncrypted), true); + cmrootdirEncryptedFs.delete(new Path(cmRootFallbackEncrypted), true); + ReplChangeManager.resetReplChangeManagerInstance(); + initReplChangeManager(); + } + } + + @Test + public void testCmrootFallbackRelative() throws Exception { + HiveConf encryptedHiveConf = new HiveConf(TestReplChangeManager.class); + encryptedHiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true); + encryptedHiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60); + encryptedHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, + "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal); + String cmrootdirEncrypted = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmrootIsEncrypted"; + String cmRootFallbackEncrypted = "cmrootFallbackEncrypted"; + FileSystem cmrootdirEncryptedFs = new Path(cmrootdirEncrypted).getFileSystem(encryptedHiveConf); + try { + cmrootdirEncryptedFs.mkdirs(new Path(cmrootdirEncrypted)); + cmrootdirEncryptedFs.mkdirs(new Path(cmRootFallbackEncrypted)); + encryptedHiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmrootdirEncrypted); + encryptedHiveConf.set(HiveConf.ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.varname, cmRootFallbackEncrypted); + + //Create cm in encrypted zone + HadoopShims.HdfsEncryptionShim shimCmEncrypted = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf); + shimCmEncrypted.createEncryptionZone(new Path(cmrootdirEncrypted), "test_key_db"); + + ReplChangeManager.resetReplChangeManagerInstance(); + boolean exceptionThrown = false; + try { + new Warehouse(encryptedHiveConf); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("should be absolute")); + } + assertTrue(exceptionThrown); + } finally { + cmrootdirEncryptedFs.delete(new Path(cmrootdirEncrypted), true); + cmrootdirEncryptedFs.delete(new Path(cmRootFallbackEncrypted), true); 
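For context on the tests above: with the HiveConf change at the top of this patch, hive.repl.cm.nonencryptionzone.rootdir no longer has a default, so a deployment that points hive.repl.cm.rootdir at an encryption zone has to set the fallback root itself, exactly as these tests do. A minimal configuration sketch follows; the class name, namenode address and paths are illustrative assumptions, not values from this patch.

import org.apache.hadoop.hive.conf.HiveConf;

// Illustrative sketch only: "nn:8020" and both paths are made-up placeholders.
public class ReplCmFallbackConfigSketch {
  public static HiveConf buildConf() {
    HiveConf conf = new HiveConf();
    conf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
    // CM root that lives inside an HDFS encryption zone.
    conf.set(HiveConf.ConfVars.REPLCMDIR.varname, "hdfs://nn:8020/cmroot");
    // With the default now empty, the non-encrypted fallback root must be set
    // explicitly, and it has to be an absolute, unencrypted path.
    conf.set(HiveConf.ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.varname,
        "hdfs://nn:8020/cmrootNonEncrypted");
    return conf;
  }
}
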
+ ReplChangeManager.resetReplChangeManagerInstance(); + initReplChangeManager(); + } + } + + private void createFile(Path path, String content) throws IOException { FSDataOutputStream output = path.getFileSystem(hiveConf).create(path); diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java index cb3d9cc4e0..d9c5666bc4 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java @@ -39,21 +39,13 @@ private BufferAllocator allocator; private ArrowStreamReader arrowStreamReader; - //Allows client to provide and manage their own arrow BufferAllocator public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class clazz, - JobConf job, Closeable client, Socket socket, BufferAllocator allocator) throws IOException { + JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException { super(in, schema, clazz, job, client, socket); - this.allocator = allocator; + allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit); this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), allocator); } - //Use the global arrow BufferAllocator - public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class clazz, - JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException { - this(in, schema, clazz, job, client, socket, - RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit)); - } - @Override public boolean next(NullWritable key, ArrowWrapperWritable value) throws IOException { try { @@ -84,9 +76,6 @@ public boolean next(NullWritable key, ArrowWrapperWritable value) throws IOExcep @Override public void close() throws IOException { arrowStreamReader.close(); - //allocator.close() will throw exception unless all buffers have been released - //See org.apache.arrow.memory.BaseAllocator.close() - allocator.close(); } } diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java index 46566be332..fafbdee210 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java @@ -25,28 +25,16 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import java.io.IOException; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory; -import java.util.UUID; /* * Adapts an Arrow batch reader to a row reader - * Only used for testing */ public class LlapArrowRowInputFormat implements InputFormat { private LlapBaseInputFormat baseInputFormat; public LlapArrowRowInputFormat(long arrowAllocatorLimit) { - BufferAllocator allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit).newChildAllocator( - //allocator name, use UUID for testing - UUID.randomUUID().toString(), - //No use for reservation, allocators claim memory from the same pool, - //but allocate/releases are tracked per-allocator - 0, - //Limit passed in by client - arrowAllocatorLimit); - baseInputFormat = new LlapBaseInputFormat(true, allocator); + baseInputFormat = new LlapBaseInputFormat(true, arrowAllocatorLimit); } 
@Override diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 6bf7f33f64..5c99655104 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -66,7 +66,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.arrow.memory.BufferAllocator; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.Credentials; @@ -108,7 +107,6 @@ private String query; private boolean useArrow; private long arrowAllocatorLimit; - private BufferAllocator allocator; private final Random rand = new Random(); public static final String URL_KEY = "llap.if.hs2.connection"; @@ -131,17 +129,11 @@ public LlapBaseInputFormat(String url, String user, String pwd, String query) { this.query = query; } - //Exposed only for testing, clients should use LlapBaseInputFormat(boolean, BufferAllocator instead) public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) { this.useArrow = useArrow; this.arrowAllocatorLimit = arrowAllocatorLimit; } - public LlapBaseInputFormat(boolean useArrow, BufferAllocator allocator) { - this.useArrow = useArrow; - this.allocator = allocator; - } - public LlapBaseInputFormat() { this.useArrow = false; } @@ -222,19 +214,10 @@ public LlapBaseInputFormat() { @SuppressWarnings("rawtypes") LlapBaseRecordReader recordReader; if(useArrow) { - if(allocator != null) { - //Client provided their own allocator - recordReader = new LlapArrowBatchRecordReader( - socket.getInputStream(), llapSplit.getSchema(), - ArrowWrapperWritable.class, job, llapClient, socket, - allocator); - } else { - //Client did not provide their own allocator, use constructor for global allocator - recordReader = new LlapArrowBatchRecordReader( - socket.getInputStream(), llapSplit.getSchema(), - ArrowWrapperWritable.class, job, llapClient, socket, - arrowAllocatorLimit); - } + recordReader = new LlapArrowBatchRecordReader( + socket.getInputStream(), llapSplit.getSchema(), + ArrowWrapperWritable.class, job, llapClient, socket, + arrowAllocatorLimit); } else { recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), BytesWritable.class, job, llapClient, (java.io.Closeable)socket); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 1f8bc12fde..5191800f32 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -18,10 +18,14 @@ package org.apache.hadoop.hive.ql; +import java.io.DataInput; import java.io.IOException; +import java.net.InetAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -49,18 +53,26 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.cache.results.CacheUsage; import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; +import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry; import org.apache.hadoop.hive.ql.ddl.DDLDesc.DDLDescWithWriteId; import 
org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.DagUtils; import org.apache.hadoop.hive.ql.exec.ExplainTask; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.TaskResult; +import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; +import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.Entity; +import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.hooks.PrivateHookContext; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lock.CompileLock; import org.apache.hadoop.hive.ql.lock.CompileLockFactory; @@ -70,6 +82,7 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.formatting.JsonMetaDataFormatter; @@ -89,6 +102,7 @@ import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.ShutdownHookManager; import org.apache.hive.common.util.TxnIdUtils; @@ -97,6 +111,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; public class Driver implements IDriver { @@ -109,11 +124,15 @@ private int maxRows = 100; private ByteStream.Output bos = new ByteStream.Output(); + private DataInput resStream; private Context context; private final DriverContext driverContext; private TaskQueue taskQueue; private final List hiveLocks = new ArrayList(); + // HS2 operation handle guid string + private String operationId; + private DriverState driverState = new DriverState(); @Override @@ -926,9 +945,7 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command } try { - taskQueue = new TaskQueue(context); // for canceling the query (should be bound to session?) - Executor executor = new Executor(context, driverContext, driverState, taskQueue); - executor.execute(); + execute(); } catch (CommandProcessorException cpe) { rollback(cpe); throw cpe; @@ -1064,6 +1081,535 @@ private boolean isExplicitLockOperation() { return false; } + private void useFetchFromCache(CacheEntry cacheEntry) { + // Change query FetchTask to use new location specified in results cache. 
+ FetchTask fetchTaskFromCache = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork()); + fetchTaskFromCache.initialize(driverContext.getQueryState(), driverContext.getPlan(), null, context); + driverContext.getPlan().setFetchTask(fetchTaskFromCache); + driverContext.setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry)); + } + + private void preExecutionCacheActions() throws Exception { + if (driverContext.getCacheUsage() != null) { + if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS && + driverContext.getPlan().getFetchTask() != null) { + ValidTxnWriteIdList txnWriteIdList = null; + if (driverContext.getPlan().hasAcidResourcesInQuery()) { + txnWriteIdList = AcidUtils.getValidTxnWriteIdList(driverContext.getConf()); + } + // The results of this query execution might be cacheable. + // Add a placeholder entry in the cache so other queries know this result is pending. + CacheEntry pendingCacheEntry = + QueryResultsCache.getInstance().addToCache(driverContext.getCacheUsage().getQueryInfo(), txnWriteIdList); + if (pendingCacheEntry != null) { + // Update cacheUsage to reference the pending entry. + this.driverContext.getCacheUsage().setCacheEntry(pendingCacheEntry); + } + } + } + } + + private void postExecutionCacheActions() throws Exception { + if (driverContext.getCacheUsage() != null) { + if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) { + // Using a previously cached result. + CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry(); + + // Reader count already incremented during cache lookup. + // Save to usedCacheEntry to ensure reader is released after query. + driverContext.setUsedCacheEntry(cacheEntry); + } else if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS && + driverContext.getCacheUsage().getCacheEntry() != null && + driverContext.getPlan().getFetchTask() != null) { + // Save results to the cache for future queries to use. + PerfLogger perfLogger = SessionState.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE); + + ValidTxnWriteIdList txnWriteIdList = null; + if (driverContext.getPlan().hasAcidResourcesInQuery()) { + txnWriteIdList = AcidUtils.getValidTxnWriteIdList(driverContext.getConf()); + } + CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry(); + boolean savedToCache = QueryResultsCache.getInstance().setEntryValid( + cacheEntry, + driverContext.getPlan().getFetchTask().getWork()); + LOG.info("savedToCache: {} ({})", savedToCache, cacheEntry); + if (savedToCache) { + useFetchFromCache(driverContext.getCacheUsage().getCacheEntry()); + // setEntryValid() already increments the reader count. Set usedCacheEntry so it gets released. 
+ driverContext.setUsedCacheEntry(driverContext.getCacheUsage().getCacheEntry()); + } + + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE); + } + } + } + + private void execute() throws CommandProcessorException { + PerfLogger perfLogger = SessionState.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE); + + boolean noName = Strings.isNullOrEmpty(driverContext.getConf().get(MRJobConfig.JOB_NAME)); + + int maxlen; + if ("spark".equals(driverContext.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE))) { + maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVESPARKJOBNAMELENGTH); + } else { + maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH); + } + Metrics metrics = MetricsFactory.getInstance(); + + String queryId = driverContext.getPlan().getQueryId(); + // Get the query string from the conf file as the compileInternal() method might + // hide sensitive information during query redaction. + String queryStr = driverContext.getConf().getQueryString(); + + driverState.lock(); + try { + // if query is not in compiled state, or executing state which is carried over from + // a combined compile/execute in runInternal, throws the error + if (!driverState.isCompiled() && !driverState.isExecuting()) { + String errorMessage = "FAILED: unexpected driverstate: " + driverState + ", for query " + queryStr; + CONSOLE.printError(errorMessage); + throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null); + } else { + driverState.executing(); + } + } finally { + driverState.unlock(); + } + + HookContext hookContext = null; + + // Whether there's any error occurred during query execution. Used for query lifetime hook. + boolean executionError = false; + + try { + LOG.info("Executing command(queryId=" + queryId + "): " + queryStr); + // compile and execute can get called from different threads in case of HS2 + // so clear timing in this thread's Hive object before proceeding. + Hive.get().clearMetaCallTiming(); + + driverContext.getPlan().setStarted(); + + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().startQuery(queryStr, queryId); + SessionState.get().getHiveHistory().logPlanProgress(driverContext.getPlan()); + } + resStream = null; + + SessionState ss = SessionState.get(); + + // TODO: should this use getUserFromAuthenticator? + hookContext = new PrivateHookContext(driverContext.getPlan(), driverContext.getQueryState(), + context.getPathToCS(), SessionState.get().getUserName(), ss.getUserIpAddress(), + InetAddress.getLocalHost().getHostAddress(), operationId, ss.getSessionId(), Thread.currentThread().getName(), + ss.isHiveServerQuery(), perfLogger, driverContext.getQueryInfo(), context); + hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK); + + driverContext.getHookRunner().runPreHooks(hookContext); + + // Trigger query hooks before query execution. 
+ driverContext.getHookRunner().runBeforeExecutionHook(queryStr, hookContext); + + setQueryDisplays(driverContext.getPlan().getRootTasks()); + int mrJobs = Utilities.getMRTasks(driverContext.getPlan().getRootTasks()).size(); + int jobs = mrJobs + Utilities.getTezTasks(driverContext.getPlan().getRootTasks()).size() + + Utilities.getSparkTasks(driverContext.getPlan().getRootTasks()).size(); + if (jobs > 0) { + logMrWarning(mrJobs); + CONSOLE.printInfo("Query ID = " + queryId); + CONSOLE.printInfo("Total jobs = " + jobs); + } + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_NUM_TASKS, + String.valueOf(jobs)); + SessionState.get().getHiveHistory().setIdToTableMap(driverContext.getPlan().getIdToTableNameMap()); + } + String jobname = Utilities.abbreviate(queryStr, maxlen - 6); + + // A runtime that launches runnable tasks as separate Threads through + // TaskRunners + // As soon as a task isRunnable, it is put in a queue + // At any time, at most maxthreads tasks can be running + // The main thread polls the TaskRunners to check if they have finished. + + DriverUtils.checkInterrupted(driverState, driverContext, "before running tasks.", hookContext, perfLogger); + + taskQueue = new TaskQueue(context); // for canceling the query (should be bound to session?) + taskQueue.prepare(driverContext.getPlan()); + + context.setHDFSCleanup(true); + + SessionState.get().setMapRedStats(new LinkedHashMap<>()); + SessionState.get().setStackTraces(new HashMap<>()); + SessionState.get().setLocalMapRedErrors(new HashMap<>()); + + // Add root Tasks to runnable + for (Task tsk : driverContext.getPlan().getRootTasks()) { + // This should never happen, if it does, it's a bug with the potential to produce + // incorrect results. + assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty(); + taskQueue.addToRunnable(tsk); + + if (metrics != null) { + tsk.updateTaskMetrics(metrics); + } + } + + preExecutionCacheActions(); + + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS); + // Loop while you either have tasks running, or tasks queued up + while (taskQueue.isRunning()) { + // Launch upto maxthreads tasks + Task task; + int maxthreads = HiveConf.getIntVar(driverContext.getConf(), HiveConf.ConfVars.EXECPARALLETHREADNUMBER); + while ((task = taskQueue.getRunnable(maxthreads)) != null) { + TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, taskQueue); + if (!runner.isRunning()) { + break; + } + } + + // poll the Tasks to see which one completed + TaskRunner tskRun = taskQueue.pollFinished(); + if (tskRun == null) { + continue; + } + /* + This should be removed eventually. HIVE-17814 gives more detail + explanation of whats happening and HIVE-17815 as to why this is done. + Briefly for replication the graph is huge and so memory pressure is going to be huge if + we keep a lot of references around. 
+ */ + String opName = driverContext.getPlan().getOperationName(); + boolean isReplicationOperation = opName.equals(HiveOperation.REPLDUMP.getOperationName()) + || opName.equals(HiveOperation.REPLLOAD.getOperationName()); + if (!isReplicationOperation) { + hookContext.addCompleteTask(tskRun); + } + + driverContext.getQueryDisplay().setTaskResult(tskRun.getTask().getId(), tskRun.getTaskResult()); + + Task tsk = tskRun.getTask(); + TaskResult result = tskRun.getTaskResult(); + + int exitVal = result.getExitVal(); + DriverUtils.checkInterrupted(driverState, driverContext, "when checking the execution result.", hookContext, + perfLogger); + + if (exitVal != 0) { + Task backupTask = tsk.getAndInitBackupTask(); + if (backupTask != null) { + String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); + CONSOLE.printError(errorMessage); + errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName(); + CONSOLE.printError(errorMessage); + + // add backup task to runnable + if (TaskQueue.isLaunchable(backupTask)) { + taskQueue.addToRunnable(backupTask); + } + continue; + + } else { + String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); + if (taskQueue.isShutdown()) { + errorMessage = "FAILED: Operation cancelled. " + errorMessage; + } + DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, + errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), result.getTaskError()); + String sqlState = "08S01"; + + // 08S01 (Communication error) is the default sql state. Override the sqlstate + // based on the ErrorMsg set in HiveException. + if (result.getTaskError() instanceof HiveException) { + ErrorMsg errorMsg = ((HiveException) result.getTaskError()). + getCanonicalErrorMsg(); + if (errorMsg != ErrorMsg.GENERIC_ERROR) { + sqlState = errorMsg.getSQLState(); + } + } + + CONSOLE.printError(errorMessage); + taskQueue.shutdown(); + // in case we decided to run everything in local mode, restore the + // the jobtracker setting to its initial value + context.restoreOriginalTracker(); + throw DriverUtils.createProcessorException(driverContext, exitVal, errorMessage, sqlState, + result.getTaskError()); + } + } + + taskQueue.finished(tskRun); + + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().setTaskProperty(queryId, tsk.getId(), + Keys.TASK_RET_CODE, String.valueOf(exitVal)); + SessionState.get().getHiveHistory().endTask(queryId, tsk); + } + + if (tsk.getChildTasks() != null) { + for (Task child : tsk.getChildTasks()) { + if (TaskQueue.isLaunchable(child)) { + taskQueue.addToRunnable(child); + } + } + } + } + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS); + + postExecutionCacheActions(); + + // in case we decided to run everything in local mode, restore the + // the jobtracker setting to its initial value + context.restoreOriginalTracker(); + + if (taskQueue.isShutdown()) { + String errorMessage = "FAILED: Operation cancelled"; + DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, errorMessage, null); + CONSOLE.printError(errorMessage); + throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null); + } + + // remove incomplete outputs. + // Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions. 
+ // remove them + HashSet remOutputs = new LinkedHashSet(); + for (WriteEntity output : driverContext.getPlan().getOutputs()) { + if (!output.isComplete()) { + remOutputs.add(output); + } + } + + for (WriteEntity output : remOutputs) { + driverContext.getPlan().getOutputs().remove(output); + } + + + hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK); + + driverContext.getHookRunner().runPostExecHooks(hookContext); + + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE, + String.valueOf(0)); + SessionState.get().getHiveHistory().printRowCount(queryId); + } + releasePlan(driverContext.getPlan()); + } catch (CommandProcessorException cpe) { + executionError = true; + throw cpe; + } catch (Throwable e) { + executionError = true; + + DriverUtils.checkInterrupted(driverState, driverContext, "during query execution: \n" + e.getMessage(), + hookContext, perfLogger); + + context.restoreOriginalTracker(); + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE, + String.valueOf(12)); + } + // TODO: do better with handling types of Exception here + String errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); + if (hookContext != null) { + try { + DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, errorMessage, e); + } catch (Exception t) { + LOG.warn("Failed to invoke failure hook", t); + } + } + CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e)); + throw DriverUtils.createProcessorException(driverContext, 12, errorMessage, "08S01", e); + } finally { + // Trigger query hooks after query completes its execution. + try { + driverContext.getHookRunner().runAfterExecutionHook(queryStr, hookContext, executionError); + } catch (Exception e) { + LOG.warn("Failed when invoking query after execution hook", e); + } + + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().endQuery(queryId); + } + if (noName) { + driverContext.getConf().set(MRJobConfig.JOB_NAME, ""); + } + double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE)/1000.00; + + ImmutableMap executionHMSTimings = Hive.dumpMetaCallTimingWithoutEx("execution"); + driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.EXECUTION, executionHMSTimings); + + Map stats = SessionState.get().getMapRedStats(); + if (stats != null && !stats.isEmpty()) { + long totalCpu = 0; + long numModifiedRows = 0; + CONSOLE.printInfo("MapReduce Jobs Launched: "); + for (Map.Entry entry : stats.entrySet()) { + CONSOLE.printInfo("Stage-" + entry.getKey() + ": " + entry.getValue()); + totalCpu += entry.getValue().getCpuMSec(); + + if (numModifiedRows > -1) { + //if overflow, then numModifiedRows is set as -1. Else update numModifiedRows with the sum. 
+ numModifiedRows = addWithOverflowCheck(numModifiedRows, entry.getValue().getNumModifiedRows()); + } + } + driverContext.getQueryState().setNumModifiedRows(numModifiedRows); + CONSOLE.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu)); + } + SparkSession ss = SessionState.get().getSparkSession(); + if (ss != null) { + ss.onQueryCompletion(queryId); + } + driverState.lock(); + try { + driverState.executionFinished(executionError); + } finally { + driverState.unlock(); + } + if (driverState.isAborted()) { + LOG.info("Executing command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds"); + } else { + LOG.info("Completed executing command(queryId=" + queryId + "); Time taken: " + duration + " seconds"); + } + } + } + + private long addWithOverflowCheck(long val1, long val2) { + try { + return Math.addExact(val1, val2); + } catch (ArithmeticException e) { + return -1; + } + } + + private void releasePlan(QueryPlan plan) { + // Plan maybe null if Driver.close is called in another thread for the same Driver object + driverState.lock(); + try { + if (plan != null) { + plan.setDone(); + if (SessionState.get() != null) { + try { + SessionState.get().getHiveHistory().logPlanProgress(plan); + } catch (Exception e) { + // Log and ignore + LOG.warn("Could not log query plan progress", e); + } + } + } + } finally { + driverState.unlock(); + } + } + + private void setQueryDisplays(List> tasks) { + if (tasks != null) { + Set> visited = new HashSet>(); + while (!tasks.isEmpty()) { + tasks = setQueryDisplays(tasks, visited); + } + } + } + + private List> setQueryDisplays( + List> tasks, + Set> visited) { + List> childTasks = new ArrayList<>(); + for (Task task : tasks) { + if (visited.contains(task)) { + continue; + } + task.setQueryDisplay(driverContext.getQueryDisplay()); + if (task.getDependentTasks() != null) { + childTasks.addAll(task.getDependentTasks()); + } + visited.add(task); + } + return childTasks; + } + + private void logMrWarning(int mrJobs) { + if (mrJobs <= 0 || !("mr".equals(HiveConf.getVar(driverContext.getConf(), ConfVars.HIVE_EXECUTION_ENGINE)))) { + return; + } + String warning = HiveConf.generateMrDeprecationWarning(); + LOG.warn(warning); + } + + private String getErrorMsgAndDetail(int exitVal, Throwable downstreamError, Task tsk) { + String errorMessage = "FAILED: Execution Error, return code " + exitVal + " from " + tsk.getClass().getName(); + if (downstreamError != null) { + //here we assume that upstream code may have parametrized the msg from ErrorMsg + //so we want to keep it + if (downstreamError.getMessage() != null) { + errorMessage += ". " + downstreamError.getMessage(); + } else { + errorMessage += ". " + StringUtils.stringifyException(downstreamError); + } + } + else { + ErrorMsg em = ErrorMsg.getErrorMsg(exitVal); + if (em != null) { + errorMessage += ". 
" + em.getMsg(); + } + } + + return errorMessage; + } + + /** + * Launches a new task + * + * @param tsk + * task being launched + * @param queryId + * Id of the query containing the task + * @param noName + * whether the task has a name set + * @param jobname + * name of the task, if it is a map-reduce job + * @param jobs + * number of map-reduce jobs + * @param taskQueue + * the task queue + */ + private TaskRunner launchTask(Task tsk, String queryId, boolean noName, + String jobname, int jobs, TaskQueue taskQueue) throws HiveException { + if (SessionState.get() != null) { + SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName()); + } + if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) { + if (noName) { + driverContext.getConf().set(MRJobConfig.JOB_NAME, jobname + " (" + tsk.getId() + ")"); + } + driverContext.getConf().set(DagUtils.MAPREDUCE_WORKFLOW_NODE_NAME, tsk.getId()); + Utilities.setWorkflowAdjacencies(driverContext.getConf(), driverContext.getPlan()); + taskQueue.incCurJobNo(1); + CONSOLE.printInfo("Launching Job " + taskQueue.getCurJobNo() + " out of " + jobs); + } + tsk.initialize(driverContext.getQueryState(), driverContext.getPlan(), taskQueue, context); + TaskRunner tskRun = new TaskRunner(tsk, taskQueue); + + taskQueue.launching(tskRun); + // Launch Task + if (HiveConf.getBoolVar(tsk.getConf(), HiveConf.ConfVars.EXECPARALLEL) && tsk.canExecuteInParallel()) { + // Launch it in the parallel mode, as a separate thread only for MR tasks + if (LOG.isInfoEnabled()){ + LOG.info("Starting task [" + tsk + "] in parallel"); + } + tskRun.start(); + } else { + if (LOG.isInfoEnabled()){ + LOG.info("Starting task [" + tsk + "] in serial mode"); + } + tskRun.runSequential(); + } + return tskRun; + } + @Override public boolean isFetchingTable() { return driverContext.getFetchTask() != null; @@ -1089,10 +1635,10 @@ public boolean getResults(List res) throws IOException { return driverContext.getFetchTask().fetch(res); } - if (driverContext.getResStream() == null) { - driverContext.setResStream(context.getStream()); + if (resStream == null) { + resStream = context.getStream(); } - if (driverContext.getResStream() == null) { + if (resStream == null) { return false; } @@ -1100,7 +1646,7 @@ public boolean getResults(List res) throws IOException { String row = null; while (numRows < maxRows) { - if (driverContext.getResStream() == null) { + if (resStream == null) { if (numRows > 0) { return true; } else { @@ -1111,7 +1657,7 @@ public boolean getResults(List res) throws IOException { bos.reset(); Utilities.StreamStatus ss; try { - ss = Utilities.readColumn(driverContext.getResStream(), bos); + ss = Utilities.readColumn(resStream, bos); if (bos.getLength() > 0) { row = new String(bos.getData(), 0, bos.getLength(), "UTF-8"); } else if (ss == Utilities.StreamStatus.TERMINATED) { @@ -1129,7 +1675,7 @@ public boolean getResults(List res) throws IOException { } if (ss == Utilities.StreamStatus.EOF) { - driverContext.setResStream(context.getStream()); + resStream = context.getStream(); } } return true; @@ -1150,7 +1696,7 @@ public void resetFetch() throws IOException { driverContext.getFetchTask().initialize(driverContext.getQueryState(), null, null, context); } else { context.resetStream(); - driverContext.setResStream(null); + resStream = null; } } @@ -1210,9 +1756,9 @@ private void releaseContext() { private void releaseResStream() { try { - if (driverContext.getResStream() != null) { - ((FSDataInputStream) driverContext.getResStream()).close(); - 
driverContext.setResStream(null); + if (resStream != null) { + ((FSDataInputStream) resStream).close(); + resStream = null; } } catch (Exception e) { LOG.debug(" Exception while closing the resStream ", e); @@ -1337,11 +1883,11 @@ public QueryDisplay getQueryDisplay() { /** * Set the HS2 operation handle's guid string - * @param operationId base64 encoded guid string + * @param opId base64 encoded guid string */ @Override - public void setOperationId(String operationId) { - driverContext.setOperationId(operationId); + public void setOperationId(String opId) { + this.operationId = opId; } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java index a8c83fc504..bbf7fe5379 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.ql; -import java.io.DataInput; - import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Schema; @@ -73,11 +71,6 @@ private Context backupContext = null; private boolean retrial = false; - private DataInput resStream; - - // HS2 operation handle guid string - private String operationId; - public DriverContext(QueryState queryState, QueryInfo queryInfo, HookRunner hookRunner, HiveTxnManager initTxnManager) { this.queryState = queryState; @@ -222,20 +215,4 @@ public boolean isRetrial() { public void setRetrial(boolean retrial) { this.retrial = retrial; } - - public DataInput getResStream() { - return resStream; - } - - public void setResStream(DataInput resStream) { - this.resStream = resStream; - } - - public String getOperationId() { - return operationId; - } - - public void setOperationId(String operationId) { - this.operationId = operationId; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Executor.java b/ql/src/java/org/apache/hadoop/hive/ql/Executor.java deleted file mode 100644 index e9909a9943..0000000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/Executor.java +++ /dev/null @@ -1,593 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql; - -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.hive.common.ValidTxnWriteIdList; -import org.apache.hadoop.hive.common.metrics.common.Metrics; -import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.cache.results.CacheUsage; -import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; -import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry; -import org.apache.hadoop.hive.ql.exec.ConditionalTask; -import org.apache.hadoop.hive.ql.exec.DagUtils; -import org.apache.hadoop.hive.ql.exec.FetchTask; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.exec.TaskResult; -import org.apache.hadoop.hive.ql.exec.TaskRunner; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; -import org.apache.hadoop.hive.ql.hooks.HookContext; -import org.apache.hadoop.hive.ql.hooks.PrivateHookContext; -import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.HiveOperation; -import org.apache.hadoop.hive.ql.processors.CommandProcessorException; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableMap; - -/** - * Executes the Query Plan. - */ -public class Executor { - private static final String CLASS_NAME = Driver.class.getName(); - private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); - private static final LogHelper CONSOLE = new LogHelper(LOG); - - private final Context context; - private final DriverContext driverContext; - private final DriverState driverState; - private final TaskQueue taskQueue; - - private HookContext hookContext; - - public Executor(Context context, DriverContext driverContext, DriverState driverState, TaskQueue taskQueue) { - this.context = context; - this.driverContext = driverContext; - this.driverState = driverState; - this.taskQueue = taskQueue; - } - - public void execute() throws CommandProcessorException { - SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE); - - boolean noName = Strings.isNullOrEmpty(driverContext.getConf().get(MRJobConfig.JOB_NAME)); - - checkState(); - - // Whether there's any error occurred during query execution. Used for query lifetime hook. - boolean executionError = false; - - try { - LOG.info("Executing command(queryId=" + driverContext.getQueryId() + "): " + driverContext.getQueryString()); - - // TODO: should this use getUserFromAuthenticator? 
- hookContext = new PrivateHookContext(driverContext.getPlan(), driverContext.getQueryState(), - context.getPathToCS(), SessionState.get().getUserName(), SessionState.get().getUserIpAddress(), - InetAddress.getLocalHost().getHostAddress(), driverContext.getOperationId(), - SessionState.get().getSessionId(), Thread.currentThread().getName(), SessionState.get().isHiveServerQuery(), - SessionState.getPerfLogger(), driverContext.getQueryInfo(), context); - - preExecutionActions(); - preExecutionCacheActions(); - runTasks(noName); - postExecutionCacheActions(); - postExecutionActions(); - } catch (CommandProcessorException cpe) { - executionError = true; - throw cpe; - } catch (Throwable e) { - executionError = true; - DriverUtils.checkInterrupted(driverState, driverContext, "during query execution: \n" + e.getMessage(), - hookContext, SessionState.getPerfLogger()); - handleException(hookContext, e); - } finally { - cleanUp(noName, hookContext, executionError); - } - } - - private void checkState() throws CommandProcessorException { - driverState.lock(); - try { - // if query is not in compiled state, or executing state which is carried over from - // a combined compile/execute in runInternal, throws the error - if (!driverState.isCompiled() && !driverState.isExecuting()) { - String errorMessage = "FAILED: unexpected driverstate: " + driverState + ", for query " + - driverContext.getQueryString(); - CONSOLE.printError(errorMessage); - throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null); - } else { - driverState.executing(); - } - } finally { - driverState.unlock(); - } - } - - private void preExecutionActions() throws Exception { - // compile and execute can get called from different threads in case of HS2 - // so clear timing in this thread's Hive object before proceeding. - Hive.get().clearMetaCallTiming(); - - driverContext.getPlan().setStarted(); - - SessionState.get().getHiveHistory().startQuery(driverContext.getQueryString(), driverContext.getQueryId()); - SessionState.get().getHiveHistory().logPlanProgress(driverContext.getPlan()); - driverContext.setResStream(null); - - hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK); - driverContext.getHookRunner().runPreHooks(hookContext); - - // Trigger query hooks before query execution. - driverContext.getHookRunner().runBeforeExecutionHook(driverContext.getQueryString(), hookContext); - - setQueryDisplays(driverContext.getPlan().getRootTasks()); - - // A runtime that launches runnable tasks as separate Threads through TaskRunners - // As soon as a task isRunnable, it is put in a queue - // At any time, at most maxthreads tasks can be running - // The main thread polls the TaskRunners to check if they have finished. - - DriverUtils.checkInterrupted(driverState, driverContext, "before running tasks.", hookContext, - SessionState.getPerfLogger()); - - taskQueue.prepare(driverContext.getPlan()); - - context.setHDFSCleanup(true); - - SessionState.get().setMapRedStats(new LinkedHashMap<>()); - SessionState.get().setStackTraces(new HashMap<>()); - SessionState.get().setLocalMapRedErrors(new HashMap<>()); - - // Add root Tasks to runnable - Metrics metrics = MetricsFactory.getInstance(); - for (Task task : driverContext.getPlan().getRootTasks()) { - // This should never happen, if it does, it's a bug with the potential to produce - // incorrect results. 
- assert task.getParentTasks() == null || task.getParentTasks().isEmpty(); - taskQueue.addToRunnable(task); - - if (metrics != null) { - task.updateTaskMetrics(metrics); - } - } - } - - private void setQueryDisplays(List> tasks) { - if (tasks != null) { - Set> visited = new HashSet>(); - while (!tasks.isEmpty()) { - tasks = setQueryDisplays(tasks, visited); - } - } - } - - private List> setQueryDisplays(List> tasks, Set> visited) { - List> childTasks = new ArrayList<>(); - for (Task task : tasks) { - if (visited.contains(task)) { - continue; - } - task.setQueryDisplay(driverContext.getQueryDisplay()); - if (task.getDependentTasks() != null) { - childTasks.addAll(task.getDependentTasks()); - } - visited.add(task); - } - return childTasks; - } - - private void preExecutionCacheActions() throws Exception { - if (driverContext.getCacheUsage() == null) { - return; - } - - if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS && - driverContext.getPlan().getFetchTask() != null) { - ValidTxnWriteIdList txnWriteIdList = null; - if (driverContext.getPlan().hasAcidResourcesInQuery()) { - txnWriteIdList = AcidUtils.getValidTxnWriteIdList(driverContext.getConf()); - } - // The results of this query execution might be cacheable. - // Add a placeholder entry in the cache so other queries know this result is pending. - CacheEntry pendingCacheEntry = - QueryResultsCache.getInstance().addToCache(driverContext.getCacheUsage().getQueryInfo(), txnWriteIdList); - if (pendingCacheEntry != null) { - // Update cacheUsage to reference the pending entry. - this.driverContext.getCacheUsage().setCacheEntry(pendingCacheEntry); - } - } - } - - private void runTasks(boolean noName) throws Exception { - SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS); - - int jobCount = getJobCount(); - String jobName = getJobName(); - - // Loop while you either have tasks running, or tasks queued up - while (taskQueue.isRunning()) { - launchTasks(noName, jobCount, jobName); - handleFinished(); - } - - SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS); - } - - private void handleFinished() throws Exception { - // poll the Tasks to see which one completed - TaskRunner taskRun = taskQueue.pollFinished(); - if (taskRun == null) { - return; - } - /* - This should be removed eventually. HIVE-17814 gives more detail - explanation of whats happening and HIVE-17815 as to why this is done. - Briefly for replication the graph is huge and so memory pressure is going to be huge if - we keep a lot of references around. 
- */ - String opName = driverContext.getPlan().getOperationName(); - boolean isReplicationOperation = opName.equals(HiveOperation.REPLDUMP.getOperationName()) - || opName.equals(HiveOperation.REPLLOAD.getOperationName()); - if (!isReplicationOperation) { - hookContext.addCompleteTask(taskRun); - } - - driverContext.getQueryDisplay().setTaskResult(taskRun.getTask().getId(), taskRun.getTaskResult()); - - Task task = taskRun.getTask(); - TaskResult result = taskRun.getTaskResult(); - - int exitVal = result.getExitVal(); - DriverUtils.checkInterrupted(driverState, driverContext, "when checking the execution result.", hookContext, - SessionState.getPerfLogger()); - - if (exitVal != 0) { - handleTaskFailure(task, result, exitVal); - return; - } - - taskQueue.finished(taskRun); - - SessionState.get().getHiveHistory().setTaskProperty(driverContext.getQueryId(), task.getId(), - Keys.TASK_RET_CODE, String.valueOf(exitVal)); - SessionState.get().getHiveHistory().endTask(driverContext.getQueryId(), task); - - if (task.getChildTasks() != null) { - for (Task child : task.getChildTasks()) { - if (TaskQueue.isLaunchable(child)) { - taskQueue.addToRunnable(child); - } - } - } - } - - private String getJobName() { - int maxlen; - if ("spark".equals(driverContext.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE))) { - maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVESPARKJOBNAMELENGTH); - } else { - maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH); - } - return Utilities.abbreviate(driverContext.getQueryString(), maxlen - 6); - } - - private int getJobCount() { - int mrJobCount = Utilities.getMRTasks(driverContext.getPlan().getRootTasks()).size(); - int jobCount = mrJobCount + Utilities.getTezTasks(driverContext.getPlan().getRootTasks()).size() - + Utilities.getSparkTasks(driverContext.getPlan().getRootTasks()).size(); - if (jobCount > 0) { - if (mrJobCount > 0 && "mr".equals(HiveConf.getVar(driverContext.getConf(), ConfVars.HIVE_EXECUTION_ENGINE))) { - LOG.warn(HiveConf.generateMrDeprecationWarning()); - } - CONSOLE.printInfo("Query ID = " + driverContext.getPlan().getQueryId()); - CONSOLE.printInfo("Total jobs = " + jobCount); - } - if (SessionState.get() != null) { - SessionState.get().getHiveHistory().setQueryProperty(driverContext.getPlan().getQueryId(), Keys.QUERY_NUM_TASKS, - String.valueOf(jobCount)); - SessionState.get().getHiveHistory().setIdToTableMap(driverContext.getPlan().getIdToTableNameMap()); - } - return jobCount; - } - - private void launchTasks(boolean noName, int jobCount, String jobName) throws HiveException { - // Launch upto maxthreads tasks - Task task; - int maxthreads = HiveConf.getIntVar(driverContext.getConf(), HiveConf.ConfVars.EXECPARALLETHREADNUMBER); - while ((task = taskQueue.getRunnable(maxthreads)) != null) { - TaskRunner runner = launchTask(task, noName, jobName, jobCount); - if (!runner.isRunning()) { - break; - } - } - } - - private TaskRunner launchTask(Task task, boolean noName, String jobName, int jobCount) throws HiveException { - SessionState.get().getHiveHistory().startTask(driverContext.getQueryId(), task, task.getClass().getName()); - - if (task.isMapRedTask() && !(task instanceof ConditionalTask)) { - if (noName) { - driverContext.getConf().set(MRJobConfig.JOB_NAME, jobName + " (" + task.getId() + ")"); - } - driverContext.getConf().set(DagUtils.MAPREDUCE_WORKFLOW_NODE_NAME, task.getId()); - Utilities.setWorkflowAdjacencies(driverContext.getConf(), driverContext.getPlan()); - taskQueue.incCurJobNo(1); - 
CONSOLE.printInfo("Launching Job " + taskQueue.getCurJobNo() + " out of " + jobCount); - } - - task.initialize(driverContext.getQueryState(), driverContext.getPlan(), taskQueue, context); - TaskRunner taskRun = new TaskRunner(task, taskQueue); - taskQueue.launching(taskRun); - - if (HiveConf.getBoolVar(task.getConf(), HiveConf.ConfVars.EXECPARALLEL) && task.canExecuteInParallel()) { - LOG.info("Starting task [" + task + "] in parallel"); - taskRun.start(); - } else { - LOG.info("Starting task [" + task + "] in serial mode"); - taskRun.runSequential(); - } - return taskRun; - } - - private void handleTaskFailure(Task task, TaskResult result, int exitVal) - throws HiveException, Exception, CommandProcessorException { - Task backupTask = task.getAndInitBackupTask(); - if (backupTask != null) { - String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), task); - CONSOLE.printError(errorMessage); - errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName(); - CONSOLE.printError(errorMessage); - - // add backup task to runnable - if (TaskQueue.isLaunchable(backupTask)) { - taskQueue.addToRunnable(backupTask); - } - } else { - String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), task); - if (taskQueue.isShutdown()) { - errorMessage = "FAILED: Operation cancelled. " + errorMessage; - } - DriverUtils.invokeFailureHooks(driverContext, SessionState.getPerfLogger(), hookContext, - errorMessage + Strings.nullToEmpty(task.getDiagnosticsMessage()), result.getTaskError()); - String sqlState = "08S01"; - - // 08S01 (Communication error) is the default sql state. Override the sqlstate - // based on the ErrorMsg set in HiveException. - if (result.getTaskError() instanceof HiveException) { - ErrorMsg errorMsg = ((HiveException) result.getTaskError()). - getCanonicalErrorMsg(); - if (errorMsg != ErrorMsg.GENERIC_ERROR) { - sqlState = errorMsg.getSQLState(); - } - } - - CONSOLE.printError(errorMessage); - taskQueue.shutdown(); - // in case we decided to run everything in local mode, restore the - // the jobtracker setting to its initial value - context.restoreOriginalTracker(); - throw DriverUtils.createProcessorException(driverContext, exitVal, errorMessage, sqlState, - result.getTaskError()); - } - } - - private String getErrorMsgAndDetail(int exitVal, Throwable downstreamError, Task task) { - String errorMessage = "FAILED: Execution Error, return code " + exitVal + " from " + task.getClass().getName(); - if (downstreamError != null) { - //here we assume that upstream code may have parametrized the msg from ErrorMsg so we want to keep it - if (downstreamError.getMessage() != null) { - errorMessage += ". " + downstreamError.getMessage(); - } else { - errorMessage += ". " + StringUtils.stringifyException(downstreamError); - } - } else { - ErrorMsg em = ErrorMsg.getErrorMsg(exitVal); - if (em != null) { - errorMessage += ". " + em.getMsg(); - } - } - - return errorMessage; - } - - private void postExecutionCacheActions() throws Exception { - if (driverContext.getCacheUsage() == null) { - return; - } - - if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) { - // Using a previously cached result. - CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry(); - - // Reader count already incremented during cache lookup. - // Save to usedCacheEntry to ensure reader is released after query. 
- driverContext.setUsedCacheEntry(cacheEntry); - } else if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS && - driverContext.getCacheUsage().getCacheEntry() != null && driverContext.getPlan().getFetchTask() != null) { - // Save results to the cache for future queries to use. - SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE); - - CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry(); - boolean savedToCache = QueryResultsCache.getInstance().setEntryValid(cacheEntry, - driverContext.getPlan().getFetchTask().getWork()); - LOG.info("savedToCache: {} ({})", savedToCache, cacheEntry); - if (savedToCache) { - useFetchFromCache(driverContext.getCacheUsage().getCacheEntry()); - // setEntryValid() already increments the reader count. Set usedCacheEntry so it gets released. - driverContext.setUsedCacheEntry(driverContext.getCacheUsage().getCacheEntry()); - } - - SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE); - } - } - - private void useFetchFromCache(CacheEntry cacheEntry) { - // Change query FetchTask to use new location specified in results cache. - FetchTask fetchTaskFromCache = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork()); - fetchTaskFromCache.initialize(driverContext.getQueryState(), driverContext.getPlan(), null, context); - driverContext.getPlan().setFetchTask(fetchTaskFromCache); - driverContext.setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry)); - } - - private void postExecutionActions() throws Exception { - // in case we decided to run everything in local mode, restore the the jobtracker setting to its initial value - context.restoreOriginalTracker(); - - if (taskQueue.isShutdown()) { - String errorMessage = "FAILED: Operation cancelled"; - DriverUtils.invokeFailureHooks(driverContext, SessionState.getPerfLogger(), hookContext, errorMessage, null); - CONSOLE.printError(errorMessage); - throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null); - } - - // Remove incomplete outputs. 
- // Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions, remove them - driverContext.getPlan().getOutputs().removeIf(x -> !x.isComplete()); - - hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK); - driverContext.getHookRunner().runPostExecHooks(hookContext); - - SessionState.get().getHiveHistory().setQueryProperty(driverContext.getQueryId(), Keys.QUERY_RET_CODE, - String.valueOf(0)); - SessionState.get().getHiveHistory().printRowCount(driverContext.getQueryId()); - releasePlan(driverContext.getPlan()); - } - - private void releasePlan(QueryPlan plan) { - // Plan maybe null if Driver.close is called in another thread for the same Driver object - driverState.lock(); - try { - if (plan != null) { - plan.setDone(); - if (SessionState.get() != null) { - try { - SessionState.get().getHiveHistory().logPlanProgress(plan); - } catch (Exception e) { - // Log and ignore - LOG.warn("Could not log query plan progress", e); - } - } - } - } finally { - driverState.unlock(); - } - } - - private void handleException(HookContext hookContext, Throwable e) throws CommandProcessorException { - context.restoreOriginalTracker(); - if (SessionState.get() != null) { - SessionState.get().getHiveHistory().setQueryProperty(driverContext.getQueryId(), Keys.QUERY_RET_CODE, - String.valueOf(12)); - } - // TODO: do better with handling types of Exception here - String errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); - if (hookContext != null) { - try { - DriverUtils.invokeFailureHooks(driverContext, SessionState.getPerfLogger(), hookContext, errorMessage, e); - } catch (Exception t) { - LOG.warn("Failed to invoke failure hook", t); - } - } - CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e)); - throw DriverUtils.createProcessorException(driverContext, 12, errorMessage, "08S01", e); - } - - private void cleanUp(boolean noName, HookContext hookContext, boolean executionError) { - // Trigger query hooks after query completes its execution. 
- try { - driverContext.getHookRunner().runAfterExecutionHook(driverContext.getQueryString(), hookContext, executionError); - } catch (Exception e) { - LOG.warn("Failed when invoking query after execution hook", e); - } - - SessionState.get().getHiveHistory().endQuery(driverContext.getQueryId()); - if (noName) { - driverContext.getConf().set(MRJobConfig.JOB_NAME, ""); - } - double duration = SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE) / 1000.00; - - ImmutableMap executionHMSTimings = Hive.dumpMetaCallTimingWithoutEx("execution"); - driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.EXECUTION, executionHMSTimings); - - logExecutionResourceUsage(); - - if (SessionState.get().getSparkSession() != null) { - SessionState.get().getSparkSession().onQueryCompletion(driverContext.getQueryId()); - } - - driverState.lock(); - try { - driverState.executionFinished(executionError); - } finally { - driverState.unlock(); - } - - if (driverState.isAborted()) { - LOG.info("Executing command(queryId={}) has been interrupted after {} seconds", driverContext.getQueryId(), - duration); - } else { - LOG.info("Completed executing command(queryId={}); Time taken: {} seconds", driverContext.getQueryId(), duration); - } - } - - private void logExecutionResourceUsage() { - Map stats = SessionState.get().getMapRedStats(); - if (stats != null && !stats.isEmpty()) { - long totalCpu = 0; - long numModifiedRows = 0; - CONSOLE.printInfo("MapReduce Jobs Launched: "); - for (Map.Entry entry : stats.entrySet()) { - CONSOLE.printInfo("Stage-" + entry.getKey() + ": " + entry.getValue()); - totalCpu += entry.getValue().getCpuMSec(); - - if (numModifiedRows > -1) { - //if overflow, then numModifiedRows is set as -1. Else update numModifiedRows with the sum. 
- try { - numModifiedRows = Math.addExact(numModifiedRows, entry.getValue().getNumModifiedRows()); - } catch (ArithmeticException e) { - numModifiedRows = -1; - } - } - } - driverContext.getQueryState().setNumModifiedRows(numModifiedRows); - CONSOLE.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu)); - } - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java index cd05f52626..baad2694b3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java @@ -45,7 +45,7 @@ QueryDisplay getQueryDisplay(); - void setOperationId(String operationId); + void setOperationId(String guid64); CommandProcessorResponse run() throws CommandProcessorException; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java index c307085366..eab7f45fde 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java @@ -141,8 +141,8 @@ public QueryDisplay getQueryDisplay() { } @Override - public void setOperationId(String operationId) { - coreDriver.setOperationId(operationId); + public void setOperationId(String guid64) { + coreDriver.setOperationId(guid64); } @Override diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestNumMetastoreCalls.java b/ql/src/test/org/apache/hadoop/hive/ql/TestNumMetastoreCalls.java new file mode 100644 index 0000000000..6e755d588d --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestNumMetastoreCalls.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.ql.session.SessionState; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +// this test verifies the number of calls made to the metastore server by the query compiler +public class TestNumMetastoreCalls { + static HiveConf hConf = null; + static Driver driver = null; + + @BeforeClass + public static void Setup() throws Exception { + hConf = new HiveConf(Driver.class); + driver = setUpImpl(hConf); + driver.run("create table t1(id1 int, name1 string)"); + driver.run("create table t2(id2 int, id1 int, name2 string)"); + driver.run("create database db1"); + driver.run("create table db1.tdb1(id2 int, id1 int, name2 string)"); + driver.run("create table tpart(id2 int, id1 int)" + + " partitioned by (name string)"); + driver.run("alter table tpart add partition (name='p1')"); + driver.run("alter table tpart add partition (name='p2')"); + } + + @AfterClass + public static void Teardown() throws Exception { + driver.run("drop table t1"); + driver.run("drop table t2"); + driver.run("drop table db1.tdb1"); + driver.run("drop database db1 cascade"); + } + + private static Driver setUpImpl(HiveConf hiveConf) throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, true); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.RAW_STORE_IMPL, + "org.apache.hadoop.hive.ql.TestNumMetastoreCallsObjectStore"); + MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.SCHEDULED_QUERIES_ENABLED, false); + SessionState.start(hiveConf); + return new Driver(hiveConf); + } + + // the compiler should make 6 metastore calls for each table reference: + // get table and get table column statistics (2 calls), + // plus pk, fk, unique and not null constraints (4 calls) + // for a partitioned table there is an extra call to get partitions + @Test + public void testSelectQuery() { + int numCallsBefore = TestNumMetastoreCallsObjectStore.getNumCalls(); + int numCallsAfter = 0; + + // simple select * + String query1 = "select * from t1"; + int rc = driver.compile(query1, true); + assert(rc==0); + numCallsAfter = TestNumMetastoreCallsObjectStore.getNumCalls(); + assert((numCallsAfter - numCallsBefore) == 6); + + // single table + numCallsBefore = TestNumMetastoreCallsObjectStore.getNumCalls(); + String query2 = "select count(distinct id1) from t1 group by name1"; + rc = driver.compile(query2, true); + assert(rc==0); + numCallsAfter = TestNumMetastoreCallsObjectStore.getNumCalls(); + assert((numCallsAfter - numCallsBefore) == 6); + + // two different tables + numCallsBefore = TestNumMetastoreCallsObjectStore.getNumCalls(); + String query3 = "select count(*) from t1 join t2 on t1.id1 = t2.id1 " + + "where t2.id2 > 0 group by t1.name1, t2.name2"; + rc = driver.compile(query3, true); + assert(rc==0); + numCallsAfter = TestNumMetastoreCallsObjectStore.getNumCalls(); + assert((numCallsAfter - numCallsBefore) == 12); + + //from different dbs + numCallsBefore = TestNumMetastoreCallsObjectStore.getNumCalls(); + String query4 = "select count(*) from t1 join db1.tdb1 as t2 on t1.id1 = t2.id1 " + + "where t2.id2 > 0 group by t1.name1, 
t2.name2"; + rc = driver.compile(query4, true); + assert(rc==0); + numCallsAfter = TestNumMetastoreCallsObjectStore.getNumCalls(); + assert((numCallsAfter - numCallsBefore) == 12); + + // three table join + numCallsBefore = TestNumMetastoreCallsObjectStore.getNumCalls(); + String query5 = "select count(*) from t1 join db1.tdb1 as dbt2 on t1.id1 = dbt2.id1 " + + "join t2 on t1.id1 = t2.id1 " + + "where t2.id2 > 0 group by t1.name1, t2.name2"; + rc = driver.compile(query5, true); + assert(rc==0); + numCallsAfter = TestNumMetastoreCallsObjectStore.getNumCalls(); + assert((numCallsAfter - numCallsBefore) == 18); + + // single partitioned table + numCallsBefore = TestNumMetastoreCallsObjectStore.getNumCalls(); + String query6 = "select count(distinct id1) from tpart group by name"; + rc = driver.compile(query6, true); + assert(rc==0); + numCallsAfter = TestNumMetastoreCallsObjectStore.getNumCalls(); + assert((numCallsAfter - numCallsBefore) == 7); + + // two different tables + numCallsBefore = TestNumMetastoreCallsObjectStore.getNumCalls(); + String query7 = "select count(*) from t1 join tpart on t1.id1 = tpart.id1 " + + "where tpart.id2 > 0 group by t1.name1, tpart.name"; + rc = driver.compile(query7, true); + assert(rc==0); + numCallsAfter = TestNumMetastoreCallsObjectStore.getNumCalls(); + assert((numCallsAfter - numCallsBefore) == 13); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestNumMetastoreCallsObjectStore.java b/ql/src/test/org/apache/hadoop/hive/ql/TestNumMetastoreCallsObjectStore.java new file mode 100644 index 0000000000..df72c422bb --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestNumMetastoreCallsObjectStore.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.metastore.*; +import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.Catalog; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; +import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec; +import org.apache.hadoop.hive.metastore.api.GetPartitionsProjectionSpec; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionValuesResponse; +import org.apache.hadoop.hive.metastore.api.RuntimeStat; +import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint; +import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableMeta; +import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; +import org.apache.thrift.TException; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +// this class is a test wrapper around ObjectStore overriding most of the get methods +// used by compiler. 
This is used by TestNumMetastoreCalls to verify the number of calls made to +// the metastore. +public class TestNumMetastoreCallsObjectStore extends ObjectStore { + + static Map callMap = new HashMap<>(); + static private int numCalls = 0; + + static void incrementCall() { + numCalls++; + } + + static int getNumCalls() { + return numCalls; + } + + public TestNumMetastoreCallsObjectStore() { + super(); + } + + + @Override public Catalog getCatalog(String catalogName) + throws NoSuchObjectException, MetaException { + incrementCall(); + return super.getCatalog(catalogName); + } + + @Override public List getCatalogs() throws MetaException { + incrementCall(); + return super.getCatalogs(); + } + + @Override public Database getDatabase(String catalogName, String name) + throws NoSuchObjectException { + incrementCall(); + return super.getDatabase(catalogName, name); + } + + @Override public List getDatabases(String catName, String pattern) throws MetaException { + incrementCall(); + return super.getDatabases(catName, pattern); + } + + @Override public List getAllDatabases(String catName) throws MetaException { + incrementCall(); + return super.getAllDatabases(catName); + } + + @Override public Table getTable(String catName, String dbName, String tableName, + String writeIdList) throws MetaException { + incrementCall(); + return super.getTable(catName, dbName, tableName, writeIdList); + } + + @Override public List getTables(String catName, String dbName, String pattern) + throws MetaException { + incrementCall(); + return super.getTables(catName, dbName, pattern); + } + + @Override public List getTables(String catName, String dbName, String pattern, + TableType tableType, int limit) throws MetaException { + incrementCall(); + return super.getTables(catName, dbName, pattern, tableType, limit); + } + + @Override public List getTableNamesWithStats() + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getTableNamesWithStats(); + } + + @Override public Map> getPartitionColsWithStats(String catName, + String dbName, String tableName) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitionColsWithStats(catName, dbName, tableName); + } + + @Override public List getAllTableNamesForStats() + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getAllTableNamesForStats(); + } + + @Override public List getAllMaterializedViewObjectsForRewriting(String catName) + throws MetaException { + incrementCall(); + return super.getAllMaterializedViewObjectsForRewriting(catName); + } + + @Override public List getMaterializedViewsForRewriting(String catName, String dbName) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getMaterializedViewsForRewriting(catName, dbName); + } + + @Override public int getDatabaseCount() throws MetaException { + incrementCall(); + return super.getDatabaseCount(); + } + + @Override public int getPartitionCount() throws MetaException { + incrementCall(); + return super.getPartitionCount(); + } + + @Override public int getTableCount() throws MetaException { + incrementCall(); + return super.getTableCount(); + } + + @Override public List getTableMeta(String catName, String dbNames, String tableNames, + List tableTypes) throws MetaException { + incrementCall(); + return super.getTableMeta(catName, dbNames, tableNames, tableTypes); + } + + @Override public List getAllTables(String catName, String dbName) throws MetaException { + incrementCall(); + return super.getAllTables(catName, 
dbName); + } + + @Override public List
getTableObjectsByName(String catName, String db, + List tbl_names) throws MetaException, UnknownDBException { + incrementCall(); + return super.getTableObjectsByName(catName, db, tbl_names); + } + + @Override public Partition getPartition(String catName, String dbName, String tableName, + List part_vals) throws NoSuchObjectException, MetaException { + incrementCall(); + return super.getPartition(catName, dbName, tableName, part_vals); + } + + @Override public Partition getPartition(String catName, String dbName, String tableName, + List part_vals, String validWriteIds) throws NoSuchObjectException, MetaException { + incrementCall(); + return super.getPartition(catName, dbName, tableName, part_vals, validWriteIds); + } + + @Override public List getPartitions(String catName, String dbName, String tableName, + int maxParts) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitions(catName, dbName, tableName, maxParts); + } + + @Override public Map getPartitionLocations(String catName, String dbName, + String tblName, String baseLocationToNotShow, int max) { + incrementCall(); + return super.getPartitionLocations(catName, dbName, tblName, baseLocationToNotShow, max); + } + + @Override public List getPartitionsWithAuth(String catName, String dbName, + String tblName, short max, String userName, List groupNames) + throws MetaException, InvalidObjectException { + incrementCall(); + return super.getPartitionsWithAuth(catName, dbName, tblName, max, userName, groupNames); + } + + @Override public Partition getPartitionWithAuth(String catName, String dbName, String tblName, + List partVals, String user_name, List group_names) + throws NoSuchObjectException, MetaException, InvalidObjectException { + incrementCall(); + return super.getPartitionWithAuth(catName, dbName, tblName, partVals, user_name, group_names); + } + + @Override public List listPartitionNames(String catName, String dbName, String tableName, + short max) throws MetaException { + incrementCall(); + return super.listPartitionNames(catName, dbName, tableName, max); + } + + @Override public PartitionValuesResponse listPartitionValues(String catName, String dbName, + String tableName, List cols, boolean applyDistinct, String filter, + boolean ascending, List order, long maxParts) throws MetaException { + incrementCall(); + return super + .listPartitionValues(catName, dbName, tableName, cols, applyDistinct, filter, ascending, + order, maxParts); + } + + @Override public List listPartitionsPsWithAuth(String catName, String db_name, + String tbl_name, List part_vals, short max_parts, String userName, + List groupNames) throws MetaException, InvalidObjectException, NoSuchObjectException { + incrementCall(); + return super + .listPartitionsPsWithAuth(catName, db_name, tbl_name, part_vals, max_parts, userName, + groupNames); + } + + @Override public List listPartitionNamesPs(String catName, String dbName, + String tableName, List part_vals, short max_parts) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.listPartitionNamesPs(catName, dbName, tableName, part_vals, max_parts); + } + + @Override public List getPartitionsByNames(String catName, String dbName, + String tblName, List partNames) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitionsByNames(catName, dbName, tblName, partNames); + } + + @Override public boolean getPartitionsByExpr(String catName, String dbName, String tblName, + byte[] expr, String defaultPartitionName, short 
maxParts, List result) + throws TException { + incrementCall(); + return super.getPartitionsByExpr(catName, dbName, tblName, expr, defaultPartitionName, maxParts, + result); + } + + @Override protected boolean getPartitionsByExprInternal(String catName, String dbName, + String tblName, byte[] expr, String defaultPartitionName, short maxParts, + List result, boolean allowSql, boolean allowJdo) throws TException { + incrementCall(); + return super + .getPartitionsByExprInternal(catName, dbName, tblName, expr, defaultPartitionName, maxParts, + result, allowSql, allowJdo); + } + + @Override public List getPartitionsByFilter(String catName, String dbName, + String tblName, String filter, short maxParts) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitionsByFilter(catName, dbName, tblName, filter, maxParts); + } + + @Override public int getNumPartitionsByFilter(String catName, String dbName, String tblName, + String filter) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getNumPartitionsByFilter(catName, dbName, tblName, filter); + } + + @Override public int getNumPartitionsByExpr(String catName, String dbName, String tblName, + byte[] expr) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getNumPartitionsByExpr(catName, dbName, tblName, expr); + } + + @Override protected List getPartitionsByFilterInternal(String catName, String dbName, + String tblName, String filter, short maxParts, boolean allowSql, boolean allowJdo) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitionsByFilterInternal(catName, dbName, tblName, filter, maxParts, allowSql, + allowJdo); + } + + @Override public List getPartitionSpecsByFilterAndProjection(Table table, + GetPartitionsProjectionSpec partitionsProjectSpec, GetPartitionsFilterSpec filterSpec) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitionSpecsByFilterAndProjection(table, partitionsProjectSpec, filterSpec); + } + + @Override public List listTableNamesByFilter(String catName, String dbName, String filter, + short maxTables) throws MetaException { + incrementCall(); + return super.listTableNamesByFilter(catName, dbName, filter, maxTables); + } + + @Override public List getTableColumnStatistics(String catName, String dbName, + String tableName, List colNames) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getTableColumnStatistics(catName, dbName, tableName, colNames); + } + + @Override public ColumnStatistics getTableColumnStatistics(String catName, String dbName, + String tableName, List colNames, String engine) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getTableColumnStatistics(catName, dbName, tableName, colNames, engine); + } + + @Override public ColumnStatistics getTableColumnStatistics(String catName, String dbName, + String tableName, List colNames, String engine, String writeIdList) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super + .getTableColumnStatistics(catName, dbName, tableName, colNames, engine, writeIdList); + } + + @Override public List> getPartitionColumnStatistics(String catName, + String dbName, String tableName, List partNames, List colNames) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitionColumnStatistics(catName, dbName, tableName, partNames, colNames); + } + + @Override public List 
getPartitionColumnStatistics(String catName, + String dbName, String tableName, List partNames, List colNames, String engine) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super + .getPartitionColumnStatistics(catName, dbName, tableName, partNames, colNames, engine); + } + + @Override public List getPartitionColumnStatistics(String catName, + String dbName, String tableName, List partNames, List colNames, String engine, + String writeIdList) throws MetaException, NoSuchObjectException { + incrementCall(); + return super + .getPartitionColumnStatistics(catName, dbName, tableName, partNames, colNames, engine, + writeIdList); + } + + @Override public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName, + List partNames, List colNames, String engine, String writeIdList) + throws MetaException, NoSuchObjectException { + incrementCall(); + return super + .get_aggr_stats_for(catName, dbName, tblName, partNames, colNames, engine, writeIdList); + } + + @Override public List getPartitionColStatsForDatabase( + String catName, String dbName) throws MetaException, NoSuchObjectException { + incrementCall(); + return super.getPartitionColStatsForDatabase(catName, dbName); + } + + @Override public List getPrimaryKeys(String catName, String db_name, + String tbl_name) throws MetaException { + incrementCall(); + return super.getPrimaryKeys(catName, db_name, tbl_name); + } + + @Override public List getForeignKeys(String catName, String parent_db_name, + String parent_tbl_name, String foreign_db_name, String foreign_tbl_name) + throws MetaException { + incrementCall(); + return super.getForeignKeys(catName, parent_db_name, parent_tbl_name, foreign_db_name, + foreign_tbl_name); + } + + @Override public List getUniqueConstraints(String catName, String db_name, + String tbl_name) throws MetaException { + incrementCall(); + return super.getUniqueConstraints(catName, db_name, tbl_name); + } + + @Override public List getNotNullConstraints(String catName, String db_name, + String tbl_name) throws MetaException { + incrementCall(); + return super.getNotNullConstraints(catName, db_name, tbl_name); + } + + @Override public List getDefaultConstraints(String catName, String db_name, + String tbl_name) throws MetaException { + incrementCall(); + return super.getDefaultConstraints(catName, db_name, tbl_name); + } + + @Override public List getCheckConstraints(String catName, String db_name, + String tbl_name) throws MetaException { + incrementCall(); + return super.getCheckConstraints(catName, db_name, tbl_name); + } + + @Override public List getRuntimeStats(int maxEntries, int maxCreateTime) + throws MetaException { + incrementCall(); + return super.getRuntimeStats(maxEntries, maxCreateTime); + } +} diff --git a/standalone-metastore/DEV-README b/standalone-metastore/DEV-README index 84ed9383bb..ab5df26590 100644 --- a/standalone-metastore/DEV-README +++ b/standalone-metastore/DEV-README @@ -51,8 +51,6 @@ Supported databases for testing: -Dit.test=ITestPostgres -Dit.test=ITestSqlServer -By adding -Dverbose.schematool the Schema Tool output becomes more detailed. 
- You can download the Oracle driver at http://www.oracle.com/technetwork/database/features/jdbc/index-091264.html You should download Oracle 11g Release 1, ojdbc6.jar diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index 1041d925d9..00e48b902f 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -59,7 +59,7 @@ private static boolean inited = false; private static boolean enabled = false; - private static Map encryptionZones = new HashMap<>(); + private static Map encryptionZoneToCmrootMapping = new HashMap<>(); private static HadoopShims hadoopShims = ShimLoader.getHadoopShims(); private static Configuration conf; private String msUser; @@ -156,23 +156,34 @@ private ReplChangeManager(Configuration conf) throws MetaException { cmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMDIR); encryptedCmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMENCRYPTEDDIR); fallbackNonEncryptedCmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR); + //validate that cmRootEncrypted is a relative path + Path cmRootEncrypted = new Path(encryptedCmRootDir); + if (cmRootEncrypted.isAbsolute()) { + throw new MetaException(ConfVars.REPLCMENCRYPTEDDIR.getHiveName() + " should be a relative path"); + } //Create default cm root Path cmroot = new Path(cmRootDir); createCmRoot(cmroot); FileSystem cmRootFs = cmroot.getFileSystem(conf); HdfsEncryptionShim pathEncryptionShim = hadoopShims .createHdfsEncryptionShim(cmRootFs, conf); - Path cmRootEncrypted = new Path(encryptedCmRootDir); - if (cmRootEncrypted.isAbsolute()) { - throw new MetaException(ConfVars.REPLCMENCRYPTEDDIR.getHiveName() + " should be a relative path"); - } if (pathEncryptionShim.isPathEncrypted(cmroot)) { //If cm root is encrypted we keep using it for the encryption zone String encryptionZonePath = cmRootFs.getUri() + pathEncryptionShim.getEncryptionZoneForPath(cmroot).getPath(); - encryptionZones.put(encryptionZonePath, cmRootDir); + encryptionZoneToCmrootMapping.put(encryptionZonePath, cmRootDir); } else { - encryptionZones.put(NO_ENCRYPTION, cmRootDir); + encryptionZoneToCmrootMapping.put(NO_ENCRYPTION, cmRootDir); + } + if (!StringUtils.isEmpty(fallbackNonEncryptedCmRootDir)) { + Path cmRootFallback = new Path(fallbackNonEncryptedCmRootDir); + if (!cmRootFallback.isAbsolute()) { + throw new MetaException(ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.getHiveName() + " should be absolute path"); + } + if (cmRootFs.exists(cmRootFallback) && pathEncryptionShim.isPathEncrypted(cmRootFallback)) { + throw new MetaException(ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.getHiveName() + + " should not be encrypted"); + } } UserGroupInformation usergroupInfo = UserGroupInformation.getCurrentUser(); msUser = usergroupInfo.getShortUserName(); @@ -500,7 +511,7 @@ static void scheduleCMClearer(Configuration conf) { .namingPattern(CM_THREAD_NAME_PREFIX + "%d") .daemon(true) .build()); - executor.scheduleAtFixedRate(new CMClearer(encryptionZones, + executor.scheduleAtFixedRate(new CMClearer(encryptionZoneToCmrootMapping, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf), 0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS); } @@ -553,14 
+564,14 @@ static Path getCmRoot(Path path) throws IOException { //at the root of the encryption zone cmrootDir = encryptionZonePath + Path.SEPARATOR + encryptedCmRootDir; } - if (encryptionZones.containsKey(encryptionZonePath)) { - cmroot = new Path(encryptionZones.get(encryptionZonePath)); + if (encryptionZoneToCmrootMapping.containsKey(encryptionZonePath)) { + cmroot = new Path(encryptionZoneToCmrootMapping.get(encryptionZonePath)); } else { cmroot = new Path(cmrootDir); synchronized (instance) { - if (!encryptionZones.containsKey(encryptionZonePath)) { + if (!encryptionZoneToCmrootMapping.containsKey(encryptionZonePath)) { createCmRoot(cmroot); - encryptionZones.put(encryptionZonePath, cmrootDir); + encryptionZoneToCmrootMapping.put(encryptionZonePath, cmrootDir); } } } @@ -569,11 +580,22 @@ static Path getCmRoot(Path path) throws IOException { } private static void createCmRoot(Path cmroot) throws IOException { - FileSystem cmFs = cmroot.getFileSystem(conf); - // Create cmroot with permission 700 if not exist - if (!cmFs.exists(cmroot)) { - cmFs.mkdirs(cmroot); - cmFs.setPermission(cmroot, new FsPermission("700")); + Retry retriable = new Retry(IOException.class) { + @Override + public Void execute() throws IOException { + FileSystem cmFs = cmroot.getFileSystem(conf); + // Create cmroot with permission 700 if not exist + if (!cmFs.exists(cmroot)) { + cmFs.mkdirs(cmroot); + cmFs.setPermission(cmroot, new FsPermission("700")); + } + return null; + } + }; + try { + retriable.run(); + } catch (Exception e) { + throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); } } @@ -582,7 +604,7 @@ static void resetReplChangeManagerInstance() { inited = false; enabled = false; instance = null; - encryptionZones.clear(); + encryptionZoneToCmrootMapping.clear(); } public static final PathFilter CMROOT_PATH_FILTER = new PathFilter() { diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 2aeb37406a..58b67e888c 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -920,7 +920,7 @@ public static ConfVars getMetaConf(String name) { REPLCMENCRYPTEDDIR("metastore.repl.cm.encryptionzone.rootdir", "hive.repl.cm.encryptionzone.rootdir", ".cmroot", "Root dir for ChangeManager if encryption zones are enabled, used for deleted files."), REPLCMFALLBACKNONENCRYPTEDDIR("metastore.repl.cm.nonencryptionzone.rootdir", - "hive.repl.cm.nonencryptionzone.rootdir", "/user/${system:user.name}/cmroot/", + "hive.repl.cm.nonencryptionzone.rootdir", "", "Root dir for ChangeManager for non encrypted paths if hive.repl.cmrootdir is encrypted."), REPLCMRETIAN("metastore.repl.cm.retain", "hive.repl.cm.retain", 24, TimeUnit.HOURS, "Time to retain removed files in cmrootdir."), diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index ee9f988294..06625de67c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -2840,27 +2840,7 @@ private boolean checkTableDataShouldBeDeleted(Table tbl, boolean deleteData) { */ private void deleteTableData(Path tablePath, boolean ifPurge, boolean shouldEnableCm) { if (tablePath != null) { - try { - if (shouldEnableCm) { - //Don't delete cmdir if its inside the table path - FileStatus[] statuses = tablePath.getFileSystem(conf).listStatus(tablePath, - ReplChangeManager.CMROOT_PATH_FILTER); - for (final FileStatus status : statuses) { - wh.deleteDir(status.getPath(), true, ifPurge, shouldEnableCm); - } - //Check if table directory is empty, delete it - FileStatus[] statusWithoutFilter = tablePath.getFileSystem(conf).listStatus(tablePath); - if (statusWithoutFilter.length == 0) { - wh.deleteDir(tablePath, true, ifPurge, shouldEnableCm); - } - } else { - //If no cm delete the complete table directory - wh.deleteDir(tablePath, true, ifPurge, shouldEnableCm); - } - } catch (Exception e) { - LOG.error("Failed to delete table directory: " + tablePath + - " " + e.getMessage()); - } + deleteDataExcludeCmroot(tablePath, ifPurge, shouldEnableCm); } } @@ -2895,27 +2875,7 @@ private void deleteTableData(Path tablePath, boolean ifPurge, Database db) { private void deletePartitionData(List partPaths, boolean ifPurge, boolean shouldEnableCm) { if (partPaths != null && !partPaths.isEmpty()) { for (Path partPath : partPaths) { - try { - if (shouldEnableCm) { - //Don't delete cmdir if its inside the partition path - FileStatus[] statuses = partPath.getFileSystem(conf).listStatus(partPath, - ReplChangeManager.CMROOT_PATH_FILTER); - for (final FileStatus status : statuses) { - wh.deleteDir(status.getPath(), true, ifPurge, shouldEnableCm); - } - //Check if table directory is empty, delete it - FileStatus[] statusWithoutFilter = partPath.getFileSystem(conf).listStatus(partPath); - if (statusWithoutFilter.length == 0) { - wh.deleteDir(partPath, true, ifPurge, shouldEnableCm); - } - } else { - //If no cm delete the complete table directory - wh.deleteDir(partPath, true, ifPurge, shouldEnableCm); - } - } catch (Exception e) { - LOG.error("Failed to delete partition directory: " + partPath + - " " + e.getMessage()); - } + deleteDataExcludeCmroot(partPath, ifPurge, shouldEnableCm); } } } @@ -2942,6 +2902,39 @@ private void deletePartitionData(List partPaths, boolean ifPurge, Database } } + /** + * Delete the data under the given path, excluding the cmroot dir, + * and log an error if the deletion fails. 
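+ * When cm is enabled, each immediate child of the path other than the cmroot is deleted + * recursively and the path itself is removed only once it is empty; without cm the whole path is deleted.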
+ * + * @param path + * @param ifPurge completely purge the partition (skipping trash) while + * removing data from warehouse + * @param shouldEnableCm If cm should be enabled + */ + private void deleteDataExcludeCmroot(Path path, boolean ifPurge, boolean shouldEnableCm) { + try { + if (shouldEnableCm) { + //Don't delete cmdir if its inside the partition path + FileStatus[] statuses = path.getFileSystem(conf).listStatus(path, + ReplChangeManager.CMROOT_PATH_FILTER); + for (final FileStatus status : statuses) { + wh.deleteDir(status.getPath(), true, ifPurge, shouldEnableCm); + } + //Check if table directory is empty, delete it + FileStatus[] statusWithoutFilter = path.getFileSystem(conf).listStatus(path); + if (statusWithoutFilter.length == 0) { + wh.deleteDir(path, true, ifPurge, shouldEnableCm); + } + } else { + //If no cm delete the complete table directory + wh.deleteDir(path, true, ifPurge, shouldEnableCm); + } + } catch (Exception e) { + LOG.error("Failed to delete directory: " + path + + " " + e.getMessage()); + } + } + /** * Deletes the partitions specified by catName, dbName, tableName. If checkLocation is true, for * locations of partitions which may not be subdirectories of tablePath checks to make sure the diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 0f94e13148..bae23f773e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -75,20 +75,20 @@ public CompactionTxnHandler() { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); // Check for completed transactions - String s = "SELECT DISTINCT \"TC\".\"CTC_DATABASE\", \"TC\".\"CTC_TABLE\", \"TC\".\"CTC_PARTITION\" " + - "FROM \"COMPLETED_TXN_COMPONENTS\" TC " + (checkInterval > 0 ? - "LEFT JOIN ( " + - " SELECT \"C1\".* FROM \"COMPLETED_COMPACTIONS\" \"C1\" " + - " INNER JOIN ( " + - " SELECT MAX(\"CC_ID\") \"CC_ID\" FROM \"COMPLETED_COMPACTIONS\" " + - " GROUP BY \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\"" + - " ) \"C2\" " + - " ON \"C1\".\"CC_ID\" = \"C2\".\"CC_ID\" " + - " WHERE \"C1\".\"CC_STATE\" IN (" + quoteChar(ATTEMPTED_STATE) + "," + quoteChar(FAILED_STATE) + ")" + - ") \"C\" " + - "ON \"TC\".\"CTC_DATABASE\" = \"C\".\"CC_DATABASE\" AND \"TC\".\"CTC_TABLE\" = \"C\".\"CC_TABLE\" " + - " AND (\"TC\".\"CTC_PARTITION\" = \"C\".\"CC_PARTITION\" OR (\"TC\".\"CTC_PARTITION\" IS NULL AND \"C\".\"CC_PARTITION\" IS NULL)) " + - "WHERE \"C\".\"CC_ID\" IS NOT NULL OR " + isWithinCheckInterval("\"TC\".\"CTC_TIMESTAMP\"", checkInterval) : ""); + String s = "select distinct tc.ctc_database, tc.ctc_table, tc.ctc_partition " + + "from COMPLETED_TXN_COMPONENTS tc " + (checkInterval > 0 ? 
+ "left join ( " + + " select c1.* from COMPLETED_COMPACTIONS c1 " + + " inner join ( " + + " select max(cc_id) cc_id from COMPLETED_COMPACTIONS " + + " group by cc_database, cc_table, cc_partition" + + " ) c2 " + + " on c1.cc_id = c2.cc_id " + + " where c1.cc_state IN (" + quoteChar(ATTEMPTED_STATE) + "," + quoteChar(FAILED_STATE) + ")" + + ") c " + + "on tc.ctc_database = c.cc_database and tc.ctc_table = c.cc_table " + + " and (tc.ctc_partition = c.cc_partition or (tc.ctc_partition is null and c.cc_partition is null)) " + + "where c.cc_id is not null or " + isWithinCheckInterval("tc.ctc_timestamp", checkInterval) : ""); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -102,11 +102,11 @@ public CompactionTxnHandler() { rs.close(); // Check for aborted txns - s = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " + - "FROM \"TXNS\", \"TXN_COMPONENTS\" " + - "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = '" + TXN_ABORTED + "' " + - "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " + - "HAVING COUNT(*) > " + abortedThreshold; + s = "select tc_database, tc_table, tc_partition " + + "from TXNS, TXN_COMPONENTS " + + "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " + + "group by tc_database, tc_table, tc_partition " + + "having count(*) > " + abortedThreshold; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -151,8 +151,8 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " + - "\"CQ_TYPE\", \"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" + INITIATED_STATE + "'"; + String s = "select cq_id, cq_database, cq_table, cq_partition, " + + "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -171,9 +171,9 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { info.properties = rs.getString(6); // Now, update this record as being worked on by this worker. 
long now = getDbTime(dbConn); - s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = '" + workerId + "', " + - "\"CQ_START\" = " + now + ", \"CQ_STATE\" = '" + WORKING_STATE + "' WHERE \"CQ_ID\" = " + info.id + - " AND \"CQ_STATE\"='" + INITIATED_STATE + "'"; + s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + + "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id + + " AND cq_state='" + INITIATED_STATE + "'"; LOG.debug("Going to execute update <" + s + ">"); int updCount = updStmt.executeUpdate(s); if(updCount == 1) { @@ -221,8 +221,8 @@ public void markCompacted(CompactionInfo info) throws MetaException { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', " + - "\"CQ_WORKER_ID\" = NULL WHERE \"CQ_ID\" = " + info.id; + String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + + "cq_worker_id = null where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); int updCnt = stmt.executeUpdate(s); if (updCnt != 1) { @@ -265,8 +265,8 @@ public void markCompacted(CompactionInfo info) throws MetaException { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " - + "\"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" + String s = "select cq_id, cq_database, cq_table, cq_partition, " + + "cq_type, cq_run_as, cq_highest_write_id from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -333,7 +333,7 @@ public long findMinOpenTxnId() throws MetaException { * which deletes rows from MIN_HISTORY_LEVEL which can only allow minOpenTxn to move higher) */ private long findMinOpenTxnGLB(Statement stmt) throws MetaException, SQLException { - String s = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""; + String s = "select ntxn_next from NEXT_TXN_ID"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -341,7 +341,7 @@ private long findMinOpenTxnGLB(Statement stmt) throws MetaException, SQLExceptio "initialized, no record found in next_txn_id"); } long hwm = rs.getLong(1); - s = "SELECT MIN(\"MHL_MIN_OPEN_TXNID\") FROM \"MIN_HISTORY_LEVEL\""; + s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); rs.next(); @@ -369,10 +369,9 @@ public void markCleaned(CompactionInfo info) throws MetaException { ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " - + "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", " - + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\" " - + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"); + pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, " + + "CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, " + + "CQ_HADOOP_JOB_ID, CQ_ERROR_MESSAGE from COMPACTION_QUEUE WHERE CQ_ID = ?"); pStmt.setLong(1, info.id); rs = pStmt.executeQuery(); if(rs.next()) { @@ 
-392,11 +391,10 @@ public void markCleaned(CompactionInfo info) throws MetaException { LOG.debug("Going to rollback"); dbConn.rollback(); } - pStmt = dbConn.prepareStatement("INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\", \"CC_DATABASE\", " - + "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", " - + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", " - + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\")" - + " VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?)"); + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, " + + "CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, " + + "CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID, CC_ERROR_MESSAGE) " + + "VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?)"); info.state = SUCCEEDED_STATE; CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn)); updCount = pStmt.executeUpdate(); @@ -404,13 +402,13 @@ public void markCleaned(CompactionInfo info) throws MetaException { // Remove entries from completed_txn_components as well, so we don't start looking there // again but only up to the highest write ID include in this compaction job. //highestWriteId will be NULL in upgrade scenarios - s = "DELETE FROM \"COMPLETED_TXN_COMPONENTS\" WHERE \"CTC_DATABASE\" = ? AND " + - "\"CTC_TABLE\" = ?"; + s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and " + + "ctc_table = ?"; if (info.partName != null) { - s += " AND \"CTC_PARTITION\" = ?"; + s += " and ctc_partition = ?"; } if(info.highestWriteId != 0) { - s += " AND \"CTC_WRITEID\" <= ?"; + s += " and ctc_writeid <= ?"; } pStmt = dbConn.prepareStatement(s); int paramCount = 1; @@ -433,10 +431,10 @@ public void markCleaned(CompactionInfo info) throws MetaException { * aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted txns). * See {@link ql.txn.compactor.Cleaner.removeFiles()} */ - s = "SELECT DISTINCT \"TXN_ID\" FROM \"TXNS\", \"TXN_COMPONENTS\" WHERE \"TXN_ID\" = \"TC_TXNID\" " - + "AND \"TXN_STATE\" = '" + TXN_ABORTED + "' AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ?"; - if (info.highestWriteId != 0) s += " AND \"TC_WRITEID\" <= ?"; - if (info.partName != null) s += " AND \"TC_PARTITION\" = ?"; + s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + + TXN_ABORTED + "' and tc_database = ? 
and tc_table = ?"; + if (info.highestWriteId != 0) s += " and tc_writeid <= ?"; + if (info.partName != null) s += " and tc_partition = ?"; pStmt = dbConn.prepareStatement(s); paramCount = 1; @@ -466,18 +464,18 @@ public void markCleaned(CompactionInfo info) throws MetaException { StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); - prefix.append("DELETE FROM \"TXN_COMPONENTS\" WHERE "); + prefix.append("delete from TXN_COMPONENTS where "); //because 1 txn may include different partitions/tables even in auto commit mode - suffix.append(" AND \"TC_DATABASE\" = ?"); - suffix.append(" AND \"TC_TABLE\" = ?"); + suffix.append(" and tc_database = ?"); + suffix.append(" and tc_table = ?"); if (info.partName != null) { - suffix.append(" AND \"TC_PARTITION\" = ?"); + suffix.append(" and tc_partition = ?"); } // Populate the complete query with provided prefix and suffix List counts = TxnUtils - .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "\"TC_TXNID\"", + .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "tc_txnid", true, false); int totalCount = 0; for (int i = 0; i < queries.size(); i++) { @@ -546,7 +544,7 @@ public void cleanTxnToWriteIdTable() throws MetaException { // If there are aborted txns, then the minimum aborted txnid could be the min_uncommitted_txnid // if lesser than both NEXT_TXN_ID.ntxn_next and min(MIN_HISTORY_LEVEL .mhl_min_open_txnid). - String s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_ABORTED); + String s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_ABORTED); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (rs.next()) { @@ -557,7 +555,7 @@ public void cleanTxnToWriteIdTable() throws MetaException { } // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed // to cleanup the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table. - s = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < " + minUncommittedTxnId; + s = "delete from TXN_TO_WRITE_ID where t2w_txnid < " + minUncommittedTxnId; LOG.debug("Going to execute delete <" + s + ">"); int rc = stmt.executeUpdate(s); LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommittedTxnId); @@ -596,9 +594,9 @@ public void cleanEmptyAbortedTxns() throws MetaException { //after that, so READ COMMITTED is sufficient. dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " + - "\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " + - "\"TXN_STATE\" = '" + TXN_ABORTED + "'"; + String s = "select txn_id from TXNS where " + + "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + + "txn_state = '" + TXN_ABORTED + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); List txnids = new ArrayList<>(); @@ -614,10 +612,10 @@ public void cleanEmptyAbortedTxns() throws MetaException { StringBuilder suffix = new StringBuilder(); // Delete from TXNS. 
- prefix.append("DELETE FROM \"TXNS\" WHERE "); + prefix.append("delete from TXNS where "); suffix.append(""); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", false, false); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false); for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); @@ -660,8 +658,8 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, \"CQ_START\" = NULL, \"CQ_STATE\" = '" - + INITIATED_STATE+ "' WHERE \"CQ_STATE\" = '" + WORKING_STATE + "' AND \"CQ_WORKER_ID\" LIKE '" + String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" + + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" + hostname + "%'"; LOG.debug("Going to execute update <" + s + ">"); // It isn't an error if the following returns no rows, as the local workers could have died @@ -705,8 +703,8 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); long latestValidStart = getDbTime(dbConn) - timeout; stmt = dbConn.createStatement(); - String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, \"CQ_START\" = NULL, \"CQ_STATE\" = '" - + INITIATED_STATE+ "' WHERE \"CQ_STATE\" = '" + WORKING_STATE + "' AND \"CQ_START\" < " + String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" + + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < " + latestValidStart; LOG.debug("Going to execute update <" + s + ">"); // It isn't an error if the following returns no rows, as the local workers could have died @@ -802,9 +800,9 @@ public void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String sqlText = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HIGHEST_WRITE_ID\" = " + - ci.highestWriteId + ", \"CQ_RUN_AS\" = " + quoteString(ci.runAs) + - " WHERE \"CQ_ID\" = " + ci.id; + String sqlText = "UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_WRITE_ID = " + + ci.highestWriteId + ", cq_run_as = " + quoteString(ci.runAs) + + " WHERE CQ_ID = " + ci.id; if(LOG.isDebugEnabled()) { LOG.debug("About to execute: " + sqlText); } @@ -820,13 +818,13 @@ public void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws * a new write id (so as not to invalidate result set caches/materialized views) but * we need to set it to something to that markCleaned() only cleans TXN_COMPONENTS up to * the level to which aborted files/data has been cleaned.*/ - sqlText = "INSERT INTO \"TXN_COMPONENTS\"(" + - "\"TC_TXNID\", " + - "\"TC_DATABASE\", " + - "\"TC_TABLE\", " + - (ci.partName == null ? "" : "\"TC_PARTITION\", ") + - "\"TC_WRITEID\", " + - "\"TC_OPERATION_TYPE\")" + + sqlText = "insert into TXN_COMPONENTS(" + + "TC_TXNID, " + + "TC_DATABASE, " + + "TC_TABLE, " + + (ci.partName == null ? 
"" : "TC_PARTITION, ") + + "TC_WRITEID, " + + "TC_OPERATION_TYPE)" + " VALUES(" + compactionTxnId + "," + quoteString(ci.dbname) + "," + @@ -909,8 +907,8 @@ public void purgeCompactionHistory() throws MetaException { stmt = dbConn.createStatement(); /*cc_id is monotonically increasing so for any entity sorts in order of compaction history, thus this query groups by entity and withing group sorts most recent first*/ - rs = stmt.executeQuery("SELECT \"CC_ID\", \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\" " - + "FROM \"COMPLETED_COMPACTIONS\" ORDER BY \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_ID\" DESC"); + rs = stmt.executeQuery("select cc_id, cc_database, cc_table, cc_partition, cc_state from " + + "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, cc_id desc"); String lastCompactedEntity = null; /*In each group, walk from most recent and count occurences of each state type. Once you * have counted enough (for each state) to satisfy retention policy, delete all other @@ -936,15 +934,14 @@ public void purgeCompactionHistory() throws MetaException { StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); - prefix.append("DELETE FROM \"COMPLETED_COMPACTIONS\" WHERE "); + prefix.append("delete from COMPLETED_COMPACTIONS where "); suffix.append(""); List questions = new ArrayList<>(deleteSet.size()); for (int i = 0; i < deleteSet.size(); i++) { questions.add("?"); } - List counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, - "\"CC_ID\"", false, false); + List counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "cc_id", false, false); int totalCount = 0; for (int i = 0; i < queries.size(); i++) { String query = queries.get(i); @@ -1006,11 +1003,11 @@ public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - pStmt = dbConn.prepareStatement("SELECT \"CC_STATE\" FROM \"COMPLETED_COMPACTIONS\" WHERE " + - "\"CC_DATABASE\" = ? AND " + - "\"CC_TABLE\" = ? " + - (ci.partName != null ? "AND \"CC_PARTITION\" = ?" : "") + - " AND \"CC_STATE\" != " + quoteChar(ATTEMPTED_STATE) + " ORDER BY \"CC_ID\" DESC"); + pStmt = dbConn.prepareStatement("select CC_STATE from COMPLETED_COMPACTIONS where " + + "CC_DATABASE = ? and " + + "CC_TABLE = ? " + + (ci.partName != null ? "and CC_PARTITION = ?" 
: "") + + " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc"); pStmt.setString(1, ci.dbname); pStmt.setString(2, ci.tableName); if (ci.partName != null) { @@ -1065,15 +1062,14 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " - + "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", " - + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\" " - + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"); + pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, " + + "CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, " + + "CQ_HADOOP_JOB_ID, CQ_ERROR_MESSAGE from COMPACTION_QUEUE WHERE CQ_ID = ?"); pStmt.setLong(1, ci.id); rs = pStmt.executeQuery(); if(rs.next()) { ci = CompactionInfo.loadFullFromCompactionQueue(rs); - String s = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"; + String s = "delete from COMPACTION_QUEUE where cq_id = ?"; pStmt = dbConn.prepareStatement(s); pStmt.setLong(1, ci.id); LOG.debug("Going to execute update <" + s + ">"); @@ -1102,10 +1098,9 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho close(rs, stmt, null); closeStmt(pStmt); - pStmt = dbConn.prepareStatement("INSERT INTO \"COMPLETED_COMPACTIONS\" " - + "(\"CC_ID\", \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", " - + "\"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", \"CC_START\", \"CC_END\", \"CC_RUN_AS\", " - + "\"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", \"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\") " + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, " + + "CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, " + + "CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID, CC_ERROR_MESSAGE) " + "VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?)"); if (errorMessage != null) { ci.errorMessage = errorMessage; @@ -1143,8 +1138,7 @@ public void setHadoopJobId(String hadoopJobId, long id) { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HADOOP_JOB_ID\" = " + quoteString(hadoopJobId) - + " WHERE \"CQ_ID\" = " + id; + String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + quoteString(hadoopJobId) + " WHERE CQ_ID = " + id; LOG.debug("Going to execute <" + s + ">"); int updateCount = stmt.executeUpdate(s); LOG.debug("Going to commit"); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java index a6d22d19ef..3f82891ef6 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java @@ -61,10 +61,6 @@ private boolean verbose; - public DatabaseRule() { - verbose = System.getProperty("verbose.schematool") != null; - } - public DatabaseRule setVerbose(boolean verbose) { this.verbose = verbose; 
return this;