diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 043c97669a..bb59dce705 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -35,8 +35,6 @@ import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; @@ -203,61 +201,6 @@ private Context backupContext = null; private boolean retrial = false; - private enum DriverState { - INITIALIZED, - COMPILING, - COMPILED, - EXECUTING, - EXECUTED, - // a state that the driver enters after close() has been called to clean the query results - // and release the resources after the query has been executed - CLOSED, - // a state that the driver enters after destroy() is called and it is the end of driver life cycle - DESTROYED, - ERROR - } - - public static class LockedDriverState { - // a lock is used for synchronizing the state transition and its associated - // resource releases - public final ReentrantLock stateLock = new ReentrantLock(); - public DriverState driverState = DriverState.INITIALIZED; - public AtomicBoolean aborted = new AtomicBoolean(); - private static ThreadLocal lds = new ThreadLocal() { - @Override - protected LockedDriverState initialValue() { - return new LockedDriverState(); - } - }; - - public static void setLockedDriverState(LockedDriverState lDrv) { - lds.set(lDrv); - } - - public static LockedDriverState getLockedDriverState() { - return lds.get(); - } - - public static void removeLockedDriverState() { - if (lds != null) { - lds.remove(); - } - } - - public boolean isAborted() { - return aborted.get(); - } - - public void abort() { - aborted.set(true); - } - - @Override - public String toString() { - return String.format("%s(aborted:%s)", driverState, aborted.get()); - } - } - private boolean checkConcurrency() { boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); if (!supportConcurrency) { @@ -416,11 +359,11 @@ public int compile(String command, boolean resetTaskIds) { void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE); - lDrvState.stateLock.lock(); + lDrvState.lock(); try { - lDrvState.driverState = DriverState.COMPILING; + lDrvState.compiling(); } finally { - lDrvState.stateLock.unlock(); + lDrvState.unlock(); } command = new VariableSubstitution(new HiveVariableSource() { @@ -695,15 +638,15 @@ public void run() { if (isInterrupted && !deferClose) { closeInProcess(true); } - lDrvState.stateLock.lock(); + lDrvState.lock(); try { if (isInterrupted) { - lDrvState.driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR; + lDrvState.compilationInterrupted(deferClose); } else { - lDrvState.driverState = compileError ? DriverState.ERROR : DriverState.COMPILED; + lDrvState.compilationFinished(compileError); } } finally { - lDrvState.stateLock.unlock(); + lDrvState.unlock(); } if (isInterrupted) { @@ -1462,21 +1405,21 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command downstreamError = null; LockedDriverState.setLockedDriverState(lDrvState); - lDrvState.stateLock.lock(); + lDrvState.lock(); try { if (alreadyCompiled) { - if (lDrvState.driverState == DriverState.COMPILED) { - lDrvState.driverState = DriverState.EXECUTING; + if (lDrvState.isCompiled()) { + lDrvState.executing(); } else { errorMessage = "FAILED: Precompiled query has been cancelled or closed."; console.printError(errorMessage); throw createProcessorResponse(12); } } else { - lDrvState.driverState = DriverState.COMPILING; + lDrvState.compiling(); } } finally { - lDrvState.stateLock.unlock(); + lDrvState.unlock(); } // a flag that helps to set the correct driver state in finally block by tracking if @@ -1610,11 +1553,11 @@ else if(plan.getOperation() == HiveOperation.ROLLBACK) { releaseResources(); } - lDrvState.stateLock.lock(); + lDrvState.lock(); try { - lDrvState.driverState = isFinishedWithError ? DriverState.ERROR : DriverState.EXECUTED; + lDrvState.executionFinished(isFinishedWithError); } finally { - lDrvState.stateLock.unlock(); + lDrvState.unlock(); } } } @@ -1790,21 +1733,20 @@ private void execute() throws CommandProcessorResponse { // hide sensitive information during query redaction. String queryStr = conf.getQueryString(); - lDrvState.stateLock.lock(); + lDrvState.lock(); try { // if query is not in compiled state, or executing state which is carried over from // a combined compile/execute in runInternal, throws the error - if (lDrvState.driverState != DriverState.COMPILED && - lDrvState.driverState != DriverState.EXECUTING) { + if (lDrvState.isCompiled() && lDrvState.isExecuting()) { SQLState = "HY008"; errorMessage = "FAILED: unexpected driverstate: " + lDrvState + ", for query " + queryStr; console.printError(errorMessage); throw createProcessorResponse(1000); } else { - lDrvState.driverState = DriverState.EXECUTING; + lDrvState.executing(); } } finally { - lDrvState.stateLock.unlock(); + lDrvState.unlock(); } maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER); @@ -2093,11 +2035,11 @@ private void execute() throws CommandProcessorResponse { if (ss != null) { ss.onQueryCompletion(queryId); } - lDrvState.stateLock.lock(); + lDrvState.lock(); try { - lDrvState.driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED; + lDrvState.executionFinished(executionError); } finally { - lDrvState.stateLock.unlock(); + lDrvState.unlock(); } if (lDrvState.isAborted()) { LOG.info("Executing command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds"); @@ -2117,7 +2059,7 @@ private long addWithOverflowCheck(long val1, long val2) { private void releasePlan(QueryPlan plan) { // Plan maybe null if Driver.close is called in another thread for the same Driver object - lDrvState.stateLock.lock(); + lDrvState.lock(); try { if (plan != null) { plan.setDone(); @@ -2131,7 +2073,7 @@ private void releasePlan(QueryPlan plan) { } } } finally { - lDrvState.stateLock.unlock(); + lDrvState.unlock(); } } @@ -2256,7 +2198,7 @@ public boolean isFetchingTable() { @SuppressWarnings("unchecked") @Override public boolean getResults(List res) throws IOException { - if (lDrvState.driverState == DriverState.DESTROYED || lDrvState.driverState == DriverState.CLOSED) { + if (lDrvState.isDestroyed() || lDrvState.isClosed()) { throw new IOException("FAILED: query has been cancelled, closed, or destroyed."); } @@ -2321,7 +2263,7 @@ public boolean getResults(List res) throws IOException { @Override public void resetFetch() throws IOException { - if (lDrvState.driverState == DriverState.DESTROYED || lDrvState.driverState == DriverState.CLOSED) { + if (lDrvState.isDestroyed() || lDrvState.isClosed()) { throw new IOException("FAILED: driver has been cancelled, closed or destroyed."); } if (isFetchingTable()) { @@ -2341,7 +2283,7 @@ public void resetFetch() throws IOException { // DriverContext could be released in the query and close processes at same // time, which needs to be thread protected. private void releaseDriverContext() { - lDrvState.stateLock.lock(); + lDrvState.lock(); try { if (driverCxt != null) { driverCxt.shutdown(); @@ -2350,7 +2292,7 @@ private void releaseDriverContext() { } catch (Exception e) { LOG.debug("Exception while shutting down the task runner", e); } finally { - lDrvState.stateLock.unlock(); + lDrvState.unlock(); } } @@ -2467,11 +2409,10 @@ private int closeInProcess(boolean destroyed) { // is called to stop the query if it is running, clean query results, and release resources. @Override public void close() { - lDrvState.stateLock.lock(); + lDrvState.lock(); try { releaseDriverContext(); - if (lDrvState.driverState == DriverState.COMPILING || - lDrvState.driverState == DriverState.EXECUTING) { + if (lDrvState.isCompiling() || lDrvState.isExecuting()) { lDrvState.abort(); } releasePlan(); @@ -2479,9 +2420,9 @@ public void close() { releaseCachedResult(); releaseFetchTask(); releaseResStream(); - lDrvState.driverState = DriverState.CLOSED; + lDrvState.closed(); } finally { - lDrvState.stateLock.unlock(); + lDrvState.unlock(); LockedDriverState.removeLockedDriverState(); } destroy(); @@ -2491,17 +2432,17 @@ public void close() { // do not understand why it is needed and wonder if it could be combined with close. @Override public void destroy() { - lDrvState.stateLock.lock(); + lDrvState.lock(); try { // in the cancel case where the driver state is INTERRUPTED, destroy will be deferred to // the query process - if (lDrvState.driverState == DriverState.DESTROYED) { + if (lDrvState.isDestroyed()) { return; } else { - lDrvState.driverState = DriverState.DESTROYED; + lDrvState.descroyed(); } } finally { - lDrvState.stateLock.unlock(); + lDrvState.unlock(); } if (!hiveLocks.isEmpty()) { try { diff --git ql/src/java/org/apache/hadoop/hive/ql/LockedDriverState.java ql/src/java/org/apache/hadoop/hive/ql/LockedDriverState.java new file mode 100644 index 0000000000..cee3708e30 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/LockedDriverState.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Represents the driver's state. Also has mechanism for locking for the time of state transitions. + */ +public class LockedDriverState { + private static ThreadLocal tlInstance = new ThreadLocal() { + @Override + protected LockedDriverState initialValue() { + return new LockedDriverState(); + } + }; + + public static void setLockedDriverState(LockedDriverState state) { + tlInstance.set(state); + } + + public static LockedDriverState getLockedDriverState() { + return tlInstance.get(); + } + + public static void removeLockedDriverState() { + tlInstance.remove(); + } + + /** + * Enumeration of the potential driver states. + */ + private enum DriverState { + INITIALIZED, + COMPILING, + COMPILED, + EXECUTING, + EXECUTED, + // a state that the driver enters after close() has been called to clean the query results + // and release the resources after the query has been executed + CLOSED, + // a state that the driver enters after destroy() is called and it is the end of driver life cycle + DESTROYED, + ERROR + } + + // a lock is used for synchronizing the state transition and its associated resource releases + private final ReentrantLock stateLock = new ReentrantLock(); + private final AtomicBoolean aborted = new AtomicBoolean(); + private DriverState driverState = DriverState.INITIALIZED; + + public void lock() { + stateLock.lock(); + } + + public void unlock() { + stateLock.unlock(); + } + + public boolean isAborted() { + return aborted.get(); + } + + public void abort() { + aborted.set(true); + } + + public void compiling() { + driverState = DriverState.COMPILING; + } + + public boolean isCompiling() { + return driverState == DriverState.COMPILING; + } + + public void compilationInterrupted(boolean deferClose) { + driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR; + } + + public void compilationFinished(boolean wasError) { + driverState = wasError ? DriverState.ERROR : DriverState.COMPILED; + } + + public boolean isCompiled() { + return driverState == DriverState.COMPILED; + } + + public void executing() { + driverState = DriverState.EXECUTING; + } + + public boolean isExecuting() { + return driverState == DriverState.EXECUTING; + } + + public void executionFinished(boolean wasError) { + driverState = wasError ? DriverState.ERROR : DriverState.EXECUTED; + } + + public boolean isExecuted() { + return driverState == DriverState.EXECUTED; + } + + public void closed() { + driverState = DriverState.CLOSED; + } + + public boolean isClosed() { + return driverState == DriverState.CLOSED; + } + + public void descroyed() { + driverState = DriverState.DESTROYED; + } + + public boolean isDestroyed() { + return driverState == DriverState.DESTROYED; + } + + public void error() { + driverState = DriverState.ERROR; + } + + public boolean isError() { + return driverState == DriverState.ERROR; + } + + @Override + public String toString() { + return String.format("%s(aborted:%s)", driverState, aborted.get()); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index d59ca8c564..4e74a19046 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -109,8 +109,8 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.LockedDriverState; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index 1844ce05a8..4c799993b4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -43,7 +43,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hive.ql.Driver.LockedDriverState; +import org.apache.hadoop.hive.ql.LockedDriverState; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.log.PerfLogger; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 5ced5c5a75..28f3dac451 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -26,9 +26,9 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.metastore.api.*; -import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ddl.table.lock.ShowLocksOperation; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.LockedDriverState; import org.apache.thrift.TException; import java.io.ByteArrayOutputStream; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index 17a2d20a00..9c88578459 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -28,8 +28,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.LockedDriverState; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java index 646b72e8cf..e7403f8d92 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java @@ -21,7 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.Driver.LockedDriverState; +import org.apache.hadoop.hive.ql.LockedDriverState; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; import org.apache.hadoop.hive.ql.metadata.*; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java index 171356c32f..b46a193847 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java @@ -19,7 +19,8 @@ package org.apache.hadoop.hive.ql.lockmgr; import java.util.List; -import org.apache.hadoop.hive.ql.Driver.LockedDriverState; + +import org.apache.hadoop.hive.ql.LockedDriverState; /** * Manager for locks in Hive. Users should not instantiate a lock manager diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index 3b795bc3c2..3b1ddc43c8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.Driver.LockedDriverState; +import org.apache.hadoop.hive.ql.LockedDriverState; import org.apache.hadoop.hive.ql.ddl.database.lock.LockDatabaseDesc; import org.apache.hadoop.hive.ql.ddl.database.unlock.UnlockDatabaseDesc; import org.apache.hadoop.hive.ql.ddl.table.lock.LockTableDesc; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java index d68f1401c9..b68b3d60fe 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ddl.database.lock.LockDatabaseDesc; import org.apache.hadoop.hive.ql.ddl.database.unlock.UnlockDatabaseDesc; import org.apache.hadoop.hive.ql.ddl.table.lock.LockTableDesc; @@ -35,6 +34,7 @@ import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.LockedDriverState; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java index 64f6c27846..b64dff02bd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java @@ -24,8 +24,8 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.LockedDriverState; import org.apache.hadoop.hive.ql.lockmgr.*; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; import org.apache.hadoop.hive.ql.metadata.*; @@ -196,11 +196,11 @@ public int compare(HiveLockObj o1, HiveLockObj o2) { HiveLock lock = null; boolean isInterrupted = false; if (lDrvState != null) { - lDrvState.stateLock.lock(); + lDrvState.lock(); if (lDrvState.isAborted()) { isInterrupted = true; } - lDrvState.stateLock.unlock(); + lDrvState.unlock(); } if (!isInterrupted) { try { diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java index 8f7505dae0..b337ddbfc8 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.Driver.LockedDriverState; +import org.apache.hadoop.hive.ql.LockedDriverState; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity;