diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 4e7c80f..678798d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -178,6 +178,13 @@
   private QueryLifeTimeHookRunner queryLifeTimeHookRunner;
   private final HooksLoader hooksLoader;
 
+  // Transaction manager the Driver has been initialized with (can be null).
+  private HiveTxnManager initTxnMgr;
+
+  // Transaction manager used for the query. This will be set at compile time based on
+  // either initTxnMgr or from the SessionState, in that order.
+  private HiveTxnManager queryTxnMgr;
+
   public enum DriverState {
     INITIALIZED,
     COMPILING,
@@ -355,6 +362,11 @@ public Driver(HiveConf conf) {
     this(getNewQueryState(conf), null);
   }
 
+  public Driver(HiveConf conf, HiveTxnManager txnMgr) {
+    this(getNewQueryState(conf), null);
+    this.initTxnMgr = txnMgr;
+  }
+
   public Driver(HiveConf conf, Context ctx) {
     this(getNewQueryState(conf), null);
     this.ctx = ctx;
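Before the `compile()` changes below, a quick illustration of what the new constructor enables: the caller, not `SessionState`, owns the transaction manager's lifecycle. A minimal sketch, assuming a configured Hive client environment; the query string is illustrative, while `TxnManagerFactory` and `closeTxnManager()` are the same APIs this patch uses later:

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;

public class ExternalTxnManagerSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // The caller creates the txn manager, so its lifetime is not tied to SessionState.
    HiveTxnManager txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
    // compile() will prefer this manager over SessionState's (initTxnMgr wins).
    Driver driver = new Driver(conf, txnMgr);
    try {
      driver.run("SELECT 1"); // illustrative query
    } finally {
      driver.close();
      driver.destroy();
      txnMgr.closeTxnManager(); // the owner shuts the manager down
    }
  }
}
```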
@@ -480,16 +492,20 @@ private int compile(String command, boolean resetTaskIds, boolean deferClose) {
 
     try {
       // Initialize the transaction manager. This must be done before analyze is called.
-      final HiveTxnManager txnManager = SessionState.get().initTxnMgr(conf);
-      // In case when user Ctrl-C twice to kill Hive CLI JVM, we want to release locks
+      if (initTxnMgr != null) {
+        queryTxnMgr = initTxnMgr;
+      } else {
+        queryTxnMgr = SessionState.get().initTxnMgr(conf);
+      }
+      // In case when user Ctrl-C twice to kill Hive CLI JVM, we want to release locks
       // if compile is being called multiple times, clear the old shutdownhook
       ShutdownHookManager.removeShutdownHook(shutdownRunner);
       shutdownRunner = new Runnable() {
         @Override
         public void run() {
           try {
-            releaseLocksAndCommitOrRollback(false, txnManager);
+            releaseLocksAndCommitOrRollback(false);
           } catch (LockException e) {
             LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " +
                 e.getMessage());
@@ -538,13 +554,13 @@ public void run() {
       // because at that point we need access to the objects.
       Hive.get().getMSC().flushCache();
 
-      if(checkConcurrency() && startImplicitTxn(txnManager)) {
+      if(checkConcurrency() && startImplicitTxn(queryTxnMgr)) {
         String userFromUGI = getUserFromUGI();
-        if (!txnManager.isTxnOpen()) {
+        if (!queryTxnMgr.isTxnOpen()) {
           if(userFromUGI == null) {
             return 10;
           }
-          long txnid = txnManager.openTxn(ctx, userFromUGI);
+          long txnid = queryTxnMgr.openTxn(ctx, userFromUGI);
         }
       }
       // Do semantic analysis and plan generation
@@ -1132,13 +1148,12 @@ public FetchTask getFetchTask() {
 
   // Write the current set of valid transactions into the conf file so that it can be read by
   // the input format.
-  private void recordValidTxns() throws LockException {
+  private void recordValidTxns(HiveTxnManager txnMgr) throws LockException {
     ValidTxnList oldList = null;
     String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
     if(s != null && s.length() > 0) {
       oldList = new ValidReadTxnList(s);
     }
-    HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
     ValidTxnList txns = txnMgr.getValidTxns();
     if(oldList != null) {
       throw new IllegalStateException("calling recordValidTxn() more than once in the same " +
@@ -1171,6 +1186,7 @@ private String getUserFromUGI() {
     }
     return null;
   }
+
   /**
    * Acquire read and write locks needed by the statement. The list of objects to be locked are
    * obtained from the inputs and outputs populated by the compiler. Locking strategy depends on
@@ -1183,9 +1199,7 @@ private int acquireLocks() {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
 
-    SessionState ss = SessionState.get();
-    HiveTxnManager txnMgr = ss.getTxnMgr();
-    if(!txnMgr.isTxnOpen() && txnMgr.supportsAcid()) {
+    if(!queryTxnMgr.isTxnOpen() && queryTxnMgr.supportsAcid()) {
       /*non acid txn managers don't support txns but fwd lock requests to lock managers
         acid txn manager requires all locks to be associated with a txn so if we
         end up here w/o an open txn it's because we are processing something like "use <database>
@@ -1200,17 +1214,17 @@
       // Set the transaction id in all of the acid file sinks
       if (haveAcidWrite()) {
         for (FileSinkDesc desc : plan.getAcidSinks()) {
-          desc.setTransactionId(txnMgr.getCurrentTxnId());
+          desc.setTransactionId(queryTxnMgr.getCurrentTxnId());
           //it's possible to have > 1 FileSink writing to the same table/partition
           //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
-          desc.setStatementId(txnMgr.getWriteIdAndIncrement());
+          desc.setStatementId(queryTxnMgr.getWriteIdAndIncrement());
         }
       }
       /*It's imperative that {@code acquireLocks()} is called for all commands so that
       HiveTxnManager can transition its state machine correctly*/
-      txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
-      if(txnMgr.recordSnapshot(plan)) {
-        recordValidTxns();
+      queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
+      if(queryTxnMgr.recordSnapshot(plan)) {
+        recordValidTxns(queryTxnMgr);
       }
       return 0;
     } catch (Exception e) {
@@ -1228,6 +1242,11 @@ private int acquireLocks() {
   private boolean haveAcidWrite() {
     return !plan.getAcidSinks().isEmpty();
  }
+
+  public void releaseLocksAndCommitOrRollback(boolean commit) throws LockException {
+    releaseLocksAndCommitOrRollback(commit, queryTxnMgr);
+  }
+
   /**
    * @param commit if there is an open transaction and if true, commit,
    *               if false rollback. If there is no open transaction this parameter is ignored.
@@ -1241,8 +1260,8 @@ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnMa
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
     HiveTxnManager txnMgr;
     if (txnManager == null) {
-      SessionState ss = SessionState.get();
-      txnMgr = ss.getTxnMgr();
+      // Default to driver's txn manager if no txn manager specified
+      txnMgr = queryTxnMgr;
     } else {
       txnMgr = txnManager;
     }
@@ -1361,6 +1380,22 @@ public CommandProcessorResponse compileAndRespond(String command) {
     return createProcessorResponse(compileInternal(command, false));
   }
 
+  public CommandProcessorResponse lockAndRespond() {
+    // Assumes the query has already been compiled
+    if (plan == null) {
+      throw new IllegalStateException("No previously compiled query for driver");
+    }
+
+    int ret = 0;
+    if (requiresLock()) {
+      ret = acquireLocks();
+    }
+    if (ret != 0) {
+      return rollback(createProcessorResponse(ret));
+    }
+    return createProcessorResponse(ret);
+  }
+
   private static final ReentrantLock globalCompileLock = new ReentrantLock();
   private int compileInternal(String command, boolean deferClose) {
     int ret;
@@ -1391,7 +1426,7 @@ private int compileInternal(String command, boolean deferClose) {
     if (ret != 0) {
       try {
-        releaseLocksAndCommitOrRollback(false, null);
+        releaseLocksAndCommitOrRollback(false);
       } catch (LockException e) {
         LOG.warn("Exception in releasing locks. " +
             org.apache.hadoop.util.StringUtils.stringifyException(e));
       }
@@ -1531,8 +1566,7 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp
       // the reason that we set the txn manager for the cxt here is because each
       // query has its own ctx object. The txn mgr is shared across the
       // same instance of Driver, which can run multiple queries.
-      HiveTxnManager txnManager = SessionState.get().getTxnMgr();
-      ctx.setHiveTxnManager(txnManager);
+      ctx.setHiveTxnManager(queryTxnMgr);
 
       if (requiresLock()) {
         // a checkpoint to see if the thread is interrupted or not before an expensive operation
@@ -1554,11 +1588,11 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp
       //if needRequireLock is false, the release here will do nothing because there is no lock
       try {
         //since set autocommit starts an implicit txn, close it
-        if(txnManager.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) {
-          releaseLocksAndCommitOrRollback(true, null);
+        if(queryTxnMgr.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) {
+          releaseLocksAndCommitOrRollback(true);
         }
         else if(plan.getOperation() == HiveOperation.ROLLBACK) {
-          releaseLocksAndCommitOrRollback(false, null);
+          releaseLocksAndCommitOrRollback(false);
         }
         else {
           //txn (if there is one started) is not finished
@@ -1609,7 +1643,7 @@ else if(plan.getOperation() == HiveOperation.ROLLBACK) {
   private CommandProcessorResponse rollback(CommandProcessorResponse cpr) {
     //console.printError(cpr.toString());
     try {
-      releaseLocksAndCommitOrRollback(false, null);
+      releaseLocksAndCommitOrRollback(false);
     } catch (LockException e) {
       LOG.error("rollback() FAILED: " + cpr);//make sure not to loose
@@ -2351,7 +2385,7 @@ private int closeInProcess(boolean destroyed) {
     if(destroyed) {
       if (!hiveLocks.isEmpty()) {
         try {
-          releaseLocksAndCommitOrRollback(false, null);
+          releaseLocksAndCommitOrRollback(false);
         } catch (LockException e) {
           LOG.warn("Exception when releasing locking in destroy: " +
               e.getMessage());
@@ -2406,7 +2440,7 @@ public void destroy() {
     }
     if (!hiveLocks.isEmpty()) {
       try {
-        releaseLocksAndCommitOrRollback(false, null);
+        releaseLocksAndCommitOrRollback(false);
       } catch (LockException e) {
         LOG.warn("Exception when releasing locking in destroy: " +
             e.getMessage());
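Taken together, the `Driver` changes split query setup into two observable steps: `compileAndRespond()` builds the plan against `queryTxnMgr`, and the new `lockAndRespond()` acquires locks for an already-compiled plan, rolling back itself on failure. A hedged sketch of the intended call sequence, assuming an initialized session; the wrapper class and `query` parameter are illustrative:

```java
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;

public final class CompileThenLockSketch {
  static void compileThenLock(Driver driver, String query) throws HiveException, LockException {
    // Step 1: compile only; no locks are taken yet.
    CommandProcessorResponse cpr = driver.compileAndRespond(query);
    if (cpr.getResponseCode() != 0) {
      throw new HiveException("Failed to compile query: " + cpr.getException());
    }
    // Step 2: lock against the driver's own txn manager; requires a compiled plan.
    cpr = driver.lockAndRespond();
    if (cpr.getResponseCode() != 0) {
      throw new HiveException("Failed to acquire locks: " + cpr.getException());
    }
    // ... hand the plan off for execution; much later, release without committing:
    driver.releaseLocksAndCommitOrRollback(false);
  }
}
```

This is exactly the shape `GenericUDTFGetSplits` adopts further down, where the locks must outlive the method that acquired them.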
" + org.apache.hadoop.util.StringUtils.stringifyException(e)); @@ -1531,8 +1566,7 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp // the reason that we set the txn manager for the cxt here is because each // query has its own ctx object. The txn mgr is shared across the // same instance of Driver, which can run multiple queries. - HiveTxnManager txnManager = SessionState.get().getTxnMgr(); - ctx.setHiveTxnManager(txnManager); + ctx.setHiveTxnManager(queryTxnMgr); if (requiresLock()) { // a checkpoint to see if the thread is interrupted or not before an expensive operation @@ -1554,11 +1588,11 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp //if needRequireLock is false, the release here will do nothing because there is no lock try { //since set autocommit starts an implicit txn, close it - if(txnManager.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) { - releaseLocksAndCommitOrRollback(true, null); + if(queryTxnMgr.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) { + releaseLocksAndCommitOrRollback(true); } else if(plan.getOperation() == HiveOperation.ROLLBACK) { - releaseLocksAndCommitOrRollback(false, null); + releaseLocksAndCommitOrRollback(false); } else { //txn (if there is one started) is not finished @@ -1609,7 +1643,7 @@ else if(plan.getOperation() == HiveOperation.ROLLBACK) { private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { //console.printError(cpr.toString()); try { - releaseLocksAndCommitOrRollback(false, null); + releaseLocksAndCommitOrRollback(false); } catch (LockException e) { LOG.error("rollback() FAILED: " + cpr);//make sure not to loose @@ -2351,7 +2385,7 @@ private int closeInProcess(boolean destroyed) { if(destroyed) { if (!hiveLocks.isEmpty()) { try { - releaseLocksAndCommitOrRollback(false, null); + releaseLocksAndCommitOrRollback(false); } catch (LockException e) { LOG.warn("Exception when releasing locking in destroy: " + e.getMessage()); @@ -2406,7 +2440,7 @@ public void destroy() { } if (!hiveLocks.isEmpty()) { try { - releaseLocksAndCommitOrRollback(false, null); + releaseLocksAndCommitOrRollback(false); } catch (LockException e) { LOG.warn("Exception when releasing locking in destroy: " + e.getMessage()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 8b64407..e8c9d87 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.session; import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -286,6 +287,8 @@ private String atsDomainId; + private List cleanupItems = new LinkedList(); + /** * Get the lineage state stored in this session. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 5003f42..7eced53 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.udf.generic;
 
 import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
@@ -69,6 +70,8 @@
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -184,12 +187,21 @@ public void process(Object[] arguments) throws HiveException {
     String query = stringOI.getPrimitiveJavaObject(arguments[0]);
     int num = intOI.get(arguments[1]);
 
-    PlanFragment fragment = createPlanFragment(query, num);
+    // Generate applicationId for the LLAP splits
+    LlapCoordinator coordinator = LlapCoordinator.getInstance();
+    if (coordinator == null) {
+      throw new HiveException("LLAP coordinator is not initialized; must be running in HS2 with "
+          + ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled");
+    }
+    ApplicationId applicationId = coordinator.createExtClientAppId();
+    LOG.info("Generated appID {} for LLAP splits", applicationId.toString());
+
+    PlanFragment fragment = createPlanFragment(query, num, applicationId);
     TezWork tezWork = fragment.work;
     Schema schema = fragment.schema;
 
     try {
-      for (InputSplit s : getSplits(jc, num, tezWork, schema)) {
+      for (InputSplit s : getSplits(jc, num, tezWork, schema, applicationId)) {
         Object[] os = new Object[1];
         bos.reset();
         s.write(dos);
@@ -202,7 +214,7 @@
     }
   }
 
-  public PlanFragment createPlanFragment(String query, int num)
+  public PlanFragment createPlanFragment(String query, int num, ApplicationId splitsAppId)
       throws HiveException {
 
     HiveConf conf = new HiveConf(SessionState.get().getConf());
@@ -224,7 +236,10 @@ public PlanFragment createPlanFragment(String query, int num)
       throw new HiveException(e);
     }
 
-    Driver driver = new Driver(conf);
+    HiveTxnManager txnManager = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    Driver driver = new Driver(conf, txnManager);
+    DriverCleanup driverCleanup = new DriverCleanup(driver, txnManager, splitsAppId.toString());
+    boolean needsCleanup = true;
     try {
       CommandProcessorResponse cpr = driver.compileAndRespond(query);
       if (cpr.getResponseCode() != 0) {
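The `needsCleanup` flag introduced here implements a hand-off: resources are torn down locally unless ownership is successfully transferred to the session (see the `else` branch and `finally` block in the next hunk). A generic sketch of the pattern, with `buildCleanup`, `doWork`, and `handOff` as hypothetical stand-ins for the driver setup, compilation, and `addCleanupItem` call:

```java
import java.io.Closeable;
import java.io.IOException;

public final class HandOffSketch {
  // Hypothetical stand-ins for the real driver/cleanup objects.
  static Closeable buildCleanup() { return () -> System.out.println("resources released"); }
  static void doWork() { /* compile, lock, etc. */ }
  static void handOff(Closeable c) { /* e.g. SessionState.get().addCleanupItem(c) */ }

  public static void main(String[] args) throws IOException {
    Closeable cleanup = buildCleanup();
    boolean needsCleanup = true;
    try {
      doWork();
      handOff(cleanup);   // a longer-lived owner (the session) takes over
      needsCleanup = false;
    } finally {
      if (needsCleanup) {
        cleanup.close();  // failure path: release immediately
      }
    }
  }
}
```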
@@ -276,16 +291,38 @@ public PlanFragment createPlanFragment(String query, int num)
         }
         tezWork = ((TezTask)roots.get(0)).getWork();
+      } else {
+        // Table will be queried directly by LLAP
+        // Acquire locks if necessary - they will be released during session cleanup.
+        // The read will have READ_COMMITTED level semantics.
+        cpr = driver.lockAndRespond();
+        if (cpr.getResponseCode() != 0) {
+          throw new HiveException("Failed to acquire locks: " + cpr.getException());
+        }
+
+        // Attach the resources to the session cleanup.
+        SessionState.get().addCleanupItem(driverCleanup);
+        needsCleanup = false;
       }
 
       return new PlanFragment(tezWork, schema, jc);
     } finally {
-      driver.close();
-      driver.destroy();
+      if (needsCleanup) {
+        if (driverCleanup != null) {
+          try {
+            driverCleanup.close();
+          } catch (IOException err) {
+            throw new HiveException(err);
+          }
+        } else if (driver != null) {
+          driver.close();
+          driver.destroy();
+        }
+      }
     }
   }
 
-  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema)
+  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId)
       throws IOException {
 
     DAG dag = DAG.create(work.getName());
@@ -311,7 +348,6 @@ public PlanFragment createPlanFragment(String query, int num)
 
     // Update the queryId to use the generated applicationId. See comment below about
     // why this is done.
-    ApplicationId applicationId = coordinator.createExtClientAppId();
     HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, applicationId.toString());
     Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
         new ArrayList<LocalResource>(), fs, ctx, false, work,
@@ -412,6 +448,32 @@ public PlanFragment createPlanFragment(String query, int num)
     }
   }
 
+  private static class DriverCleanup implements Closeable {
+    private final Driver driver;
+    private final HiveTxnManager txnManager;
+    private final String applicationId;
+
+    public DriverCleanup(Driver driver, HiveTxnManager txnManager, String applicationId) {
+      this.driver = driver;
+      this.txnManager = txnManager;
+      this.applicationId = applicationId;
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        LOG.info("DriverCleanup for LLAP splits: {}", applicationId);
+        driver.releaseLocksAndCommitOrRollback(false);
+        driver.close();
+        driver.destroy();
+        txnManager.closeTxnManager();
+      } catch (Exception err) {
+        LOG.error("Error closing driver resources", err);
+        throw new IOException(err);
+      }
+    }
+  }
+
   private static class JobTokenCreator {
     private static Token<JobTokenIdentifier> createJobToken(ApplicationId applicationId) {
       String tokenIdentifier = applicationId.toString();
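End to end, the lifecycle this patch sets up for an LLAP external client is: `get_splits` compiles the query with a `Driver` that owns a dedicated `HiveTxnManager`, acquires locks via `lockAndRespond()`, registers a `DriverCleanup` with the session, and the locks survive until the session closes. A condensed sketch of that final teardown, assuming an HS2 session; names as in the patch:

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState;

public final class GetSplitsLifecycleSketch {
  public static void main(String[] args) throws Exception {
    SessionState ss = SessionState.start(new HiveConf());
    // ... get_splits runs here and registers its DriverCleanup via addCleanupItem().
    // Closing the session drains the cleanup items, which releases the query's locks,
    // closes and destroys the Driver, and shuts down its transaction manager.
    ss.close();
  }
}
```

Note the ordering inside `DriverCleanup.close()`: locks are rolled back before the driver is closed and destroyed, and the transaction manager is shut down last, mirroring the reverse of acquisition.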