diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 8b4b21f..ee2acc1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
@@ -28,14 +30,20 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.After;
@@ -704,6 +712,54 @@ public void testBucketedAcidInsertWithRemoveUnion() throws Exception {
       Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
     }
   }
+
+  @Test
+  public void testGetSplitsLocks() throws Exception {
+    // Need to test this with LLAP settings, which requires some additional configurations set.
+    HiveConf modConf = new HiveConf(hiveConf);
+    setupTez(modConf);
+    modConf.setVar(ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+    modConf.setVar(ConfVars.HIVEFETCHTASKCONVERSION, "more");
+    modConf.setVar(HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS, "localhost");
+
+    // SessionState/Driver needs to be restarted with the Tez conf settings.
+    restartSessionAndDriver(modConf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(modConf);
+
+    try {
+      String queryParam = "select * from " + Table.ACIDTBL;
+      runStatementOnDriver("select get_splits(\"" + queryParam + "\", 1)");
+
+      // The get_splits call should have resulted in a lock on ACIDTBL
+      ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
+      TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());
+      assertEquals(1, slr.getLocksSize());
+    } finally {
+      // Close the session which should free up the TxnHandler/locks held by the session.
+      // Done in the finally block to make sure we free up the locks; otherwise
+      // the cleanup in tearDown() will get stuck waiting on the lock held here on ACIDTBL.
+      restartSessionAndDriver(hiveConf);
+    }
+
+    // Lock should be freed up now.
+    ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
+    assertEquals(0, slr.getLocksSize());
+  }
+
+  private void restartSessionAndDriver(HiveConf conf) throws Exception {
+    SessionState ss = SessionState.get();
+    if (ss != null) {
+      ss.close();
+    }
+    if (d != null) {
+      d.destroy();
+      d.close();
+    }
+
+    SessionState.start(conf);
+    d = new Driver(conf);
+  }
+
   // Ideally test like this should be a qfile test. However, the explain output from qfile is always
   // slightly different depending on where the test is run, specifically due to file size estimation
   private void testJoin(String engine, String joinType) throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index ec228b4..f01edf8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -175,6 +175,19 @@
   private QueryLifeTimeHookRunner queryLifeTimeHookRunner;
   private final HooksLoader hooksLoader;
 
+  // Transaction manager the Driver has been initialized with (can be null).
+  // If this is set then this Transaction manager will be used during query
+  // compilation/execution rather than using the current session's transaction manager.
+  // This might be needed in a situation where a Driver is nested within an already
+  // running Driver/query - the nested Driver requires a separate transaction manager
+  // so as not to conflict with the outer Driver/query which is using the session
+  // transaction manager.
+  private final HiveTxnManager initTxnMgr;
+
+  // Transaction manager used for the query. This will be set at compile time based on
+  // either initTxnMgr or from the SessionState, in that order.
+  private HiveTxnManager queryTxnMgr;
+
   public enum DriverState {
     INITIALIZED,
     COMPILING,
@@ -352,8 +365,12 @@
   public Driver(HiveConf conf) {
     this(getNewQueryState(conf), null);
   }
 
+  public Driver(HiveConf conf, HiveTxnManager txnMgr) {
+    this(getNewQueryState(conf), null, null, txnMgr);
+  }
+
   public Driver(HiveConf conf, Context ctx) {
-    this(getNewQueryState(conf), null);
+    this(getNewQueryState(conf), null, null);
     this.ctx = ctx;
   }
 
@@ -362,18 +379,22 @@ public Driver(HiveConf conf, String userName) {
   }
 
   public Driver(QueryState queryState, String userName) {
-    this(queryState, userName, new HooksLoader(queryState.getConf()), null);
+    this(queryState, userName, new HooksLoader(queryState.getConf()), null, null);
   }
 
   public Driver(HiveConf conf, HooksLoader hooksLoader) {
-    this(getNewQueryState(conf), null, hooksLoader, null);
+    this(getNewQueryState(conf), null, hooksLoader, null, null);
   }
 
   public Driver(QueryState queryState, String userName, QueryInfo queryInfo) {
-    this(queryState, userName, new HooksLoader(queryState.getConf()), queryInfo);
+    this(queryState, userName, new HooksLoader(queryState.getConf()), queryInfo, null);
+  }
+
+  public Driver(QueryState queryState, String userName, QueryInfo queryInfo, HiveTxnManager txnMgr) {
+    this(queryState, userName, new HooksLoader(queryState.getConf()), queryInfo, txnMgr);
   }
 
-  public Driver(QueryState queryState, String userName, HooksLoader hooksLoader, QueryInfo queryInfo) {
+  public Driver(QueryState queryState, String userName, HooksLoader hooksLoader, QueryInfo queryInfo, HiveTxnManager txnMgr) {
     this.queryState = queryState;
     this.conf = queryState.getConf();
     isParallelEnabled = (conf != null)
@@ -382,6 +403,7 @@ public Driver(QueryState queryState, String userName, HooksLoader hooksLoader, QueryInfo queryInfo) {
     this.hooksLoader = hooksLoader;
     this.queryLifeTimeHookRunner = new QueryLifeTimeHookRunner(conf, hooksLoader, console);
     this.queryInfo = queryInfo;
+    this.initTxnMgr = txnMgr;
   }
 
   /**
@@ -477,16 +499,22 @@ private int compile(String command, boolean resetTaskIds, boolean deferClose) {
 
     try {
       // Initialize the transaction manager. This must be done before analyze is called.
-      final HiveTxnManager txnManager = SessionState.get().initTxnMgr(conf);
-      // In case when user Ctrl-C twice to kill Hive CLI JVM, we want to release locks
+      if (initTxnMgr != null) {
+        queryTxnMgr = initTxnMgr;
+      } else {
+        queryTxnMgr = SessionState.get().initTxnMgr(conf);
+      }
+      queryState.setTxnManager(queryTxnMgr);
+      // In case when user Ctrl-C twice to kill Hive CLI JVM, we want to release locks
       // if compile is being called multiple times, clear the old shutdownhook
      ShutdownHookManager.removeShutdownHook(shutdownRunner);
+      final HiveTxnManager txnMgr = queryTxnMgr;
      shutdownRunner = new Runnable() {
        @Override
        public void run() {
          try {
-            releaseLocksAndCommitOrRollback(false, txnManager);
+            releaseLocksAndCommitOrRollback(false, txnMgr);
          } catch (LockException e) {
            LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " +
                e.getMessage());
@@ -535,13 +563,13 @@ public void run() {
       // because at that point we need access to the objects.
       Hive.get().getMSC().flushCache();
 
-      if(checkConcurrency() && startImplicitTxn(txnManager)) {
+      if(checkConcurrency() && startImplicitTxn(queryTxnMgr)) {
         String userFromUGI = getUserFromUGI();
-        if (!txnManager.isTxnOpen()) {
+        if (!queryTxnMgr.isTxnOpen()) {
           if(userFromUGI == null) {
             return 10;
           }
-          long txnid = txnManager.openTxn(ctx, userFromUGI);
+          long txnid = queryTxnMgr.openTxn(ctx, userFromUGI);
         }
       }
       // Do semantic analysis and plan generation
@@ -1133,13 +1161,12 @@ public FetchTask getFetchTask() {
 
   // Write the current set of valid transactions into the conf file so that it can be read by
   // the input format.
-  private void recordValidTxns() throws LockException {
+  private void recordValidTxns(HiveTxnManager txnMgr) throws LockException {
     ValidTxnList oldList = null;
     String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
     if(s != null && s.length() > 0) {
       oldList = new ValidReadTxnList(s);
     }
-    HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
     ValidTxnList txns = txnMgr.getValidTxns();
     if(oldList != null) {
       throw new IllegalStateException("calling recordValidTxn() more than once in the same " +
@@ -1172,6 +1199,7 @@ private String getUserFromUGI() {
     }
     return null;
   }
+
   /**
    * Acquire read and write locks needed by the statement. The list of objects to be locked are
   * obtained from the inputs and outputs populated by the compiler. Locking strategy depends on
@@ -1184,9 +1212,7 @@ private int acquireLocks() {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
 
-    SessionState ss = SessionState.get();
-    HiveTxnManager txnMgr = ss.getTxnMgr();
-    if(!txnMgr.isTxnOpen() && txnMgr.supportsAcid()) {
+    if(!queryTxnMgr.isTxnOpen() && queryTxnMgr.supportsAcid()) {
      /*non acid txn managers don't support txns but fwd lock requests to lock managers
        acid txn manager requires all locks to be associated with a txn so if we
        end up here w/o an open txn it's because we are processing something like "use <database>
@@ -1205,17 +1231,17 @@
         //so this makes (file name -> data) mapping stable
         acidSinks.sort((FileSinkDesc fsd1, FileSinkDesc fsd2) -> fsd1.getDirName().compareTo(fsd2.getDirName()));
        for (FileSinkDesc desc : acidSinks) {
-          desc.setTransactionId(txnMgr.getCurrentTxnId());
+          desc.setTransactionId(queryTxnMgr.getCurrentTxnId());
           //it's possible to have > 1 FileSink writing to the same table/partition
           //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
-          desc.setStatementId(txnMgr.getWriteIdAndIncrement());
+          desc.setStatementId(queryTxnMgr.getWriteIdAndIncrement());
        }
      }
      /*It's imperative that {@code acquireLocks()} is called for all commands so that
      HiveTxnManager can transition its state machine correctly*/
-      txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
-      if(txnMgr.recordSnapshot(plan)) {
-        recordValidTxns();
+      queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
+      if(queryTxnMgr.recordSnapshot(plan)) {
+        recordValidTxns(queryTxnMgr);
      }
      return 0;
    } catch (Exception e) {
@@ -1233,6 +1259,11 @@ private int acquireLocks() {
   private boolean haveAcidWrite() {
     return !plan.getAcidSinks().isEmpty();
   }
+
+  public void releaseLocksAndCommitOrRollback(boolean commit) throws LockException {
+    releaseLocksAndCommitOrRollback(commit, queryTxnMgr);
+  }
+
   /**
    * @param commit if there is an open transaction and if true, commit,
    *               if false rollback. If there is no open transaction this parameter is ignored.
@@ -1246,8 +1277,8 @@ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnManager)
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
 
     HiveTxnManager txnMgr;
     if (txnManager == null) {
-      SessionState ss = SessionState.get();
-      txnMgr = ss.getTxnMgr();
+      // Default to driver's txn manager if no txn manager specified
+      txnMgr = queryTxnMgr;
     } else {
       txnMgr = txnManager;
     }
@@ -1366,6 +1397,23 @@ public CommandProcessorResponse compileAndRespond(String command) {
     return createProcessorResponse(compileInternal(command, false));
   }
 
+  public CommandProcessorResponse lockAndRespond() {
+    // Assumes the query has already been compiled
+    if (plan == null) {
+      throw new IllegalStateException(
+          "No previously compiled query for driver - queryId=" + queryState.getQueryId());
+    }
+
+    int ret = 0;
+    if (requiresLock()) {
+      ret = acquireLocks();
+    }
+    if (ret != 0) {
+      return rollback(createProcessorResponse(ret));
+    }
+    return createProcessorResponse(ret);
+  }
+
   private static final ReentrantLock globalCompileLock = new ReentrantLock();
 
   private int compileInternal(String command, boolean deferClose) {
     int ret;
@@ -1396,7 +1444,7 @@ private int compileInternal(String command, boolean deferClose) {
 
     if (ret != 0) {
       try {
-        releaseLocksAndCommitOrRollback(false, null);
+        releaseLocksAndCommitOrRollback(false);
      } catch (LockException e) {
        LOG.warn("Exception in releasing locks. " +
            org.apache.hadoop.util.StringUtils.stringifyException(e));
@@ -1536,8 +1584,7 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
       // the reason that we set the txn manager for the cxt here is because each
       // query has its own ctx object. The txn mgr is shared across the
       // same instance of Driver, which can run multiple queries.
-      HiveTxnManager txnManager = SessionState.get().getTxnMgr();
-      ctx.setHiveTxnManager(txnManager);
+      ctx.setHiveTxnManager(queryTxnMgr);
 
       if (requiresLock()) {
         // a checkpoint to see if the thread is interrupted or not before an expensive operation
@@ -1559,11 +1606,11 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
       //if needRequireLock is false, the release here will do nothing because there is no lock
       try {
         //since set autocommit starts an implicit txn, close it
-        if(txnManager.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) {
-          releaseLocksAndCommitOrRollback(true, null);
+        if(queryTxnMgr.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) {
+          releaseLocksAndCommitOrRollback(true);
        }
        else if(plan.getOperation() == HiveOperation.ROLLBACK) {
-          releaseLocksAndCommitOrRollback(false, null);
+          releaseLocksAndCommitOrRollback(false);
        }
        else {
          //txn (if there is one started) is not finished
@@ -1614,7 +1661,7 @@ else if(plan.getOperation() == HiveOperation.ROLLBACK) {
   private CommandProcessorResponse rollback(CommandProcessorResponse cpr) {
     //console.printError(cpr.toString());
     try {
-      releaseLocksAndCommitOrRollback(false, null);
+      releaseLocksAndCommitOrRollback(false);
    } catch (LockException e) {
      LOG.error("rollback() FAILED: " + cpr);//make sure not to loose
@@ -2373,7 +2420,7 @@ private int closeInProcess(boolean destroyed) {
     if(destroyed) {
       if (!hiveLocks.isEmpty()) {
         try {
-          releaseLocksAndCommitOrRollback(false, null);
+          releaseLocksAndCommitOrRollback(false);
        } catch (LockException e) {
          LOG.warn("Exception when releasing locking in destroy: " +
              e.getMessage());
@@ -2428,7 +2475,7 @@ public void destroy() {
     }
     if (!hiveLocks.isEmpty()) {
       try {
-        releaseLocksAndCommitOrRollback(false, null);
+        releaseLocksAndCommitOrRollback(false);
      } catch (LockException e) {
        LOG.warn("Exception when releasing locking in destroy: " +
            e.getMessage());
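Reviewer note (not part of the patch): the intended calling pattern for the new Driver API is sketched below. A nested Driver is handed its own transaction manager so its locks and transaction state do not disturb the session's manager; compileAndRespond() prepares the plan and the new lockAndRespond() acquires locks for it as a separate step. This is a minimal sketch only - the query text is illustrative, and imports, fields, and error handling are elided.

    // Sketch: driving a nested query with a private txn manager (illustrative only)
    HiveConf conf = new HiveConf(SessionState.get().getConf());
    HiveTxnManager txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
    Driver driver = new Driver(conf, txnMgr);      // new constructor: bypasses the session txn manager
    driver.init();
    CommandProcessorResponse cpr = driver.compileAndRespond("select * from some_acid_table");
    if (cpr.getResponseCode() == 0) {
      cpr = driver.lockAndRespond();               // new: lock the previously compiled plan
    }
    // ... use the compiled plan / splits ...
    driver.releaseLocksAndCommitOrRollback(false); // new overload: defaults to the driver's queryTxnMgr
    driver.close();
    driver.destroy();
    txnMgr.closeTxnManager();
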
" + org.apache.hadoop.util.StringUtils.stringifyException(e)); @@ -1536,8 +1584,7 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp // the reason that we set the txn manager for the cxt here is because each // query has its own ctx object. The txn mgr is shared across the // same instance of Driver, which can run multiple queries. - HiveTxnManager txnManager = SessionState.get().getTxnMgr(); - ctx.setHiveTxnManager(txnManager); + ctx.setHiveTxnManager(queryTxnMgr); if (requiresLock()) { // a checkpoint to see if the thread is interrupted or not before an expensive operation @@ -1559,11 +1606,11 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp //if needRequireLock is false, the release here will do nothing because there is no lock try { //since set autocommit starts an implicit txn, close it - if(txnManager.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) { - releaseLocksAndCommitOrRollback(true, null); + if(queryTxnMgr.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) { + releaseLocksAndCommitOrRollback(true); } else if(plan.getOperation() == HiveOperation.ROLLBACK) { - releaseLocksAndCommitOrRollback(false, null); + releaseLocksAndCommitOrRollback(false); } else { //txn (if there is one started) is not finished @@ -1614,7 +1661,7 @@ else if(plan.getOperation() == HiveOperation.ROLLBACK) { private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { //console.printError(cpr.toString()); try { - releaseLocksAndCommitOrRollback(false, null); + releaseLocksAndCommitOrRollback(false); } catch (LockException e) { LOG.error("rollback() FAILED: " + cpr);//make sure not to loose @@ -2373,7 +2420,7 @@ private int closeInProcess(boolean destroyed) { if(destroyed) { if (!hiveLocks.isEmpty()) { try { - releaseLocksAndCommitOrRollback(false, null); + releaseLocksAndCommitOrRollback(false); } catch (LockException e) { LOG.warn("Exception when releasing locking in destroy: " + e.getMessage()); @@ -2428,7 +2475,7 @@ public void destroy() { } if (!hiveLocks.isEmpty()) { try { - releaseLocksAndCommitOrRollback(false, null); + releaseLocksAndCommitOrRollback(false); } catch (LockException e) { LOG.warn("Exception when releasing locking in destroy: " + e.getMessage()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java index fa7c323..7d5aa8b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.plan.HiveOperation; /** @@ -39,6 +40,11 @@ private HiveOperation commandType; /** + * transaction manager used in the query. + */ + private HiveTxnManager txnManager; + + /** * Private constructor, use QueryState.Builder instead * @param conf The query specific configuration object */ @@ -73,6 +79,14 @@ public HiveConf getConf() { return queryConf; } + public HiveTxnManager getTxnManager() { + return txnManager; + } + + public void setTxnManager(HiveTxnManager txnManager) { + this.txnManager = txnManager; + } + /** * Builder to instantiate the QueryState object. 
*/ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index 3ad30c4..06e00d7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; @@ -78,6 +79,7 @@ import org.apache.hadoop.hive.ql.plan.ListBucketingCtx; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.io.DateWritable; @@ -123,6 +125,8 @@ // whether any ACID table is involved in a query protected boolean acidInQuery; + protected HiveTxnManager txnManager; + public static final int HIVE_COLUMN_ORDER_ASC = 1; public static final int HIVE_COLUMN_ORDER_DESC = 0; public static final int HIVE_COLUMN_NULLS_FIRST = 0; @@ -160,7 +164,6 @@ void setAutoCommitValue(Boolean autoCommit) { autoCommitValue = autoCommit; } - public boolean skipAuthorization() { return false; } @@ -230,6 +233,7 @@ public BaseSemanticAnalyzer(QueryState queryState, Hive db) throws SemanticExcep idToTableNameMap = new HashMap(); inputs = new LinkedHashSet(); outputs = new LinkedHashSet(); + txnManager = queryState.getTxnManager(); } catch (Exception e) { throw new SemanticException(e); } @@ -1918,4 +1922,11 @@ protected FetchTask createFetchTask(String schema) { fetch.setSerializationNullFormat(" "); return (FetchTask) TaskFactory.get(fetch, conf); } + + protected HiveTxnManager getTxnMgr() { + if (txnManager != null) { + return txnManager; + } + return SessionState.get().getTxnMgr(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 4814fcd..5b19b7d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -13656,7 +13656,7 @@ private boolean isAcidOutputFormat(Class of) { } private AcidUtils.Operation getAcidType(Class of, String dest) { - if (SessionState.get() == null || !SessionState.get().getTxnMgr().supportsAcid()) { + if (SessionState.get() == null || !getTxnMgr().supportsAcid()) { return AcidUtils.Operation.NOT_ACID; } else if (isAcidOutputFormat(of)) { return getAcidType(dest); @@ -13675,7 +13675,7 @@ protected boolean deleting(String destination) { // Make sure the proper transaction manager that supports ACID is being used protected void checkAcidTxnManager(Table table) throws SemanticException { - if (SessionState.get() != null && !SessionState.get().getTxnMgr().supportsAcid()) { + if (SessionState.get() != null && !getTxnMgr().supportsAcid()) { throw new SemanticException(ErrorMsg.TXNMGR_NOT_ACID, table.getDbName(), table.getTableName()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index cceeec0..6dece05 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.session;
 
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -290,6 +291,8 @@
   private String atsDomainId;
 
+  private List<Closeable> cleanupItems = new LinkedList<Closeable>();
+
   /**
    * Get the lineage state stored in this session.
    *
@@ -1651,6 +1654,14 @@ public void setCurrentDatabase(String currentDatabase) {
   }
 
   public void close() throws IOException {
+    for (Closeable cleanupItem : cleanupItems) {
+      try {
+        cleanupItem.close();
+      } catch (Exception err) {
+        LOG.error("Error processing SessionState cleanup item " + cleanupItem.toString(), err);
+      }
+    }
+
     registry.clear();
     if (txnMgr != null) txnMgr.closeTxnManager();
     JavaUtils.closeClassLoadersTo(sessionConf.getClassLoader(), parentLoader);
@@ -1927,6 +1938,10 @@ public void setKillQuery(KillQuery killQuery) {
 
   public KillQuery getKillQuery() {
     return killQuery;
   }
+
+  public void addCleanupItem(Closeable item) {
+    cleanupItems.add(item);
+  }
 }
 
 class ResourceMaps {
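Reviewer note (not part of the patch): addCleanupItem() is a general hook - any Closeable registered on the session is closed best-effort (errors are logged and iteration continues) when SessionState.close() runs. A minimal sketch, assuming an active session and a Java 8 lambda for the single-method Closeable interface:

    // Sketch: deferring resource release to session close (illustrative only)
    SessionState ss = SessionState.get();
    ss.addCleanupItem(() -> System.out.println("session closing - releasing external resource"));
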
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 5003f42..985fcda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.udf.generic;
 
 import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
@@ -69,6 +70,8 @@
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -184,12 +187,21 @@ public void process(Object[] arguments) throws HiveException {
 
     String query = stringOI.getPrimitiveJavaObject(arguments[0]);
     int num = intOI.get(arguments[1]);
 
-    PlanFragment fragment = createPlanFragment(query, num);
+    // Generate applicationId for the LLAP splits
+    LlapCoordinator coordinator = LlapCoordinator.getInstance();
+    if (coordinator == null) {
+      throw new HiveException("LLAP coordinator is not initialized; must be running in HS2 with " +
+          ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled");
+    }
+    ApplicationId applicationId = coordinator.createExtClientAppId();
+    LOG.info("Generated appID {} for LLAP splits", applicationId.toString());
+
+    PlanFragment fragment = createPlanFragment(query, num, applicationId);
     TezWork tezWork = fragment.work;
     Schema schema = fragment.schema;
 
     try {
-      for (InputSplit s : getSplits(jc, num, tezWork, schema)) {
+      for (InputSplit s : getSplits(jc, num, tezWork, schema, applicationId)) {
         Object[] os = new Object[1];
         bos.reset();
         s.write(dos);
@@ -202,7 +214,7 @@ public void process(Object[] arguments) throws HiveException {
     }
   }
 
-  public PlanFragment createPlanFragment(String query, int num)
+  public PlanFragment createPlanFragment(String query, int num, ApplicationId splitsAppId)
       throws HiveException {
 
     HiveConf conf = new HiveConf(SessionState.get().getConf());
@@ -224,7 +236,17 @@ public PlanFragment createPlanFragment(String query, int num)
       throw new HiveException(e);
     }
 
-    Driver driver = new Driver(conf);
+    // Instantiate Driver to compile the query passed in.
+    // This UDF is running as part of an existing query, which may already be using the
+    // SessionState TxnManager. If this new Driver also tries to use the same TxnManager
+    // then this may mess up the existing state of the TxnManager.
+    // So initialize the new Driver with a new TxnManager so that it does not use the
+    // Session TxnManager that is already in use.
+    HiveTxnManager txnManager = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    Driver driver = new Driver(conf, txnManager);
+    driver.init();
+    DriverCleanup driverCleanup = new DriverCleanup(driver, txnManager, splitsAppId.toString());
+    boolean needsCleanup = true;
     try {
       CommandProcessorResponse cpr = driver.compileAndRespond(query);
       if (cpr.getResponseCode() != 0) {
@@ -276,16 +298,38 @@ public PlanFragment createPlanFragment(String query, int num)
         }
 
         tezWork = ((TezTask)roots.get(0)).getWork();
+      } else {
+        // Table will be queried directly by LLAP
+        // Acquire locks if necessary - they will be released during session cleanup.
+        // The read will have READ_COMMITTED level semantics.
+        cpr = driver.lockAndRespond();
+        if (cpr.getResponseCode() != 0) {
+          throw new HiveException("Failed to acquire locks: " + cpr.getException());
+        }
+
+        // Attach the resources to the session cleanup.
+        SessionState.get().addCleanupItem(driverCleanup);
+        needsCleanup = false;
       }
 
       return new PlanFragment(tezWork, schema, jc);
     } finally {
-      driver.close();
-      driver.destroy();
+      if (needsCleanup) {
+        if (driverCleanup != null) {
+          try {
+            driverCleanup.close();
+          } catch (IOException err) {
+            throw new HiveException(err);
+          }
+        } else if (driver != null) {
+          driver.close();
+          driver.destroy();
+        }
+      }
     }
   }
 
-  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema)
+  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId)
     throws IOException {
 
     DAG dag = DAG.create(work.getName());
@@ -311,7 +355,6 @@ public PlanFragment createPlanFragment(String query, int num)
 
       // Update the queryId to use the generated applicationId. See comment below about
       // why this is done.
-      ApplicationId applicationId = coordinator.createExtClientAppId();
       HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, applicationId.toString());
       Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
           new ArrayList<LocalResource>(), fs, ctx, false, work,
@@ -412,6 +455,37 @@ public PlanFragment createPlanFragment(String query, int num)
     }
   }
 
+  private static class DriverCleanup implements Closeable {
+    private final Driver driver;
+    private final HiveTxnManager txnManager;
+    private final String applicationId;
+
+    public DriverCleanup(Driver driver, HiveTxnManager txnManager, String applicationId) {
+      this.driver = driver;
+      this.txnManager = txnManager;
+      this.applicationId = applicationId;
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        LOG.info("DriverCleanup for LLAP splits: {}", applicationId);
+        driver.releaseLocksAndCommitOrRollback(false);
+        driver.close();
+        driver.destroy();
+        txnManager.closeTxnManager();
+      } catch (Exception err) {
+        LOG.error("Error closing driver resources", err);
+        throw new IOException(err);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "DriverCleanup for LLAP splits: " + applicationId;
+    }
+  }
+
   private static class JobTokenCreator {
     private static Token<JobTokenIdentifier> createJobToken(ApplicationId applicationId) {
       String tokenIdentifier = applicationId.toString();
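
Reviewer note (not part of the patch): taken together, these changes give get_splits() read-committed-style semantics for external LLAP clients. The UDF's nested Driver acquires SHARED_READ locks under its private transaction manager, and instead of releasing them when the UDF returns, ownership of the cleanup is handed to the session via addCleanupItem(), so the locks survive until the client's session closes - the behavior testGetSplitsLocks verifies. The handoff in createPlanFragment() reduces to this pattern (sketch only, simplified from the finally block above):

    // Sketch: transfer cleanup ownership on success, release immediately on failure
    boolean needsCleanup = true;
    try {
      cpr = driver.lockAndRespond();                    // locks now held by the nested driver
      SessionState.get().addCleanupItem(driverCleanup); // session now owns lock/driver teardown
      needsCleanup = false;                             // skip the immediate release below
    } finally {
      if (needsCleanup) {
        driverCleanup.close(); // rolls back, closes/destroys the driver, closes the txn manager
      }
    }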