diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 4e7c80f..912ce2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1132,13 +1132,12 @@ public FetchTask getFetchTask() {

   // Write the current set of valid transactions into the conf file so that it can be read by
   // the input format.
-  private void recordValidTxns() throws LockException {
+  private void recordValidTxns(HiveTxnManager txnMgr) throws LockException {
     ValidTxnList oldList = null;
     String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
     if(s != null && s.length() > 0) {
       oldList = new ValidReadTxnList(s);
     }
-    HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
     ValidTxnList txns = txnMgr.getValidTxns();
     if(oldList != null) {
       throw new IllegalStateException("calling recordValidTxn() more than once in the same " +
@@ -1171,6 +1170,7 @@ private String getUserFromUGI() {
     }
     return null;
   }
+
   /**
    * Acquire read and write locks needed by the statement. The list of objects to be locked are
    * obtained from the inputs and outputs populated by the compiler. Locking strategy depends on
@@ -1180,11 +1180,15 @@ private String getUserFromUGI() {
    * transactions have been opened.
    **/
   private int acquireLocks() {
+    SessionState ss = SessionState.get();
+    HiveTxnManager txnMgr = ss.getTxnMgr();
+    return acquireLocks(txnMgr);
+  }
+
+  private int acquireLocks(HiveTxnManager txnMgr) {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);

-    SessionState ss = SessionState.get();
-    HiveTxnManager txnMgr = ss.getTxnMgr();
     if(!txnMgr.isTxnOpen() && txnMgr.supportsAcid()) {
       /*non acid txn managers don't support txns but fwd lock requests to lock managers
         acid txn manager requires all locks to be associated with a txn so if we
@@ -1210,7 +1214,7 @@
        HiveTxnManager can transition its state machine correctly*/
       txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
       if(txnMgr.recordSnapshot(plan)) {
-        recordValidTxns();
+        recordValidTxns(txnMgr);
       }
       return 0;
     } catch (Exception e) {
@@ -1361,6 +1365,22 @@ public CommandProcessorResponse compileAndRespond(String command) {
     return createProcessorResponse(compileInternal(command, false));
   }

+  public CommandProcessorResponse lockAndRespond(HiveTxnManager txnManager) {
+    // Assumes the query has already been compiled
+    if (plan == null) {
+      throw new IllegalStateException("No previously compiled query for driver");
+    }
+
+    int ret = 0;
+    if (requiresLock()) {
+      ret = acquireLocks(txnManager);
+    }
+    if (ret != 0) {
+      return rollback(createProcessorResponse(ret));
+    }
+    return createProcessorResponse(ret);
+  }
+
   private static final ReentrantLock globalCompileLock = new ReentrantLock();
   private int compileInternal(String command, boolean deferClose) {
     int ret;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index d7592bb..834d47a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.session;

 import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME;

+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -286,6 +287,8 @@

   private String atsDomainId;

+  private List<Closeable> cleanupItems = new LinkedList<Closeable>();
+
   /**
    * Get the lineage state stored in this session.
    *
@@ -1642,6 +1645,14 @@ public void setCurrentDatabase(String currentDatabase) {
   }

   public void close() throws IOException {
+    for (Closeable cleanupItem : cleanupItems) {
+      try {
+        cleanupItem.close();
+      } catch (Exception err) {
+        LOG.error("Error processing SessionState cleanup item", err);
+      }
+    }
+
     registry.clear();
     if (txnMgr != null) txnMgr.closeTxnManager();
     JavaUtils.closeClassLoadersTo(sessionConf.getClassLoader(), parentLoader);
@@ -1903,6 +1914,9 @@ public ProgressMonitor getProgressMonitor() {
     return progressMonitor;
   }

+  public void addCleanupItem(Closeable item) {
+    cleanupItems.add(item);
+  }
 }

 class ResourceMaps {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 5003f42..f74c73c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.udf.generic;

 import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
@@ -69,6 +70,8 @@
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -184,12 +187,21 @@ public void process(Object[] arguments) throws HiveException {
     String query = stringOI.getPrimitiveJavaObject(arguments[0]);
     int num = intOI.get(arguments[1]);

-    PlanFragment fragment = createPlanFragment(query, num);
+    // Generate applicationId for the LLAP splits
+    LlapCoordinator coordinator = LlapCoordinator.getInstance();
+    if (coordinator == null) {
+      throw new HiveException("LLAP coordinator is not initialized; must be running in HS2 with "
+          + ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled");
+    }
+    ApplicationId applicationId = coordinator.createExtClientAppId();
+    LOG.info("Generated appID {} for LLAP splits", applicationId.toString());
+
+    PlanFragment fragment = createPlanFragment(query, num, applicationId);
     TezWork tezWork = fragment.work;
     Schema schema = fragment.schema;

     try {
-      for (InputSplit s : getSplits(jc, num, tezWork, schema)) {
+      for (InputSplit s : getSplits(jc, num, tezWork, schema, applicationId)) {
         Object[] os = new Object[1];
         bos.reset();
         s.write(dos);
@@ -202,7 +214,7 @@
     }
   }

-  public PlanFragment createPlanFragment(String query, int num)
+  public PlanFragment createPlanFragment(String query, int num, ApplicationId splitsAppId)
       throws HiveException {

     HiveConf conf = new HiveConf(SessionState.get().getConf());
@@ -225,6 +237,8 @@ public PlanFragment createPlanFragment(String query, int num)
     }

     Driver driver = new Driver(conf);
+    DriverCleanup driverCleanup = null;
+    boolean needsCleanup = true;
     try {
       CommandProcessorResponse cpr = driver.compileAndRespond(query);
       if (cpr.getResponseCode() != 0) {
@@ -276,16 +290,39 @@
        }

        tezWork = ((TezTask)roots.get(0)).getWork();
+      } else {
+        // Table will be queried directly by LLAP
+        // Acquire locks if necessary - they will be released during session cleanup.
+        HiveTxnManager txnManager = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+        driverCleanup = new DriverCleanup(driver, txnManager, splitsAppId.toString());
+        cpr = driver.lockAndRespond(txnManager);
+        if (cpr.getResponseCode() != 0) {
+          throw new HiveException("Failed to acquire locks: " + cpr.getException());
+        }
+
+        // Attach the resources to the session cleanup.
+        SessionState.get().addCleanupItem(driverCleanup);
+        needsCleanup = false;
       }

       return new PlanFragment(tezWork, schema, jc);
     } finally {
-      driver.close();
-      driver.destroy();
+      if (needsCleanup) {
+        if (driverCleanup != null) {
+          try {
+            driverCleanup.close();
+          } catch (IOException err) {
+            throw new HiveException(err);
+          }
+        } else if (driver != null) {
+          driver.close();
+          driver.destroy();
+        }
+      }
     }
   }

-  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema)
+  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId)
       throws IOException {

     DAG dag = DAG.create(work.getName());
@@ -311,7 +348,6 @@ public PlanFragment createPlanFragment(String query, int num)

       // Update the queryId to use the generated applicationId. See comment below about
       // why this is done.
-      ApplicationId applicationId = coordinator.createExtClientAppId();
       HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, applicationId.toString());
       Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
           new ArrayList<LocalResource>(), fs, ctx, false, work,
@@ -412,6 +448,32 @@ public PlanFragment createPlanFragment(String query, int num)
     }
   }

+  private static class DriverCleanup implements Closeable {
+    private final Driver driver;
+    private final HiveTxnManager txnManager;
+    private final String applicationId;
+
+    public DriverCleanup(Driver driver, HiveTxnManager txnManager, String applicationId) {
+      this.driver = driver;
+      this.txnManager = txnManager;
+      this.applicationId = applicationId;
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        LOG.info("DriverCleanup for LLAP splits: {}", applicationId);
+        driver.releaseLocksAndCommitOrRollback(true, txnManager);
+        driver.close();
+        driver.destroy();
+        txnManager.closeTxnManager();
+      } catch (Exception err) {
+        LOG.error("Error closing driver resources", err);
+        throw new IOException(err);
+      }
+    }
+  }
+
   private static class JobTokenCreator {
     private static Token<JobTokenIdentifier> createJobToken(ApplicationId applicationId) {
       String tokenIdentifier = applicationId.toString();
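A rough usage sketch of how the pieces introduced above fit together: compile the query first, acquire locks through the new Driver.lockAndRespond() with a standalone transaction manager, then register a Closeable via the new SessionState.addCleanupItem() so the locks, driver, and transaction manager are released when the session closes. The class and method names below (LlapSplitLockExample, compileAndLock) are illustrative only and not part of this patch.

// Illustrative sketch, not part of the patch: intended call sequence for the new APIs.
import java.io.IOException;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;

public class LlapSplitLockExample {
  public static void compileAndLock(HiveConf conf, String query) throws Exception {
    Driver driver = new Driver(conf);
    // 1. Compile first: lockAndRespond() requires an existing plan.
    CommandProcessorResponse cpr = driver.compileAndRespond(query);
    if (cpr.getResponseCode() != 0) {
      throw new IllegalStateException("Compilation failed: " + cpr.getException());
    }
    // 2. Acquire locks through a standalone txn manager so they can outlive this call.
    HiveTxnManager txnManager = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
    cpr = driver.lockAndRespond(txnManager);
    if (cpr.getResponseCode() != 0) {
      throw new IllegalStateException("Lock acquisition failed: " + cpr.getException());
    }
    // 3. Register cleanup with the session; SessionState.close() will release the locks,
    //    close the driver, and close the txn manager (mirroring DriverCleanup above).
    SessionState.get().addCleanupItem(() -> {
      try {
        driver.releaseLocksAndCommitOrRollback(true, txnManager);
        driver.close();
        driver.destroy();
        txnManager.closeTxnManager();
      } catch (Exception e) {
        throw new IOException(e);
      }
    });
  }
}

This is roughly the sequence createPlanFragment() follows for the direct-LLAP path: deferring the release to session cleanup keeps the locks held while LLAP reads the table after split generation.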