diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 58680fe2e4..f6729a9a59 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -36,6 +36,7 @@ import java.io.PrintStream; import java.io.Serializable; import java.io.StringWriter; +import java.io.UnsupportedEncodingException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.sql.SQLException; @@ -1200,11 +1201,13 @@ private void initDataSetForTest(File file){ DatasetCollection datasets = parser.getDatasets(); for (String table : datasets.getTables()){ - initDataset(table); + synchronized (QTestUtil.class){ + initDataset(table); + } } } - private void initDataset(String table) { + protected void initDataset(String table) { if (getSrcTables().contains(table)){ return; } @@ -1270,7 +1273,7 @@ public String cliInit(File file, boolean recreate) throws Exception { initDataSetForTest(file); HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, - "org.apache.hadoop.hive.ql.security.DummyAuthenticator"); + "org.apache.hadoop.hive.ql.security.DummyAuthenticator"); Utilities.clearWorkMap(conf); CliSessionState ss = new CliSessionState(conf); assert ss != null; @@ -1287,6 +1290,30 @@ public String cliInit(File file, boolean recreate) throws Exception { } File outf = new File(logDir, stdoutName); + + setSessionOutputs(fileName, ss, outf); + + SessionState oldSs = SessionState.get(); + + boolean canReuseSession = !qNoSessionReuseQuerySet.contains(fileName); + restartSessions(canReuseSession, ss, oldSs); + + closeSession(oldSs); + + SessionState.start(ss); + + cliDriver = new CliDriver(); + + if (fileName.equals("init_file.q")) { + ss.initFiles.add(AbstractCliConfig.HIVE_ROOT + "/data/scripts/test_init_file.sql"); + } + cliDriver.processInitFiles(ss); + + return outf.getAbsolutePath(); + } + + private void setSessionOutputs(String fileName, CliSessionState ss, File outf) + throws FileNotFoundException, Exception, UnsupportedEncodingException { OutputStream fo = new BufferedOutputStream(new FileOutputStream(outf)); if (qSortQuerySet.contains(fileName)) { ss.out = new SortPrintStream(fo, "UTF-8"); @@ -1299,10 +1326,12 @@ public String cliInit(File file, boolean recreate) throws Exception { } ss.err = new CachingPrintStream(fo, true, "UTF-8"); ss.setIsSilent(true); - SessionState oldSs = SessionState.get(); + } - boolean canReuseSession = !qNoSessionReuseQuerySet.contains(fileName); - if (oldSs != null && canReuseSession && clusterType.getCoreClusterType() == CoreClusterType.TEZ) { + private void restartSessions(boolean canReuseSession, CliSessionState ss, SessionState oldSs) + throws IOException { + if (oldSs != null && canReuseSession + && clusterType.getCoreClusterType() == CoreClusterType.TEZ) { // Copy the tezSessionState from the old CliSessionState. TezSessionState tezSessionState = oldSs.getTezSession(); oldSs.setTezSession(null); @@ -1316,27 +1345,9 @@ public String cliInit(File file, boolean recreate) throws Exception { oldSs.setSparkSession(null); oldSs.close(); } - - if (oldSs != null && oldSs.out != null && oldSs.out != System.out) { - oldSs.out.close(); - } - if (oldSs != null) { - oldSs.close(); - } - SessionState.start(ss); - - cliDriver = new CliDriver(); - - if (fileName.equals("init_file.q")) { - ss.initFiles.add(AbstractCliConfig.HIVE_ROOT + "/data/scripts/test_init_file.sql"); - } - cliDriver.processInitFiles(ss); - - return outf.getAbsolutePath(); } - private CliSessionState startSessionState(boolean canReuseSession) - throws IOException { + private CliSessionState startSessionState(boolean canReuseSession) throws IOException { HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, "org.apache.hadoop.hive.ql.security.DummyAuthenticator"); @@ -1350,26 +1361,10 @@ private CliSessionState startSessionState(boolean canReuseSession) ss.err = System.out; SessionState oldSs = SessionState.get(); - if (oldSs != null && canReuseSession && clusterType.getCoreClusterType() == CoreClusterType.TEZ) { - // Copy the tezSessionState from the old CliSessionState. - TezSessionState tezSessionState = oldSs.getTezSession(); - ss.setTezSession(tezSessionState); - oldSs.setTezSession(null); - oldSs.close(); - } - if (oldSs != null && clusterType.getCoreClusterType() == CoreClusterType.SPARK) { - sparkSession = oldSs.getSparkSession(); - ss.setSparkSession(sparkSession); - oldSs.setSparkSession(null); - oldSs.close(); - } - if (oldSs != null && oldSs.out != null && oldSs.out != System.out) { - oldSs.out.close(); - } - if (oldSs != null) { - oldSs.close(); - } + restartSessions(canReuseSession, ss, oldSs); + + closeSession(oldSs); SessionState.start(ss); isSessionStateStarted = true; @@ -1378,6 +1373,15 @@ private CliSessionState startSessionState(boolean canReuseSession) return ss; } + private void closeSession(SessionState oldSs) throws IOException { + if (oldSs != null && oldSs.out != null && oldSs.out != System.out) { + oldSs.out.close(); + } + if (oldSs != null) { + oldSs.close(); + } + } + public int executeAdhocCommand(String q) { if (!q.contains(";")) { return -1; @@ -1986,6 +1990,7 @@ public QTRunner(QTestUtil qt, File file) { @Override public void run() { try { + qt.startSessionState(false); // assumption is that environment has already been cleaned once globally // hence each thread does not call cleanUp() and createSources() again qt.cliInit(file, false);