diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index f0e4f46..8424e32 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -226,9 +226,16 @@ public void process(Object[] arguments) throws HiveException {
     }
   }
 
-  public PlanFragment createPlanFragment(String query, int num, ApplicationId splitsAppId)
-    throws HiveException {
+  private static class HiveConfState {
+    public HiveConfState(HiveConf conf, String originalMode) {
+      this.conf = conf;
+      this.originalMode = originalMode;
+    }
+    HiveConf conf;
+    String originalMode;
+  }
+  private HiveConfState createNewConf() {
 
     HiveConf conf = new HiveConf(SessionState.get().getConf());
     HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
     HiveConf.setVar(conf, ConfVars.HIVEQUERYRESULTFILEFORMAT, PlanUtils.LLAP_OUTPUT_FORMAT_KEY);
@@ -242,6 +249,13 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli
     // Tez/LLAP requires RPC query plan
     HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
     HiveConf.setBoolVar(conf, ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, false);
+    return new HiveConfState(conf, originalMode);
+  }
+
+  public PlanFragment createPlanFragment(String query, int num, ApplicationId splitsAppId)
+    throws HiveException {
+
+    HiveConf conf = createNewConf().conf;
 
     try {
       jc = DagUtils.getInstance().createConfiguration(conf);
@@ -285,21 +299,26 @@ public PlanFragment createPlanFragment(String query, int num, ApplicationId spli
     }
 
     if (tezWork == null || tezWork.getAllWork().size() != 1) {
-
+      HiveConfState hiveConfState = createNewConf();
       String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", "");
 
-      String storageFormatString = getTempTableStorageFormatString(conf);
+      String storageFormatString = getTempTableStorageFormatString(hiveConfState.conf);
       String ctas = "create temporary table " + tableName + " " + storageFormatString + " as " + query;
       LOG.info("Materializing the query for LLAPIF; CTAS: " + ctas);
       driver.releaseResources();
-      HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_MODE, originalMode);
+      driver.close();
+      driverCleanup = null;
+      txnManager = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConfState.conf);
+      driver = new Driver(new QueryState.Builder().withHiveConf(hiveConfState.conf).nonIsolated().build(), null, null, txnManager);
+      driverCleanup = new DriverCleanup(driver, txnManager, splitsAppId.toString());
+      HiveConf.setVar(hiveConfState.conf, ConfVars.HIVE_EXECUTION_MODE, hiveConfState.originalMode);
       cpr = driver.run(ctas, false);
 
       if(cpr.getResponseCode() != 0) {
         throw new HiveException("Failed to create temp table: " + cpr.getException());
       }
 
-      HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_MODE, "llap");
+      HiveConf.setVar(hiveConfState.conf, ConfVars.HIVE_EXECUTION_MODE, "llap");
       query = "select * from " + tableName;
       cpr = driver.compileAndRespond(query, true);
       if(cpr.getResponseCode() != 0) {