diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestLocationQueries.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestLocationQueries.java index 405d21cdb2..1e01c96d2d 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestLocationQueries.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestLocationQueries.java @@ -24,7 +24,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; /** * Suite for testing location. e.g. if "alter table alter partition @@ -123,7 +123,7 @@ public void testAlterTablePartitionLocation_alter5() throws Exception { qt[i].clearTestSideEffects(); } - boolean success = QTestUtil.queryListRunnerSingleThreaded(qfiles, qt); + boolean success = QTestRunnerUtils.queryListRunnerSingleThreaded(qfiles, qt); if (!success) { fail("One or more queries failed"); } diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java index 62c037edc8..e336712329 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestMTQueries.java @@ -36,7 +36,7 @@ public void testMTQueries1() throws Exception { String[] testNames = new String[] {"join2.q", "groupby1.q", "input1.q", "input19.q"}; File[] qfiles = setupQFiles(testNames); - QTestUtil[] qts = QTestUtil.queryListRunnerSetup(qfiles, resDir, logDir, "q_test_init_src_with_stats.sql", + QTestUtil[] qts = QTestRunnerUtils.queryListRunnerSetup(qfiles, resDir, logDir, "q_test_init_src_with_stats.sql", "q_test_cleanup_src_with_stats.sql"); for (QTestUtil util : qts) { // derby fails creating multiple stats aggregator concurrently @@ -48,7 +48,7 @@ public void testMTQueries1() throws Exception { util.getConf().set("hive.stats.column.autogather", "false"); util.newSession(); } - boolean success = QTestUtil.queryListRunnerMultiThreaded(qfiles, qts); + boolean success = QTestRunnerUtils.queryListRunnerMultiThreaded(qfiles, qts); if (!success) { fail("One or more queries failed"); } diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java index b2ddff7a2e..ca77db0dd4 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java @@ -50,7 +50,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import static org.apache.hadoop.hive.ql.QTestUtil.DEFAULT_TEST_EC_POLICY; +import static org.apache.hadoop.hive.ql.QTestMiniClusters.DEFAULT_TEST_EC_POLICY; import static org.apache.hive.jdbc.TestJdbcWithMiniHS2.getDetailedTableDescription; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; diff --git itests/util/src/main/java/org/apache/hadoop/hive/accumulo/AccumuloQTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/accumulo/AccumuloQTestUtil.java index 01c3c0bebb..acf64b0dc9 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/accumulo/AccumuloQTestUtil.java +++ itests/util/src/main/java/org/apache/hadoop/hive/accumulo/AccumuloQTestUtil.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.accumulo; import 
org.apache.hadoop.hive.ql.QTestArguments; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; import org.apache.hadoop.hive.ql.QTestUtil; /** diff --git itests/util/src/main/java/org/apache/hadoop/hive/accumulo/AccumuloTestSetup.java itests/util/src/main/java/org/apache/hadoop/hive/accumulo/AccumuloTestSetup.java index c5998145e2..4e24e15717 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/accumulo/AccumuloTestSetup.java +++ itests/util/src/main/java/org/apache/hadoop/hive/accumulo/AccumuloTestSetup.java @@ -20,9 +20,6 @@ import java.sql.Date; import java.sql.Timestamp; -import junit.extensions.TestSetup; -import junit.framework.Test; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; @@ -37,12 +34,12 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.QTestUtil; +import org.apache.hadoop.hive.ql.QTestMiniClusters; /** * Start and stop an AccumuloMiniCluster for testing purposes */ -public class AccumuloTestSetup extends QTestUtil.QTestSetup { +public class AccumuloTestSetup extends QTestMiniClusters.QTestSetup { public static final String PASSWORD = "password"; public static final String TABLE_NAME = "accumuloHiveTable"; diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/AbstractCliConfig.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/AbstractCliConfig.java index 142f903ecc..c83b010fd7 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/AbstractCliConfig.java +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/AbstractCliConfig.java @@ -33,9 +33,8 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hive.ql.QTestUtil; -import org.apache.hadoop.hive.ql.QTestUtil.FsType; -import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.FsType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; import org.apache.hive.testutils.HiveTestEnvSetup; import com.google.common.base.Splitter; diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/AbstractCoreBlobstoreCliDriver.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/AbstractCoreBlobstoreCliDriver.java index aa2c7a7ea3..0377e0e2e3 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/AbstractCoreBlobstoreCliDriver.java +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/AbstractCoreBlobstoreCliDriver.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.QTestArguments; import org.apache.hadoop.hive.ql.QTestProcessExecResult; import org.apache.hadoop.hive.ql.QTestUtil; -import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; import org.apache.hive.testutils.HiveTestEnvSetup; import org.junit.After; import org.junit.AfterClass; diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java index 1108e49668..f43e83ecd7 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java @@ -21,8 +21,8 @@ import 
java.net.MalformedURLException; import java.net.URL; -import org.apache.hadoop.hive.ql.QTestUtil; -import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; +import org.apache.hadoop.hive.ql.QTestMiniClusters; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; import org.apache.hadoop.hive.ql.parse.CoreParseNegative; public class CliConfigs { @@ -143,7 +143,7 @@ public MiniTezCliConfig() { setHiveConfDir("data/conf/tez"); setClusterType(MiniClusterType.tez); setMetastoreType(MetastoreType.sql); - setFsType(QTestUtil.FsType.hdfs); + setFsType(QTestMiniClusters.FsType.hdfs); } catch (Exception e) { throw new RuntimeException("can't construct cliconfig", e); } @@ -194,7 +194,7 @@ public MiniDruidCliConfig() { setHiveConfDir("data/conf/llap"); setClusterType(MiniClusterType.druid); setMetastoreType(MetastoreType.sql); - setFsType(QTestUtil.FsType.hdfs); + setFsType(QTestMiniClusters.FsType.hdfs); } catch (Exception e) { throw new RuntimeException("can't construct cliconfig", e); } @@ -215,7 +215,7 @@ public MiniDruidKafkaCliConfig() { setHiveConfDir("data/conf/llap"); setClusterType(MiniClusterType.druidKafka); setMetastoreType(MetastoreType.sql); - setFsType(QTestUtil.FsType.hdfs); + setFsType(QTestMiniClusters.FsType.hdfs); } catch (Exception e) { throw new RuntimeException("can't construct cliconfig", e); } @@ -233,7 +233,7 @@ public MiniKafkaCliConfig() { setHiveConfDir("data/conf/llap"); setClusterType(MiniClusterType.kafka); setMetastoreType(MetastoreType.sql); - setFsType(QTestUtil.FsType.hdfs); + setFsType(QTestMiniClusters.FsType.hdfs); } catch (Exception e) { throw new RuntimeException("can't construct cliconfig", e); } @@ -269,7 +269,7 @@ public MiniLlapLocalCliConfig() { setHiveConfDir("data/conf/llap"); setClusterType(MiniClusterType.llap_local); setMetastoreType(MetastoreType.sql); - setFsType(QTestUtil.FsType.local); + setFsType(QTestMiniClusters.FsType.local); } catch (Exception e) { throw new RuntimeException("can't construct cliconfig", e); } @@ -292,7 +292,7 @@ public EncryptedHDFSCliConfig() { setClusterType(MiniClusterType.mr); - setFsType(QTestUtil.FsType.encrypted_hdfs); + setFsType(QTestMiniClusters.FsType.encrypted_hdfs); if (getClusterType() == MiniClusterType.tez) { setHiveConfDir("data/conf/tez"); } else { @@ -727,7 +727,7 @@ public ErasureCodingHDFSCliConfig() { setCleanupScript("q_test_cleanup_src.sql"); setClusterType(MiniClusterType.mr); - setFsType(QTestUtil.FsType.erasure_coded_hdfs); + setFsType(QTestMiniClusters.FsType.erasure_coded_hdfs); setHiveConfDir(getClusterType()); } catch (Exception e) { throw new RuntimeException("can't construct cliconfig", e); @@ -771,7 +771,7 @@ public MiniDruidLlapLocalCliConfig() { setHiveConfDir("data/conf/llap"); setClusterType(MiniClusterType.druidLocal); setMetastoreType(MetastoreType.sql); - setFsType(QTestUtil.FsType.local); + setFsType(QTestMiniClusters.FsType.local); } catch (Exception e) { throw new RuntimeException("can't construct cliconfig", e); } diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreAccumuloCliDriver.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreAccumuloCliDriver.java index 6bbcf3d474..88830c24d1 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreAccumuloCliDriver.java +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreAccumuloCliDriver.java @@ -25,7 +25,7 @@ import org.apache.hadoop.hive.accumulo.AccumuloQTestUtil; import org.apache.hadoop.hive.accumulo.AccumuloTestSetup; import 
org.apache.hadoop.hive.ql.QTestProcessExecResult; -import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreCliDriver.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreCliDriver.java index 846547466d..f2428b5106 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreCliDriver.java +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreCliDriver.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.QTestArguments; import org.apache.hadoop.hive.ql.QTestProcessExecResult; import org.apache.hadoop.hive.ql.QTestUtil; -import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; import org.apache.hadoop.hive.util.ElapsedTimeLoggingWrapper; import org.junit.After; import org.junit.AfterClass; @@ -166,11 +166,6 @@ public Void invokeInternal() throws Exception { } } - private static String debugHint = - "\nSee ./ql/target/tmp/log/hive.log or ./itests/qtest/target/tmp/log/hive.log, " - + "or check ./ql/target/surefire-reports " - + "or ./itests/qtest/target/surefire-reports/ for specific test cases logs."; - @Override public void runTest(String testName, String fname, String fpath) { Stopwatch sw = Stopwatch.createStarted(); @@ -186,7 +181,7 @@ public void runTest(String testName, String fname, String fpath) { int ecode = qt.executeClient(fname); if (ecode != 0) { failed = true; - qt.failed(ecode, fname, debugHint); + qt.failed(ecode, fname, QTestUtil.DEBUG_HINT); } setupAdditionalPartialMasks(); @@ -194,14 +189,14 @@ public void runTest(String testName, String fname, String fpath) { resetAdditionalPartialMasks(); if (result.getReturnCode() != 0) { failed = true; - String message = Strings.isNullOrEmpty(result.getCapturedOutput()) ? debugHint + String message = Strings.isNullOrEmpty(result.getCapturedOutput()) ? QTestUtil.DEBUG_HINT : "\r\n" + result.getCapturedOutput(); qt.failedDiff(result.getReturnCode(), fname, message); } } catch (Exception e) { failed = true; - qt.failed(e, fname, debugHint); + qt.failed(e, fname, QTestUtil.DEBUG_HINT); } finally { String message = "Done query " + fname + ". succeeded=" + !failed + ", skipped=" + skipped + ". 
ElapsedTime(ms)=" + sw.stop().elapsed(TimeUnit.MILLISECONDS); diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreCompareCliDriver.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreCompareCliDriver.java index 7a06768fda..3a4e3d0d2e 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreCompareCliDriver.java +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreCompareCliDriver.java @@ -23,10 +23,11 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.hive.ql.QFileVersionHandler; import org.apache.hadoop.hive.ql.QTestArguments; import org.apache.hadoop.hive.ql.QTestProcessExecResult; import org.apache.hadoop.hive.ql.QTestUtil; -import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -36,6 +37,7 @@ public class CoreCompareCliDriver extends CliAdapter{ private static QTestUtil qt; + private QFileVersionHandler qvh = new QFileVersionHandler(); public CoreCompareCliDriver(AbstractCliConfig testCliConfig) { super(testCliConfig); @@ -115,9 +117,6 @@ public void shutdown() { } } - private static String debugHint = "\nSee ./ql/target/tmp/log/hive.log or ./itests/qtest/target/tmp/log/hive.log, " - + "or check ./ql/target/surefire-reports or ./itests/qtest/target/surefire-reports/ for specific test cases logs."; - @Override public void runTest(String tname, String fname, String fpath) { final String queryDirectory = cliConfig.getQueryDirectory(); @@ -126,7 +125,7 @@ public void runTest(String tname, String fname, String fpath) { try { System.err.println("Begin query: " + fname); // TODO: versions could also be picked at build time. - List versionFiles = QTestUtil.getVersionFiles(queryDirectory, tname); + List versionFiles = qvh.getVersionFiles(queryDirectory, tname); if (versionFiles.size() < 2) { fail("Cannot run " + tname + " with only " + versionFiles.size() + " versions"); } @@ -148,19 +147,18 @@ public void runTest(String tname, String fname, String fpath) { // TODO: will this work? ecode = qt.executeClient(versionFile, fname); if (ecode != 0) { - qt.failed(ecode, fname, debugHint); + qt.failed(ecode, fname, QTestUtil.DEBUG_HINT); } } QTestProcessExecResult result = qt.checkCompareCliDriverResults(fname, outputs); if (result.getReturnCode() != 0) { - String message = Strings.isNullOrEmpty(result.getCapturedOutput()) ? - debugHint : "\r\n" + result.getCapturedOutput(); + String message = Strings.isNullOrEmpty(result.getCapturedOutput()) ? 
QTestUtil.DEBUG_HINT + : "\r\n" + result.getCapturedOutput(); qt.failedDiff(result.getReturnCode(), fname, message); } - } - catch (Exception e) { - qt.failed(e, fname, debugHint); + } catch (Exception e) { + qt.failed(e, fname, QTestUtil.DEBUG_HINT); } long elapsedTime = System.currentTimeMillis() - startTime; diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreDummy.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreDummy.java index a0d1433536..301b91e54e 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreDummy.java +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreDummy.java @@ -21,9 +21,10 @@ import java.util.List; -import org.apache.hadoop.hive.ql.QTestUtil; +import org.apache.hadoop.hive.ql.QFileVersionHandler; public class CoreDummy extends CliAdapter { + QFileVersionHandler qvh = new QFileVersionHandler(); public CoreDummy(AbstractCliConfig cliConfig) { super(cliConfig); @@ -47,7 +48,7 @@ public void shutdown() { @Override public void runTest(String name, String name2, String absolutePath) { - List<String> versionFiles = QTestUtil.getVersionFiles(cliConfig.getQueryDirectory(), name); + List<String> versionFiles = qvh.getVersionFiles(cliConfig.getQueryDirectory(), name); if (versionFiles.size() < 2) { fail("Cannot run " + name2 + " with only " + versionFiles.size() + " versions"); } diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreHBaseCliDriver.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreHBaseCliDriver.java index 0d67768967..7412c55387 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreHBaseCliDriver.java +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreHBaseCliDriver.java @@ -25,7 +25,7 @@ import org.apache.hadoop.hive.hbase.HBaseQTestUtil; import org.apache.hadoop.hive.hbase.HBaseTestSetup; import org.apache.hadoop.hive.ql.QTestProcessExecResult; -import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreHBaseNegativeCliDriver.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreHBaseNegativeCliDriver.java index af170a9514..eefdce2e0d 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreHBaseNegativeCliDriver.java +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreHBaseNegativeCliDriver.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.hbase.HBaseQTestUtil; import org.apache.hadoop.hive.hbase.HBaseTestSetup; import org.apache.hadoop.hive.ql.QTestProcessExecResult; -import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreNegativeCliDriver.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreNegativeCliDriver.java index 91c3bf87b9..ed8830e103 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreNegativeCliDriver.java +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreNegativeCliDriver.java @@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.QTestArguments; import org.apache.hadoop.hive.ql.QTestProcessExecResult; import org.apache.hadoop.hive.ql.QTestUtil;
-import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -113,9 +113,6 @@ public void shutdown() { } } - private static String debugHint = "\nSee ./ql/target/tmp/log/hive.log or ./itests/qtest/target/tmp/log/hive.log, " - + "or check ./ql/target/surefire-reports or ./itests/qtest/target/surefire-reports/ for specific test cases logs."; - @Override public void runTest(String tname, String fname, String fpath) throws Exception { long startTime = System.currentTimeMillis(); @@ -127,25 +124,24 @@ public void runTest(String tname, String fname, String fpath) throws Exception { int ecode = qt.executeClient(fname); if (ecode == 0) { - qt.failed(fname, debugHint); + qt.failed(fname, QTestUtil.DEBUG_HINT); } QTestProcessExecResult result = qt.checkCliDriverResults(fname); if (result.getReturnCode() != 0) { - String message = Strings.isNullOrEmpty(result.getCapturedOutput()) ? - debugHint : "\r\n" + result.getCapturedOutput(); + String message = Strings.isNullOrEmpty(result.getCapturedOutput()) ? QTestUtil.DEBUG_HINT + : "\r\n" + result.getCapturedOutput(); qt.failedDiff(result.getReturnCode(), fname, message); } } catch (Error error) { QTestProcessExecResult qTestProcessExecResult = qt.checkNegativeResults(fname, error); if (qTestProcessExecResult.getReturnCode() != 0) { - String message = Strings.isNullOrEmpty(qTestProcessExecResult.getCapturedOutput()) ? debugHint : - "\r\n" + qTestProcessExecResult.getCapturedOutput(); + String message = Strings.isNullOrEmpty(qTestProcessExecResult.getCapturedOutput()) + ? QTestUtil.DEBUG_HINT : "\r\n" + qTestProcessExecResult.getCapturedOutput(); qt.failedDiff(qTestProcessExecResult.getReturnCode(), fname, message); } - } - catch (Exception e) { - qt.failed(e, fname, debugHint); + } catch (Exception e) { + qt.failed(e, fname, QTestUtil.DEBUG_HINT); } long elapsedTime = System.currentTimeMillis() - startTime; diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CorePerfCliDriver.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CorePerfCliDriver.java index cf3d815441..fd73566b4c 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CorePerfCliDriver.java +++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CorePerfCliDriver.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hive.cli.control; - - import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -28,25 +26,23 @@ import org.apache.hadoop.hive.ql.MetaStoreDumpUtility; import org.apache.hadoop.hive.ql.QTestArguments; import org.apache.hadoop.hive.ql.QTestProcessExecResult; +import org.apache.hadoop.hive.ql.QTestSystemProperties; import org.apache.hadoop.hive.ql.QTestUtil; -import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; import org.junit.After; import org.junit.AfterClass; import com.google.common.base.Strings; + /** - This is the TestPerformance Cli Driver for integrating performance regression tests - as part of the Hive Unit tests. - Currently this includes support for : - 1. Running explain plans for TPCDS workload (non-partitioned dataset) on 30TB scaleset. - TODO : - 1. Support for partitioned data set - 2. 
Use HBase Metastore instead of Derby - -This suite differs from TestCliDriver w.r.t the fact that we modify the underlying metastore -database to reflect the dataset before running the queries. -*/ -public class CorePerfCliDriver extends CliAdapter{ + * This is the TestPerformance Cli Driver for integrating performance regression tests as part of + * the Hive Unit tests. Currently this includes support for : 1. Running explain plans for TPCDS + * workload (non-partitioned dataset) on 30TB scaleset. TODO : 1. Support for partitioned data set + * 2. Use HBase Metastore instead of Derby + * This suite differs from TestCliDriver w.r.t the fact that we modify the underlying metastore + * database to reflect the dataset before running the queries. + */ +public class CorePerfCliDriver extends CliAdapter { private static QTestUtil qt; @@ -65,16 +61,10 @@ public void beforeClass() { String cleanupScript = cliConfig.getCleanupScript(); try { - qt = new QTestUtil( - QTestArguments.QTestArgumentsBuilder.instance() - .withOutDir(cliConfig.getResultsDir()) - .withLogDir(cliConfig.getLogDir()) - .withClusterType(miniMR) - .withConfDir(hiveConfDir) - .withInitScript(initScript) - .withCleanupScript(cleanupScript) - .withLlapIo(false) - .build()); + qt = new QTestUtil(QTestArguments.QTestArgumentsBuilder.instance() + .withOutDir(cliConfig.getResultsDir()).withLogDir(cliConfig.getLogDir()) + .withClusterType(miniMR).withConfDir(hiveConfDir).withInitScript(initScript) + .withCleanupScript(cleanupScript).withLlapIo(false).build()); // do a one time initialization qt.newSession(); @@ -83,13 +73,14 @@ public void beforeClass() { // Manually modify the underlying metastore db to reflect statistics corresponding to // the 30TB TPCDS scale set. This way the optimizer will generate plans for a 30 TB set. MetaStoreDumpUtility.setupMetaStoreTableColumnStatsFor30TBTPCDSWorkload(qt.getConf(), - System.getProperty(QTestUtil.TEST_TMP_DIR_PROPERTY)); + QTestSystemProperties.getTempDir()); } catch (Exception e) { System.err.println("Exception: " + e.getMessage()); e.printStackTrace(); System.err.flush(); - throw new RuntimeException("Unexpected exception in static initialization: " + e.getMessage(), e); + throw new RuntimeException("Unexpected exception in static initialization: " + e.getMessage(), + e); } } @@ -126,10 +117,6 @@ public void tearDown() { } } - private static String debugHint = - "\nSee ./ql/target/tmp/log/hive.log or ./itests/qtest/target/tmp/log/hive.log, " - + "or check ./ql/target/surefire-reports or ./itests/qtest/target/surefire-reports/ for specific test cases logs."; - @Override public void runTest(String name, String fname, String fpath) { long startTime = System.currentTimeMillis(); @@ -141,17 +128,17 @@ public void runTest(String name, String fname, String fpath) { int ecode = qt.executeClient(fname); if (ecode != 0) { - qt.failed(ecode, fname, debugHint); + qt.failed(ecode, fname, QTestUtil.DEBUG_HINT); } QTestProcessExecResult result = qt.checkCliDriverResults(fname); if (result.getReturnCode() != 0) { - String message = Strings.isNullOrEmpty(result.getCapturedOutput()) ? - debugHint : "\r\n" + result.getCapturedOutput(); + String message = Strings.isNullOrEmpty(result.getCapturedOutput()) ? 
QTestUtil.DEBUG_HINT + : "\r\n" + result.getCapturedOutput(); qt.failedDiff(result.getReturnCode(), fname, message); } } catch (Exception e) { - qt.failed(e, fname, debugHint); + qt.failed(e, fname, QTestUtil.DEBUG_HINT); } long elapsedTime = System.currentTimeMillis() - startTime; diff --git itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java index e8e51768e3..a327167c79 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java +++ itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hive.ql.QTestArguments; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; import org.apache.hadoop.hive.ql.QTestUtil; /** @@ -62,7 +63,7 @@ public void createSources(String tname) throws Exception { super.createSources(tname); conf.setBoolean("hive.test.init.phase", true); - initDataset(HBASE_SRC_NAME); + datasetHandler.initDataset(HBASE_SRC_NAME, getCliDriver()); // create a snapshot Admin admin = null; diff --git itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java index ec38aea250..75015808bc 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java +++ itests/util/src/main/java/org/apache/hadoop/hive/hbase/HBaseTestSetup.java @@ -29,15 +29,15 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.QTestUtil; +import org.apache.hadoop.hive.ql.QTestMiniClusters.QTestSetup; import org.apache.hadoop.mapred.JobConf; import org.apache.zookeeper.Watcher; @@ -45,7 +45,7 @@ * HBaseTestSetup defines HBase-specific test fixtures which are * reused across testcases. */ -public class HBaseTestSetup extends QTestUtil.QTestSetup { +public class HBaseTestSetup extends QTestSetup { private MiniHBaseCluster hbaseCluster; private HBaseTestingUtility util; diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QFileVersionHandler.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QFileVersionHandler.java new file mode 100644 index 0000000000..4db6cb3b6c --- /dev/null +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QFileVersionHandler.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import java.io.File; +import java.io.FilenameFilter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.tools.ant.BuildException; + +import com.google.common.collect.ImmutableList; + +public class QFileVersionHandler { + private static final Pattern QV_SUFFIX = Pattern.compile("_[0-9]+\\.qv$", Pattern.CASE_INSENSITIVE); + + private String[] cachedQvFileList = null; + private ImmutableList<String> cachedDefaultQvFileList = null; + + public List<String> getVersionFiles(String queryDir, String tname) { + ensureQvFileList(queryDir); + List<String> result = getVersionFilesInternal(tname); + if (result == null) { + result = cachedDefaultQvFileList; + } + return result; + } + + private void ensureQvFileList(String queryDir) { + if (cachedQvFileList != null) { + return; + } + // Not thread-safe. + System.out.println("Getting versions from " + queryDir); + cachedQvFileList = (new File(queryDir)).list(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.toLowerCase().endsWith(".qv"); + } + }); + if (cachedQvFileList == null) { + return; // no files at all + } + Arrays.sort(cachedQvFileList, String.CASE_INSENSITIVE_ORDER); + List<String> defaults = getVersionFilesInternal("default"); + cachedDefaultQvFileList = + (defaults != null) ? ImmutableList.copyOf(defaults) : ImmutableList.<String>of(); + } + + private List<String> getVersionFilesInternal(String tname) { + if (cachedQvFileList == null) { + return new ArrayList<String>(); + } + int pos = Arrays.binarySearch(cachedQvFileList, tname, String.CASE_INSENSITIVE_ORDER); + if (pos >= 0) { + throw new BuildException("Unexpected file list element: " + cachedQvFileList[pos]); + } + List<String> result = null; + for (pos = (-pos - 1); pos < cachedQvFileList.length; ++pos) { + String candidate = cachedQvFileList[pos]; + if (candidate.length() <= tname.length() + || !tname.equalsIgnoreCase(candidate.substring(0, tname.length())) + || !QV_SUFFIX.matcher(candidate.substring(tname.length())).matches()) { + break; + } + if (result == null) { + result = new ArrayList<String>(); + } + result.add(candidate); + } + return result; + } +} diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java index ec61b34a6f..f69f938fc2 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java @@ -34,7 +34,7 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.hive.ql.QTestUtil.FsType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.FsType; /** * QOutProcessor: produces the final q.out from original q.out by postprocessing (e.g.
masks) diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestArguments.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestArguments.java index 33412b2766..7c0e5524ec 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestArguments.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestArguments.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hive.ql; +import org.apache.hadoop.hive.ql.QTestMiniClusters.FsType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.QTestSetup; + /** * QTestArguments composite used as arguments holder for QTestUtil initialization. */ @@ -26,21 +29,21 @@ private String outDir; private String logDir; private String confDir; - private QTestUtil.MiniClusterType clusterType; + private QTestMiniClusters.MiniClusterType clusterType; private String initScript; private String cleanupScript; private boolean withLlapIo; - private QTestUtil.FsType fsType; - private QTestUtil.QTestSetup qtestSetup; + private FsType fsType; + private QTestSetup qtestSetup; private QTestArguments() { } - public QTestUtil.MiniClusterType getClusterType() { + public QTestMiniClusters.MiniClusterType getClusterType() { return clusterType; } - private void setClusterType(QTestUtil.MiniClusterType clusterType) { + private void setClusterType(QTestMiniClusters.MiniClusterType clusterType) { this.clusterType = clusterType; } @@ -92,19 +95,19 @@ private void setWithLlapIo(boolean withLlapIo) { this.withLlapIo = withLlapIo; } - public QTestUtil.FsType getFsType() { + public FsType getFsType() { return fsType; } - private void setFsType(QTestUtil.FsType fsType) { + private void setFsType(QTestMiniClusters.FsType fsType) { this.fsType = fsType; } - public QTestUtil.QTestSetup getQTestSetup() { + public QTestSetup getQTestSetup() { return qtestSetup; } - private void setQTestSetup(QTestUtil.QTestSetup qtestSetup) { + private void setQTestSetup(QTestSetup qtestSetup) { this.qtestSetup = qtestSetup; } @@ -116,12 +119,12 @@ private void setQTestSetup(QTestUtil.QTestSetup qtestSetup) { private String outDir; private String logDir; private String confDir; - private QTestUtil.MiniClusterType clusterType; + private QTestMiniClusters.MiniClusterType clusterType; private String initScript; private String cleanupScript; private boolean withLlapIo; - private QTestUtil.FsType fsType; - private QTestUtil.QTestSetup qtestSetup; + private FsType fsType; + private QTestSetup qtestSetup; private QTestArgumentsBuilder(){ } @@ -145,7 +148,7 @@ public QTestArgumentsBuilder withConfDir(String confDir) { return this; } - public QTestArgumentsBuilder withClusterType(QTestUtil.MiniClusterType clusterType) { + public QTestArgumentsBuilder withClusterType(QTestMiniClusters.MiniClusterType clusterType) { this.clusterType = clusterType; return this; } @@ -165,12 +168,12 @@ public QTestArgumentsBuilder withLlapIo(boolean withLlapIo) { return this; } - public QTestArgumentsBuilder withFsType(QTestUtil.FsType fsType) { + public QTestArgumentsBuilder withFsType(QTestMiniClusters.FsType fsType) { this.fsType = fsType; return this; } - public QTestArgumentsBuilder withQTestSetup(QTestUtil.QTestSetup qtestSetup) { + public QTestArgumentsBuilder withQTestSetup(QTestSetup qtestSetup) { this.qtestSetup = qtestSetup; return this; } @@ -189,7 +192,7 @@ public QTestArguments build() { fsType != null ? fsType : clusterType.getDefaultFsType()); testArguments.setQTestSetup( - qtestSetup != null ? qtestSetup : new QTestUtil.QTestSetup()); + qtestSetup != null ? 
qtestSetup : new QTestSetup()); return testArguments; } diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestMiniClusters.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestMiniClusters.java new file mode 100644 index 0000000000..e2c7535890 --- /dev/null +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestMiniClusters.java @@ -0,0 +1,611 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Timestamp; +import java.text.SimpleDateFormat; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.cli.control.AbstractCliConfig; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapItUtils; +import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; +import org.apache.hadoop.hive.ql.QTestMiniClusters.CoreClusterType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.FsType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; +import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; +import org.apache.hadoop.hive.ql.lockmgr.zookeeper.CuratorFrameworkSingleton; +import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsErasureCodingShim; +import org.apache.hive.druid.MiniDruidCluster; +import org.apache.hive.kafka.SingleNodeKafkaCluster; +import org.apache.hive.kafka.Wikipedia; +import org.apache.logging.log4j.util.Strings; +import org.apache.zookeeper.WatchedEvent; +import 
org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * QTestMiniClusters: decouples cluster details from QTestUtil (kafka/druid/spark/llap/tez/mr, file + * system) + */ +public class QTestMiniClusters { + private static final Logger LOG = LoggerFactory.getLogger("QTestMiniClusters"); + private static final SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss"); + + // security property names + private static final String SECURITY_KEY_PROVIDER_URI_NAME = "dfs.encryption.key.provider.uri"; + /** + * The default Erasure Coding Policy to use in Erasure Coding tests. + */ + public static final String DEFAULT_TEST_EC_POLICY = "RS-3-2-1024k"; + + private QTestSetup setup; + private QTestArguments testArgs; + private MiniClusterType clusterType; + + private HadoopShims shims; + private SparkSession sparkSession = null; + private FileSystem fs; + private HadoopShims.MiniMrShim mr = null; + private HadoopShims.MiniDFSShim dfs = null; + private HadoopShims.HdfsEncryptionShim hes = null; + private MiniLlapCluster llapCluster = null; + private MiniDruidCluster druidCluster = null; + private SingleNodeKafkaCluster kafkaCluster = null; + + public enum CoreClusterType { + MR, TEZ, SPARK + } + + public enum FsType { + local, hdfs, encrypted_hdfs, erasure_coded_hdfs, + } + + public enum MiniClusterType { + + mr(CoreClusterType.MR, FsType.hdfs), tez(CoreClusterType.TEZ, FsType.hdfs), tez_local( + CoreClusterType.TEZ, + FsType.local), spark(CoreClusterType.SPARK, FsType.local), miniSparkOnYarn( + CoreClusterType.SPARK, FsType.hdfs), llap(CoreClusterType.TEZ, FsType.hdfs), llap_local( + CoreClusterType.TEZ, FsType.local), none(CoreClusterType.MR, + FsType.local), druidLocal(CoreClusterType.TEZ, FsType.local), druid( + CoreClusterType.TEZ, FsType.hdfs), druidKafka(CoreClusterType.TEZ, + FsType.hdfs), kafka(CoreClusterType.TEZ, FsType.hdfs); + + private final CoreClusterType coreClusterType; + private final FsType defaultFsType; + + MiniClusterType(CoreClusterType coreClusterType, FsType defaultFsType) { + this.coreClusterType = coreClusterType; + this.defaultFsType = defaultFsType; + } + + public CoreClusterType getCoreClusterType() { + return coreClusterType; + } + + public FsType getDefaultFsType() { + return defaultFsType; + } + + public static MiniClusterType valueForString(String type) { + // Replace this with valueOf. 
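+ // Plain Enum.valueOf could not map "miniMR" or the hyphenated "druid-kafka" to their + // constants, nor fall back to none for unrecognized input, so the explicit chain remains. + // e.g. valueForString("llap_local") returns llap_local; valueForString("bogus") returns none.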
+ if (type.equals("miniMR")) { + return mr; + } else if (type.equals("tez")) { + return tez; + } else if (type.equals("tez_local")) { + return tez_local; + } else if (type.equals("spark")) { + return spark; + } else if (type.equals("miniSparkOnYarn")) { + return miniSparkOnYarn; + } else if (type.equals("llap")) { + return llap; + } else if (type.equals("llap_local")) { + return llap_local; + } else if (type.equals("druidLocal")) { + return druidLocal; + } else if (type.equals("druid")) { + return druid; + } else if (type.equals("druid-kafka")) { + return druidKafka; + } else if (type.equals("kafka")) { + return kafka; + } else { + return none; + } + } + } + + /** + * QTestSetup defines test fixtures which are reused across testcases, and are needed before any + * test can be run + */ + public static class QTestSetup { + private MiniZooKeeperCluster zooKeeperCluster = null; + private int zkPort; + private ZooKeeper zooKeeper; + + public QTestSetup() { + } + + public void preTest(HiveConf conf) throws Exception { + + if (zooKeeperCluster == null) { + // create temp dir + File tmpDir = Files + .createTempDirectory(Paths.get(QTestSystemProperties.getTempDir()), "tmp_").toFile(); + + zooKeeperCluster = new MiniZooKeeperCluster(); + zkPort = zooKeeperCluster.startup(tmpDir); + } + + if (zooKeeper != null) { + zooKeeper.close(); + } + + int sessionTimeout = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, + TimeUnit.MILLISECONDS); + zooKeeper = new ZooKeeper("localhost:" + zkPort, sessionTimeout, new Watcher() { + @Override + public void process(WatchedEvent arg0) { + } + }); + + String zkServer = "localhost"; + conf.set("hive.zookeeper.quorum", zkServer); + conf.set("hive.zookeeper.client.port", "" + zkPort); + } + + public void postTest(HiveConf conf) throws Exception { + if (zooKeeperCluster == null) { + return; + } + + if (zooKeeper != null) { + zooKeeper.close(); + } + + ZooKeeperHiveLockManager.releaseAllLocks(conf); + } + + public void tearDown() throws Exception { + CuratorFrameworkSingleton.closeAndReleaseInstance(); + + if (zooKeeperCluster != null) { + zooKeeperCluster.shutdown(); + zooKeeperCluster = null; + } + } + } + + public void setup(QTestArguments testArgs, HiveConf conf, String scriptsDir, + String logDir) throws Exception { + this.shims = ShimLoader.getHadoopShims(); + this.clusterType = testArgs.getClusterType(); + this.testArgs = testArgs; + + setupFileSystem(testArgs.getFsType(), conf); + + this.setup = testArgs.getQTestSetup(); + setup.preTest(conf); + + String uriString = fs.getUri().toString(); + + if (clusterType == MiniClusterType.druidKafka || clusterType == MiniClusterType.druidLocal + || clusterType == MiniClusterType.druid) { + final String tempDir = QTestSystemProperties.getTempDir(); + druidCluster = new MiniDruidCluster( + clusterType == MiniClusterType.druid ? 
"mini-druid" : "mini-druid-kafka", logDir, tempDir, + setup.zkPort, Utilities.jarFinderGetJar(MiniDruidCluster.class)); + final Path druidDeepStorage = fs.makeQualified(new Path(druidCluster.getDeepStorageDir())); + fs.mkdirs(druidDeepStorage); + final Path scratchDir = + fs.makeQualified(new Path(QTestSystemProperties.getTempDir(), "druidStagingDir")); + fs.mkdirs(scratchDir); + conf.set("hive.druid.working.directory", scratchDir.toUri().getPath()); + druidCluster.init(conf); + druidCluster.start(); + } + + if (clusterType == MiniClusterType.kafka || clusterType == MiniClusterType.druidKafka) { + kafkaCluster = + new SingleNodeKafkaCluster("kafka", QTestSystemProperties.getTempDir() + "/kafka-cluster", + setup.zkPort, clusterType == MiniClusterType.kafka ? 9093 : 9092); + kafkaCluster.init(conf); + kafkaCluster.start(); + kafkaCluster.createTopicWithData("test-topic", new File(scriptsDir, "kafka_init_data.json")); + kafkaCluster.createTopicWithData("wiki_kafka_csv", + new File(scriptsDir, "kafka_init_data.csv")); + kafkaCluster.createTopicWithData("wiki_kafka_avro_table", getAvroRows()); + } + + String confDir = testArgs.getConfDir(); + if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) { + if (confDir != null && !confDir.isEmpty()) { + conf.addResource( + new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); + } + int numTrackers = 2; + if (EnumSet + .of(MiniClusterType.llap, MiniClusterType.llap_local, MiniClusterType.druidLocal, + MiniClusterType.druidKafka, MiniClusterType.druid, MiniClusterType.kafka) + .contains(clusterType)) { + llapCluster = LlapItUtils.startAndGetMiniLlapCluster(conf, setup.zooKeeperCluster, confDir); + } + if (EnumSet + .of(MiniClusterType.llap_local, MiniClusterType.tez_local, MiniClusterType.druidLocal) + .contains(clusterType)) { + mr = shims.getLocalMiniTezCluster(conf, + clusterType == MiniClusterType.llap_local || clusterType == MiniClusterType.druidLocal); + } else { + mr = shims + .getMiniTezCluster(conf, numTrackers, uriString, + EnumSet + .of(MiniClusterType.llap, MiniClusterType.llap_local, + MiniClusterType.druidKafka, MiniClusterType.druid, MiniClusterType.kafka) + .contains(clusterType)); + } + } else if (clusterType == MiniClusterType.miniSparkOnYarn) { + mr = shims.getMiniSparkCluster(conf, 2, uriString, 1); + } else if (clusterType == MiniClusterType.mr) { + mr = shims.getMiniMrCluster(conf, 2, uriString, 1); + } + + if (testArgs.isWithLlapIo() && (clusterType == MiniClusterType.none)) { + LOG.info("initializing llap IO"); + LlapProxy.initializeLlapIo(conf); + } + } + + public void initConf(HiveConf conf) throws IOException { + if (mr != null) { + mr.setupConfiguration(conf); + + // TODO Ideally this should be done independent of whether mr is setup or not. + setFsRelatedProperties(conf, fs.getScheme().equals("file"), fs); + } + + if (llapCluster != null) { + Configuration clusterSpecificConf = llapCluster.getClusterSpecificConfiguration(); + for (Map.Entry confEntry : clusterSpecificConf) { + // Conf.get takes care of parameter replacement, iterator.value does not. 
+ conf.set(confEntry.getKey(), clusterSpecificConf.get(confEntry.getKey())); + } + } + if (druidCluster != null) { + final Path druidDeepStorage = fs.makeQualified(new Path(druidCluster.getDeepStorageDir())); + fs.mkdirs(druidDeepStorage); + conf.set("hive.druid.storage.storageDirectory", druidDeepStorage.toUri().getPath()); + conf.set("hive.druid.metadata.db.type", "derby"); + conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI()); + conf.set("hive.druid.coordinator.address.default", druidCluster.getCoordinatorURI()); + conf.set("hive.druid.overlord.address.default", druidCluster.getOverlordURI()); + conf.set("hive.druid.broker.address.default", druidCluster.getBrokerURI()); + final Path scratchDir = + fs.makeQualified(new Path(QTestSystemProperties.getTempDir(), "druidStagingDir")); + fs.mkdirs(scratchDir); + conf.set("hive.druid.working.directory", scratchDir.toUri().getPath()); + } + + if (testArgs.isWithLlapIo() && (clusterType == MiniClusterType.none)) { + LOG.info("initializing llap IO"); + LlapProxy.initializeLlapIo(conf); + } + } + + public void postInit(HiveConf conf) { + createRemoteDirs(conf); + } + + public void preTest(HiveConf conf) throws Exception { + setup.preTest(conf); + } + + public void postTest(HiveConf conf) throws Exception { + setup.postTest(conf); + } + + public void restartSessions(boolean canReuseSession, CliSessionState ss, SessionState oldSs) + throws IOException { + if (oldSs != null && canReuseSession + && clusterType.getCoreClusterType() == CoreClusterType.TEZ) { + // Copy the tezSessionState from the old CliSessionState. + TezSessionState tezSessionState = oldSs.getTezSession(); + oldSs.setTezSession(null); + ss.setTezSession(tezSessionState); + oldSs.close(); + } + + if (oldSs != null && clusterType.getCoreClusterType() == CoreClusterType.SPARK) { + sparkSession = oldSs.getSparkSession(); + ss.setSparkSession(sparkSession); + oldSs.setSparkSession(null); + oldSs.close(); + } + } + + public void shutDown() throws Exception { + if (clusterType.getCoreClusterType() == CoreClusterType.TEZ + && SessionState.get().getTezSession() != null) { + SessionState.get().getTezSession().destroy(); + } + + if (druidCluster != null) { + druidCluster.stop(); + druidCluster = null; + } + + if (kafkaCluster != null) { + kafkaCluster.stop(); + kafkaCluster = null; + } + setup.tearDown(); + if (sparkSession != null) { + try { + SparkSessionManagerImpl.getInstance().closeSession(sparkSession); + } catch (Exception ex) { + LOG.error("Error closing spark session.", ex); + } finally { + sparkSession = null; + } + } + if (mr != null) { + mr.shutdown(); + mr = null; + } + FileSystem.closeAll(); + if (dfs != null) { + dfs.shutdown(); + dfs = null; + } + } + + public void setSparkSession(SparkSession sparkSession) { + this.sparkSession = sparkSession; + } + + public SparkSession getSparkSession() { + return sparkSession; + } + + public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim() { + return hes; + } + + public HadoopShims.MiniMrShim getMr() { + return mr; + } + + public MiniClusterType getClusterType() { + return this.clusterType; + } + + /** + * Should deleted test tables have their data purged. + * + * @return true if data should be purged + */ + public boolean fsNeedsPurge(FsType type) { + if (type == FsType.encrypted_hdfs || type == FsType.erasure_coded_hdfs) { + return true; + } + return false; + } + + private void createRemoteDirs(HiveConf conf) { + // Create remote dirs once. 
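+ // The warehouse, hive-jar and user-install directories must exist on the mini cluster's file + // system before queries run; mkdirs failures are only logged so initialization can continue.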
+ if (getMr() != null) { + assert fs != null; + Path warehousePath = fs.makeQualified(new Path(conf.getVar(ConfVars.METASTOREWAREHOUSE))); + assert warehousePath != null; + Path hiveJarPath = fs.makeQualified(new Path(conf.getVar(ConfVars.HIVE_JAR_DIRECTORY))); + assert hiveJarPath != null; + Path userInstallPath = + fs.makeQualified(new Path(conf.getVar(ConfVars.HIVE_USER_INSTALL_DIR))); + assert userInstallPath != null; + try { + fs.mkdirs(warehousePath); + } catch (IOException e) { + LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath, + e.getMessage()); + } + try { + fs.mkdirs(hiveJarPath); + } catch (IOException e) { + LOG.error("Failed to create path={}. Continuing. Exception message={}", hiveJarPath, + e.getMessage()); + } + try { + fs.mkdirs(userInstallPath); + } catch (IOException e) { + LOG.error("Failed to create path={}. Continuing. Exception message={}", userInstallPath, + e.getMessage()); + } + } + } + + private void setupFileSystem(FsType fsType, HiveConf conf) throws IOException { + if (fsType == FsType.local) { + fs = FileSystem.getLocal(conf); + } else if (fsType == FsType.hdfs || fsType == FsType.encrypted_hdfs + || fsType == FsType.erasure_coded_hdfs) { + int numDataNodes = 4; + + // Setup before getting dfs + switch (fsType) { + case encrypted_hdfs: + // Set the security key provider so that the MiniDFS cluster is initialized + // with encryption + conf.set(SECURITY_KEY_PROVIDER_URI_NAME, getKeyProviderURI()); + conf.setInt("fs.trash.interval", 50); + break; + case erasure_coded_hdfs: + // We need more DataNodes for EC. + // To fully exercise hdfs code paths we need 5 DataNodes for the RS-3-2-1024k policy. + // With 6 DataNodes we can also run the RS-6-3-1024k policy. + numDataNodes = 6; + break; + default: + break; + } + + dfs = shims.getMiniDfs(conf, numDataNodes, true, null); + fs = dfs.getFileSystem(); + + // Setup after getting dfs + switch (fsType) { + case encrypted_hdfs: + // set up the java key provider for encrypted hdfs cluster + hes = shims.createHdfsEncryptionShim(fs, conf); + LOG.info("key provider is initialized"); + break; + case erasure_coded_hdfs: + // The Erasure policy can't be set in a q_test_init script as QTestUtil runs that code in + // a mode that disallows test-only CommandProcessors. + // Set the default policy on the root of the file system here.
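+ // Sub-directories inherit the policy, so the warehouse and scratch dirs the tests create + // under the root are erasure coded with RS-3-2-1024k by default.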
+ HdfsErasureCodingShim erasureCodingShim = shims.createHdfsErasureCodingShim(fs, conf); + erasureCodingShim.enableErasureCodingPolicy(DEFAULT_TEST_EC_POLICY); + erasureCodingShim.setErasureCodingPolicy(new Path("hdfs:///"), DEFAULT_TEST_EC_POLICY); + break; + default: + break; + } + } else { + throw new IllegalArgumentException("Unknown or unhandled fsType [" + fsType + "]"); + } + } + + private String getKeyProviderURI() { + // Use the target directory if it is not specified + String HIVE_ROOT = AbstractCliConfig.HIVE_ROOT; + String keyDir = HIVE_ROOT + "ql/target/"; + + // put the jks file in the current test path only for test purpose + return "jceks://file" + new Path(keyDir, "test.jks").toUri(); + } + + private static List<byte[]> getAvroRows() { + int numRows = 10; + List<byte[]> events; + final DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(Wikipedia.getClassSchema()); + events = IntStream.rangeClosed(0, numRows) + .mapToObj(i -> Wikipedia.newBuilder() + // 1534736225090 -> 08/19/2018 20:37:05 + .setTimestamp(formatter.format(new Timestamp(1534736225090L + 1000 * 3600 * i))) + .setAdded(i * 300).setDeleted(-i).setIsrobot(i % 2 == 0) + .setChannel("chanel number " + i).setComment("comment number " + i).setCommentlength(i) + .setDiffurl(String.format("url %s", i)).setFlags("flag").setIsminor(i % 2 > 0) + .setIsanonymous(i % 3 != 0).setNamespace("namespace") + .setIsunpatrolled(i % 3 == 0).setIsnew(i % 2 > 0) + .setPage(String.format("page is %s", i * 100)).setDelta(i).setDeltabucket(i * 100.4) + .setUser("test-user-" + i).build()) + .map(genericRecord -> { + java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream(); + BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + try { + writer.write(genericRecord, encoder); + encoder.flush(); + out.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + return out.toByteArray(); + }).collect(Collectors.toList()); + return events; + } + + private void setFsRelatedProperties(HiveConf conf, boolean isLocalFs, FileSystem fs) { + String fsUriString = fs.getUri().toString(); + + // Different paths if running locally vs a remote fileSystem. Ideally this difference should not + // exist. + Path warehousePath; + Path jarPath; + Path userInstallPath; + if (isLocalFs) { + String buildDir = QTestSystemProperties.getBuildDir(); + Preconditions.checkState(Strings.isNotBlank(buildDir)); + Path path = new Path(fsUriString, buildDir); + + // Create a fake fs root for local fs + Path localFsRoot = new Path(path, "localfs"); + warehousePath = new Path(localFsRoot, "warehouse"); + jarPath = new Path(localFsRoot, "jar"); + userInstallPath = new Path(localFsRoot, "user_install"); + } else { + // TODO Why is this changed from the default in hive-conf?
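+ // On a remote (mini) DFS the warehouse, jar and user-install locations are pinned to fixed + // absolute paths and qualified against the cluster URI below.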
+ warehousePath = new Path(fsUriString, "/build/ql/test/data/warehouse/"); + jarPath = new Path(new Path(fsUriString, "/user"), "hive"); + userInstallPath = new Path(fsUriString, "/user"); + } + + warehousePath = fs.makeQualified(warehousePath); + jarPath = fs.makeQualified(jarPath); + userInstallPath = fs.makeQualified(userInstallPath); + + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, fsUriString); + + // Remote dirs + conf.setVar(ConfVars.METASTOREWAREHOUSE, warehousePath.toString()); + conf.setVar(ConfVars.HIVE_JAR_DIRECTORY, jarPath.toString()); + conf.setVar(ConfVars.HIVE_USER_INSTALL_DIR, userInstallPath.toString()); + // ConfVars.SCRATCHDIR - {test.tmp.dir}/scratchdir + + // Local dirs + // ConfVars.LOCALSCRATCHDIR - {test.tmp.dir}/localscratchdir + + // TODO Make sure to cleanup created dirs. + } +} diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestResultProcessor.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestResultProcessor.java new file mode 100644 index 0000000000..b2164ba2bb --- /dev/null +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestResultProcessor.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestResultProcessor.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestResultProcessor.java
new file mode 100644
index 0000000000..b2164ba2bb
--- /dev/null
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestResultProcessor.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.io.DigestPrintStream;
+import org.apache.hadoop.hive.common.io.SessionStream;
+import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream;
+import org.apache.hadoop.hive.common.io.SortPrintStream;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.common.util.StreamPrinter;
+
+/**
+ * QTestResultProcessor handles file-level qtest result postprocessing (sort, diff). It is similar
+ * to QOutProcessor, but it works on whole result files, whereas QOutProcessor does text
+ * processing within a qtest result file.
+ */
+public class QTestResultProcessor {
+  private static final Pattern SORT_BEFORE_DIFF = Pattern.compile("-- SORT_BEFORE_DIFF");
+  private static final Pattern SORT_QUERY_RESULTS = Pattern.compile("-- SORT_QUERY_RESULTS");
+  private static final Pattern HASH_QUERY_RESULTS = Pattern.compile("-- HASH_QUERY_RESULTS");
+  private static final Pattern SORT_AND_HASH_QUERY_RESULTS =
+      Pattern.compile("-- SORT_AND_HASH_QUERY_RESULTS");
+  private static final Pattern NO_SESSION_REUSE = Pattern.compile("-- NO_SESSION_REUSE");
+
+  private static final String SORT_SUFFIX = ".sorted";
+
+  private final Set<String> qSortSet = new HashSet<String>();
+  private final Set<String> qSortQuerySet = new HashSet<String>();
+  private final Set<String> qHashQuerySet = new HashSet<String>();
+  private final Set<String> qSortNHashQuerySet = new HashSet<String>();
+  private final Set<String> qNoSessionReuseQuerySet = new HashSet<String>();
+
+  public void add(File qf, String query) {
+    if (matches(SORT_BEFORE_DIFF, query)) {
+      qSortSet.add(qf.getName());
+    } else if (matches(SORT_QUERY_RESULTS, query)) {
+      qSortQuerySet.add(qf.getName());
+    } else if (matches(HASH_QUERY_RESULTS, query)) {
+      qHashQuerySet.add(qf.getName());
+    } else if (matches(SORT_AND_HASH_QUERY_RESULTS, query)) {
+      qSortNHashQuerySet.add(qf.getName());
+    }
+
+    if (matches(NO_SESSION_REUSE, query)) {
+      qNoSessionReuseQuerySet.add(qf.getName());
+    }
+  }
+
+  private boolean matches(Pattern pattern, String query) {
+    Matcher matcher = pattern.matcher(query);
+    return matcher.find();
+  }
+
+  public boolean shouldSort(String fileName) {
+    return qSortSet.contains(fileName);
+  }
+
+  public void setOutputs(CliSessionState ss, OutputStream fo, String fileName) throws Exception {
+    if (qSortQuerySet.contains(fileName)) {
+      ss.out = new SortPrintStream(fo, "UTF-8");
+    } else if (qHashQuerySet.contains(fileName)) {
+      ss.out = new DigestPrintStream(fo, "UTF-8");
+    } else if (qSortNHashQuerySet.contains(fileName)) {
+      ss.out = new SortAndDigestPrintStream(fo, "UTF-8");
+    } else {
+      ss.out = new SessionStream(fo, true, "UTF-8");
+    }
+  }
+
+  public boolean shouldNotReuseSession(String fileName) {
+    return qNoSessionReuseQuerySet.contains(fileName);
+  }
+
+  public QTestProcessExecResult executeDiffCommand(String inFileName, String outFileName,
+      boolean ignoreWhiteSpace, String tname) throws Exception {
+
+    QTestProcessExecResult result;
+
+    if (shouldSort(tname)) {
+      String inSorted = inFileName + SORT_SUFFIX;
+      String outSorted = outFileName + SORT_SUFFIX;
+
+      sortResult(inFileName, outFileName, inSorted, outSorted);
+
+      inFileName = inSorted;
+      outFileName = outSorted;
+    }
+
+    ArrayList<String> diffCommandArgs = new ArrayList<String>();
+    diffCommandArgs.add("diff");
+
+    // Text file comparison
+    diffCommandArgs.add("-a");
+
+    // Ignore changes in the amount of white space
+    if (ignoreWhiteSpace) {
+      diffCommandArgs.add("-b");
+    }
+
+    // Add files to compare to the arguments list
+    diffCommandArgs.add(getQuotedString(inFileName));
+    diffCommandArgs.add(getQuotedString(outFileName));
+
+    result = executeCmd(diffCommandArgs);
+
+    if (shouldSort(tname)) {
+      new File(inFileName).delete();
+      new File(outFileName).delete();
+    }
+
+    return result;
+  }
+
+  public void overwriteResults(String inFileName, String outFileName) throws Exception {
+    // This method can be replaced with Files.copy(source, target, REPLACE_EXISTING)
+    // once Hive uses JAVA 7.
+    System.out.println("Overwriting results " + inFileName + " to " + outFileName);
+    int result = executeCmd(new String[]{
+        "cp", getQuotedString(inFileName), getQuotedString(outFileName) }).getReturnCode();
+    if (result != 0) {
+      throw new IllegalStateException("Unexpected error while overwriting " + inFileName + " with " + outFileName);
+    }
+  }
+
+  private void sortResult(String inFileName, String outFileName, String inSorted, String outSorted)
+      throws Exception {
+    // sort will try to open the output file in write mode on windows. We need to
+    // close it first.
+    SessionState ss = SessionState.get();
+    if (ss != null && ss.out != null && ss.out != System.out) {
+      ss.out.close();
+    }
+
+    sortFiles(inFileName, inSorted);
+    sortFiles(outFileName, outSorted);
+  }
+
+  private void sortFiles(String in, String out) throws Exception {
+    int result =
+        executeCmd(new String[] { "sort", getQuotedString(in), }, out, null).getReturnCode();
+    if (result != 0) {
+      throw new IllegalStateException("Unexpected error while sorting " + in);
+    }
+  }
+
+  private static QTestProcessExecResult executeCmd(Collection<String> args) throws Exception {
+    return executeCmd(args, null, null);
+  }
+
+  private static QTestProcessExecResult executeCmd(String[] args) throws Exception {
+    return executeCmd(args, null, null);
+  }
+
+  private static QTestProcessExecResult executeCmd(Collection<String> args, String outFile,
+      String errFile) throws Exception {
+    String[] cmdArray = args.toArray(new String[args.size()]);
+    return executeCmd(cmdArray, outFile, errFile);
+  }
+
+  public static QTestProcessExecResult executeCmd(String[] args, String outFile, String errFile)
+      throws Exception {
+    System.out.println("Running: " + org.apache.commons.lang.StringUtils.join(args, ' '));
+
+    PrintStream out = outFile == null ? SessionState.getConsole().getChildOutStream()
+        : new PrintStream(new FileOutputStream(outFile), true, "UTF-8");
+    PrintStream err = errFile == null ? SessionState.getConsole().getChildErrStream()
+        : new PrintStream(new FileOutputStream(errFile), true, "UTF-8");
+
+    Process executor = Runtime.getRuntime().exec(args);
+
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    PrintStream str = new PrintStream(bos, true, "UTF-8");
+
+    StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, err);
+    StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, out, str);
+
+    outPrinter.start();
+    errPrinter.start();
+
+    int result = executor.waitFor();
+
+    outPrinter.join();
+    errPrinter.join();
+
+    if (outFile != null) {
+      out.close();
+    }
+
+    if (errFile != null) {
+      err.close();
+    }
+
+    return QTestProcessExecResult.create(result,
+        new String(bos.toByteArray(), StandardCharsets.UTF_8));
+  }
+
+  private static String getQuotedString(String str) {
+    return str;
+  }
+}
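For orientation, a minimal sketch of how a driver is meant to use QTestResultProcessor; the q-file name, query text, and result paths below are hypothetical, and the ".sorted" twin files are created and removed by executeDiffCommand itself:

    // scanning the query text records the per-file flag; "-- SORT_BEFORE_DIFF"
    // marks join2.q for sorted comparison
    QTestResultProcessor processor = new QTestResultProcessor();
    processor.add(new File("join2.q"), "-- SORT_BEFORE_DIFF\nSELECT * FROM src;");

    // both the fresh test output and the golden file are sorted before diff
    // runs, so row order alone cannot fail the test
    QTestProcessExecResult result = processor.executeDiffCommand(
        "/tmp/out/join2.q.out", "/golden/join2.q.out", false, "join2.q");
    System.out.println("diff exit code: " + result.getReturnCode());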
diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestRunnerUtils.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestRunnerUtils.java
new file mode 100644
index 0000000000..b5a4d5c1da
--- /dev/null
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestRunnerUtils.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.io.File;
+
+import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType;
+import org.apache.logging.log4j.util.Strings;
+
+public class QTestRunnerUtils {
+  public static final String DEFAULT_INIT_SCRIPT = "q_test_init.sql";
+  public static final String DEFAULT_CLEANUP_SCRIPT = "q_test_cleanup.sql";
+
+  /**
+   * QTRunner: Runnable class for running a single query file.
+   **/
+  public static class QTRunner implements Runnable {
+    private final QTestUtil qt;
+    private final File file;
+
+    public QTRunner(QTestUtil qt, File file) {
+      this.qt = qt;
+      this.file = file;
+    }
+
+    @Override
+    public void run() {
+      try {
+        qt.startSessionState(false);
+        // assumption is that environment has already been cleaned once globally
+        // hence each thread does not call cleanUp() and createSources() again
+        qt.cliInit(file);
+        qt.executeClient(file.getName());
+      } catch (Throwable e) {
+        System.err
+            .println("Query file " + file.getName() + " failed with exception " + e.getMessage());
+        e.printStackTrace();
+        outputTestFailureHelpMessage();
+      }
+    }
+  }
+
+  /**
+   * Setup to execute a set of query files. Uses QTestUtil to do so.
+   *
+   * @param qfiles array of input query files containing arbitrary number of hive queries
+   * @param resDir output directory
+   * @param logDir log directory
+   * @return one QTestUtil for each query file
+   */
+  public static QTestUtil[] queryListRunnerSetup(File[] qfiles, String resDir, String logDir,
+      String initScript, String cleanupScript) throws Exception {
+    QTestUtil[] qt = new QTestUtil[qfiles.length];
+    for (int i = 0; i < qfiles.length; i++) {
+
+      qt[i] = new QTestUtil(QTestArguments.QTestArgumentsBuilder.instance().withOutDir(resDir)
+          .withLogDir(logDir).withClusterType(MiniClusterType.none).withConfDir(null)
+          .withInitScript(initScript == null ? DEFAULT_INIT_SCRIPT : initScript)
+          .withCleanupScript(cleanupScript == null ? DEFAULT_CLEANUP_SCRIPT : cleanupScript)
+          .withLlapIo(false).build());
+
+      qt[i].addFile(qfiles[i], false);
+      qt[i].clearTestSideEffects();
+    }
+
+    return qt;
+  }
+
+  /**
+   * Executes a set of query files in sequence.
+   *
+   * @param qfiles array of input query files containing arbitrary number of hive queries
+   * @param qt array of QTestUtils, one per qfile
+   * @return true if all queries passed, false otherwise
+   */
+  public static boolean queryListRunnerSingleThreaded(File[] qfiles, QTestUtil[] qt)
+      throws Exception {
+    boolean failed = false;
+    qt[0].cleanUp();
+    qt[0].createSources();
+    for (int i = 0; i < qfiles.length && !failed; i++) {
+      qt[i].clearTestSideEffects();
+      qt[i].cliInit(qfiles[i]);
+      qt[i].executeClient(qfiles[i].getName());
+      QTestProcessExecResult result = qt[i].checkCliDriverResults(qfiles[i].getName());
+      if (result.getReturnCode() != 0) {
+        failed = true;
+        StringBuilder builder = new StringBuilder();
+        builder.append("Test ").append(qfiles[i].getName())
+            .append(" results check failed with error code ").append(result.getReturnCode());
+        if (Strings.isNotEmpty(result.getCapturedOutput())) {
+          builder.append(" and diff value ").append(result.getCapturedOutput());
+        }
+        System.err.println(builder.toString());
+        outputTestFailureHelpMessage();
+      }
+      qt[i].clearPostTestEffects();
+    }
+    return (!failed);
+  }
+
+  /**
+   * Executes a set of query files in parallel.
+   * <p>
+   * Each query file is run in a separate thread. The caller has to arrange that different query
+   * files do not collide (in terms of destination tables)
+   *
+   * @param qfiles array of input query files containing arbitrary number of hive queries
+   * @param qt array of QTestUtils, one per qfile
+   * @return true if all queries passed, false otherwise
+   */
+  public static boolean queryListRunnerMultiThreaded(File[] qfiles, QTestUtil[] qt)
+      throws Exception {
+    boolean failed = false;
+
+    // in multithreaded mode - do cleanup/initialization just once
+
+    qt[0].cleanUp();
+    qt[0].createSources();
+    qt[0].clearTestSideEffects();
+
+    QTRunner[] qtRunners = new QTRunner[qfiles.length];
+    Thread[] qtThread = new Thread[qfiles.length];
+
+    for (int i = 0; i < qfiles.length; i++) {
+      qtRunners[i] = new QTRunner(qt[i], qfiles[i]);
+      qtThread[i] = new Thread(qtRunners[i]);
+    }
+
+    for (int i = 0; i < qfiles.length; i++) {
+      qtThread[i].start();
+    }
+
+    for (int i = 0; i < qfiles.length; i++) {
+      qtThread[i].join();
+      QTestProcessExecResult result = qt[i].checkCliDriverResults(qfiles[i].getName());
+      if (result.getReturnCode() != 0) {
+        failed = true;
+        StringBuilder builder = new StringBuilder();
+        builder.append("Test ").append(qfiles[i].getName())
+            .append(" results check failed with error code ").append(result.getReturnCode());
+        if (Strings.isNotEmpty(result.getCapturedOutput())) {
+          builder.append(" and diff value ").append(result.getCapturedOutput());
+        }
+        System.err.println(builder.toString());
+        outputTestFailureHelpMessage();
+      }
+    }
+    return (!failed);
+  }
+
+  public static void outputTestFailureHelpMessage() {
+    System.err.println(QTestUtil.DEBUG_HINT);
+    System.err.flush();
+  }
+}
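A compressed sketch of the intended call sequence for these helpers; the file names and directories are hypothetical, and passing null scripts falls back to DEFAULT_INIT_SCRIPT and DEFAULT_CLEANUP_SCRIPT:

    // one QTestUtil per query file, all on MiniClusterType.none
    File[] qfiles = { new File("join2.q"), new File("groupby1.q") };
    QTestUtil[] qts = QTestRunnerUtils.queryListRunnerSetup(
        qfiles, "/tmp/res", "/tmp/log", null, null);

    // sequential variant shown here; queryListRunnerMultiThreaded instead runs
    // one thread per file, so the files must not share destination tables
    boolean allPassed = QTestRunnerUtils.queryListRunnerSingleThreaded(qfiles, qts);
    System.out.println(allPassed ? "all q-files passed" : "at least one q-file failed");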
diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestSystemProperties.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestSystemProperties.java
new file mode 100644
index 0000000000..e4adee9d7b
--- /dev/null
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestSystemProperties.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.util.Set;
+
+public class QTestSystemProperties {
+  public static final String SYS_PROP_TMP_DIR_PROPERTY = "test.tmp.dir"; // typically target/tmp
+  private static final String SYS_PROP_SRC_TABLES_PROPERTY = "test.src.tables";
+  private static final String SYS_PROP_OUTPUT_OVERWRITE = "test.output.overwrite";
+  private static final String SYS_PROP_SRC_UDFS = "test.src.udfs";
+  private static final String SYS_PROP_VECTORIZATION_ENABLED = "test.vectorization.enabled";
+  private static final String SYS_PROP_BUILD_DIR = "build.dir"; // typically target
+
+  public static String getTempDir() {
+    return System.getProperty(SYS_PROP_TMP_DIR_PROPERTY);
+  }
+
+  public static String[] getSrcTables() {
+    return System.getProperty(SYS_PROP_SRC_TABLES_PROPERTY, "").trim().split(",");
+  }
+
+  public static void setSrcTables(Set<String> srcTables) {
+    System.setProperty(SYS_PROP_SRC_TABLES_PROPERTY, String.join(",", srcTables));
+  }
+
+  public static String[] getSourceUdfs(String defaultTestSrcUDFs) {
+    return System.getProperty(SYS_PROP_SRC_UDFS, defaultTestSrcUDFs).trim().split(",");
+  }
+
+  public static String getBuildDir() {
+    return System.getProperty(SYS_PROP_BUILD_DIR);
+  }
+
+  public static boolean isVectorizationEnabled() {
+    return isTrue(SYS_PROP_VECTORIZATION_ENABLED);
+  }
+
+  public static boolean shouldOverwriteResults() {
+    return isTrue(SYS_PROP_OUTPUT_OVERWRITE);
+  }
+
+  private static boolean isTrue(String propertyName) {
+    return "true".equals(System.getProperty(propertyName));
+  }
+}
\ No newline at end of file
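These switches are read from JVM system properties, so in practice they arrive as -D flags on the maven/surefire command line; a small illustration of the two most common ones (property names come from the class above, the values are examples):

    // regenerate golden files instead of failing on a diff, with vectorization on:
    //   mvn test ... -Dtest.output.overwrite=true -Dtest.vectorization.enabled=true
    System.setProperty("test.output.overwrite", "true");
    System.setProperty("test.vectorization.enabled", "true");
    assert QTestSystemProperties.shouldOverwriteResults();
    assert QTestSystemProperties.isVectorizationEnabled();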
diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 7538d18c51..7173ed502d 100644
--- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -20,84 +20,45 @@ import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileWriter;
-import java.io.FilenameFilter;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.io.PrintStream;
-import java.io.StringWriter;
 import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Deque;
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.output.ByteArrayOutputStream;
+
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hive.cli.CliDriver;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.cli.control.AbstractCliConfig;
 import org.apache.hadoop.hive.common.io.CachingPrintStream;
-import org.apache.hadoop.hive.common.io.DigestPrintStream;
 import org.apache.hadoop.hive.common.io.SessionStream;
-import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream;
-import org.apache.hadoop.hive.common.io.SortPrintStream;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.LlapItUtils;
-import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.QTestMiniClusters.FsType;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
-import org.apache.hadoop.hive.ql.dataset.Dataset;
-import org.apache.hadoop.hive.ql.dataset.DatasetCollection;
-import org.apache.hadoop.hive.ql.dataset.DatasetParser;
+import org.apache.hadoop.hive.ql.dataset.QTestDatasetHandler;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
-import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
-import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
-import org.apache.hadoop.hive.ql.lockmgr.zookeeper.CuratorFrameworkSingleton;
-import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
@@ -115,68 +76,31 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.processors.HiveCommand;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.hive.shims.HadoopShims.HdfsErasureCodingShim;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hive.common.util.StreamPrinter;
-import org.apache.hive.druid.MiniDruidCluster;
-import org.apache.hive.kafka.SingleNodeKafkaCluster;
-import org.apache.hive.kafka.Wikipedia;
-import org.apache.logging.log4j.util.Strings;
-import org.apache.tools.ant.BuildException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;

 /**
  * QTestUtil.
*/ public class QTestUtil { - private static final Logger LOG = LoggerFactory.getLogger("QTestUtil"); - private static final String UTF_8 = "UTF-8"; - - // security property names - private static final String SECURITY_KEY_PROVIDER_URI_NAME = "dfs.encryption.key.provider.uri"; - private static final String CRLF = System.getProperty("line.separator"); public static final String QTEST_LEAVE_FILES = "QTEST_LEAVE_FILES"; - private final static String defaultInitScript = "q_test_init.sql"; - private final static String defaultCleanupScript = "q_test_cleanup.sql"; - private static SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss"); private final String[] testOnlyCommands = new String[]{ "crypto", "erasure" }; - - public static final String TEST_TMP_DIR_PROPERTY = "test.tmp.dir"; // typically target/tmp - private static final String BUILD_DIR_PROPERTY = "build.dir"; // typically target - private static final String TEST_SRC_TABLES_PROPERTY = "test.src.tables"; - - /** - * The default Erasure Coding Policy to use in Erasure Coding tests. - */ - public static final String DEFAULT_TEST_EC_POLICY = "RS-3-2-1024k"; + public static String DEBUG_HINT = + "\nSee ./ql/target/tmp/log/hive.log or ./itests/qtest/target/tmp/log/hive.log, " + + "or check ./ql/target/surefire-reports or ./itests/qtest/target/surefire-reports/ for specific test cases logs."; private String testWarehouse; @Deprecated private final String testFiles; - private final File datasetDir; private final String outDir; protected final String logDir; - private final TreeMap qMap; - private final Set qSortSet; - private final Set qSortQuerySet; - private final Set qHashQuerySet; - private final Set qSortNHashQuerySet; - private final Set qNoSessionReuseQuerySet; - private static final String SORT_SUFFIX = ".sorted"; - private static Set srcTables; + private final TreeMap qMap = new TreeMap(); private final Set srcUDFs; - private final MiniClusterType clusterType; private final FsType fsType; private ParseDriver pd; protected Hive db; @@ -185,61 +109,17 @@ protected HiveConf savedConf; private IDriver drv; private BaseSemanticAnalyzer sem; - private final boolean overWrite; private CliDriver cliDriver; - private HadoopShims.MiniMrShim mr = null; - private HadoopShims.MiniDFSShim dfs = null; - private FileSystem fs; - private HadoopShims.HdfsEncryptionShim hes = null; - private MiniLlapCluster llapCluster = null; - private final QTestSetup setup; - private SparkSession sparkSession = null; - private boolean isSessionStateStarted = false; - private QOutProcessor qOutProcessor; + private final QTestMiniClusters miniClusters = new QTestMiniClusters(); + private final QOutProcessor qOutProcessor; + private static QTestResultProcessor qTestResultProcessor = new QTestResultProcessor(); + protected QTestDatasetHandler datasetHandler; private final String initScript; private final String cleanupScript; - private MiniDruidCluster druidCluster; - private SingleNodeKafkaCluster kafkaCluster; - - public static Set getSrcTables() { - if (srcTables == null) { - initSrcTables(); - } - return srcTables; - } - - public static void addSrcTable(String table) { - getSrcTables().add(table); - storeSrcTables(); - } - - public static Set initSrcTables() { - if (srcTables == null) { - initSrcTablesFromSystemProperty(); - storeSrcTables(); - } - - return srcTables; - } - - private static void storeSrcTables() { - System.setProperty(TEST_SRC_TABLES_PROPERTY, String.join(",", srcTables)); - } - - private static void 
initSrcTablesFromSystemProperty() { - srcTables = new HashSet(); - // FIXME: moved default value to here...for now - // i think this features is never really used from the command line - for (String srcTable : System.getProperty(TEST_SRC_TABLES_PROPERTY, "").trim().split(",")) { - srcTable = srcTable.trim(); - if (!srcTable.isEmpty()) { - srcTables.add(srcTable); - } - } - } + private boolean isSessionStateStarted = false; - private CliDriver getCliDriver() { + protected CliDriver getCliDriver() { if (cliDriver == null) { throw new RuntimeException("no clidriver"); } @@ -256,7 +136,7 @@ private CliDriver getCliDriver() { // FIXME: moved default value to here...for now // i think this features is never really used from the command line String defaultTestSrcUDFs = "qtest_get_java_boolean"; - for (String srcUDF : System.getProperty("test.src.udfs", defaultTestSrcUDFs).trim().split(",")) { + for (String srcUDF : QTestSystemProperties.getSourceUdfs(defaultTestSrcUDFs)) { srcUDF = srcUDF.trim(); if (!srcUDF.isEmpty()) { srcUDFs.add(srcUDF); @@ -273,181 +153,14 @@ public HiveConf getConf() { } public void initConf() throws Exception { - - String vectorizationEnabled = System.getProperty("test.vectorization.enabled"); - if (vectorizationEnabled != null && vectorizationEnabled.equalsIgnoreCase("true")) { + if (QTestSystemProperties.isVectorizationEnabled()) { conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, true); } // Plug verifying metastore in for testing DirectSQL. conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.VerifyingObjectStore"); - if (mr != null) { - mr.setupConfiguration(conf); - - // TODO Ideally this should be done independent of whether mr is setup or not. - setFsRelatedProperties(conf, fs.getScheme().equals("file"), fs); - } - - if (llapCluster != null) { - Configuration clusterSpecificConf = llapCluster.getClusterSpecificConfiguration(); - for (Map.Entry confEntry : clusterSpecificConf) { - // Conf.get takes care of parameter replacement, iterator.value does not. - conf.set(confEntry.getKey(), clusterSpecificConf.get(confEntry.getKey())); - } - } - if (druidCluster != null) { - final Path druidDeepStorage = fs.makeQualified(new Path(druidCluster.getDeepStorageDir())); - fs.mkdirs(druidDeepStorage); - conf.set("hive.druid.storage.storageDirectory", druidDeepStorage.toUri().getPath()); - conf.set("hive.druid.metadata.db.type", "derby"); - conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI()); - conf.set("hive.druid.coordinator.address.default", druidCluster.getCoordinatorURI()); - conf.set("hive.druid.overlord.address.default", druidCluster.getOverlordURI()); - conf.set("hive.druid.broker.address.default", druidCluster.getBrokerURI()); - final Path scratchDir = fs.makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir")); - fs.mkdirs(scratchDir); - conf.set("hive.druid.working.directory", scratchDir.toUri().getPath()); - } - } - - private void setFsRelatedProperties(HiveConf conf, boolean isLocalFs, FileSystem fs) { - String fsUriString = fs.getUri().toString(); - - // Different paths if running locally vs a remote fileSystem. Ideally this difference should not exist. 
- Path warehousePath; - Path jarPath; - Path userInstallPath; - if (isLocalFs) { - String buildDir = System.getProperty(BUILD_DIR_PROPERTY); - Preconditions.checkState(Strings.isNotBlank(buildDir)); - Path path = new Path(fsUriString, buildDir); - - // Create a fake fs root for local fs - Path localFsRoot = new Path(path, "localfs"); - warehousePath = new Path(localFsRoot, "warehouse"); - jarPath = new Path(localFsRoot, "jar"); - userInstallPath = new Path(localFsRoot, "user_install"); - } else { - // TODO Why is this changed from the default in hive-conf? - warehousePath = new Path(fsUriString, "/build/ql/test/data/warehouse/"); - jarPath = new Path(new Path(fsUriString, "/user"), "hive"); - userInstallPath = new Path(fsUriString, "/user"); - } - - warehousePath = fs.makeQualified(warehousePath); - jarPath = fs.makeQualified(jarPath); - userInstallPath = fs.makeQualified(userInstallPath); - - conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, fsUriString); - - // Remote dirs - conf.setVar(ConfVars.METASTOREWAREHOUSE, warehousePath.toString()); - conf.setVar(ConfVars.HIVE_JAR_DIRECTORY, jarPath.toString()); - conf.setVar(ConfVars.HIVE_USER_INSTALL_DIR, userInstallPath.toString()); - // ConfVars.SCRATCHDIR - {test.tmp.dir}/scratchdir - - // Local dirs - // ConfVars.LOCALSCRATCHDIR - {test.tmp.dir}/localscratchdir - - // TODO Make sure to cleanup created dirs. - } - - private void createRemoteDirs() { - assert fs != null; - Path warehousePath = fs.makeQualified(new Path(conf.getVar(ConfVars.METASTOREWAREHOUSE))); - assert warehousePath != null; - Path hiveJarPath = fs.makeQualified(new Path(conf.getVar(ConfVars.HIVE_JAR_DIRECTORY))); - assert hiveJarPath != null; - Path userInstallPath = fs.makeQualified(new Path(conf.getVar(ConfVars.HIVE_USER_INSTALL_DIR))); - assert userInstallPath != null; - try { - fs.mkdirs(warehousePath); - } catch (IOException e) { - LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath, e.getMessage()); - } - try { - fs.mkdirs(hiveJarPath); - } catch (IOException e) { - LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath, e.getMessage()); - } - try { - fs.mkdirs(userInstallPath); - } catch (IOException e) { - LOG.error("Failed to create path={}. Continuing. 
Exception message={}", warehousePath, e.getMessage()); - } - } - - private enum CoreClusterType { - MR, TEZ, SPARK - } - - public enum FsType { - local, hdfs, encrypted_hdfs, erasure_coded_hdfs, - } - - public enum MiniClusterType { - - mr(CoreClusterType.MR, FsType.hdfs), tez(CoreClusterType.TEZ, FsType.hdfs), tez_local(CoreClusterType.TEZ, - FsType.local), spark(CoreClusterType.SPARK, FsType.local), miniSparkOnYarn(CoreClusterType.SPARK, - FsType.hdfs), llap(CoreClusterType.TEZ, FsType.hdfs), llap_local(CoreClusterType.TEZ, FsType.local), none( - CoreClusterType.MR, - FsType.local), druidLocal(CoreClusterType.TEZ, FsType.local), druid(CoreClusterType.TEZ, - FsType.hdfs), druidKafka(CoreClusterType.TEZ, FsType.hdfs), kafka(CoreClusterType.TEZ, FsType.hdfs); - - private final CoreClusterType coreClusterType; - private final FsType defaultFsType; - - MiniClusterType(CoreClusterType coreClusterType, FsType defaultFsType) { - this.coreClusterType = coreClusterType; - this.defaultFsType = defaultFsType; - } - - public CoreClusterType getCoreClusterType() { - return coreClusterType; - } - - public FsType getDefaultFsType() { - return defaultFsType; - } - - public static MiniClusterType valueForString(String type) { - // Replace this with valueOf. - if (type.equals("miniMR")) { - return mr; - } else if (type.equals("tez")) { - return tez; - } else if (type.equals("tez_local")) { - return tez_local; - } else if (type.equals("spark")) { - return spark; - } else if (type.equals("miniSparkOnYarn")) { - return miniSparkOnYarn; - } else if (type.equals("llap")) { - return llap; - } else if (type.equals("llap_local")) { - return llap_local; - } else if (type.equals("druidLocal")) { - return druidLocal; - } else if (type.equals("druid")) { - return druid; - } else if (type.equals("druid-kafka")) { - return druidKafka; - } else if (type.equals("kafka")) { - return kafka; - } else { - return none; - } - } - } - - private String getKeyProviderURI() { - // Use the target directory if it is not specified - String HIVE_ROOT = AbstractCliConfig.HIVE_ROOT; - String keyDir = HIVE_ROOT + "ql/target/"; - - // put the jks file in the current test path only for test purpose - return "jceks://file" + new Path(keyDir, "test.jks").toUri(); + miniClusters.initConf(conf); } public QTestUtil(QTestArguments testArgs) throws Exception { @@ -481,58 +194,24 @@ public QTestUtil(QTestArguments testArgs) throws Exception { queryState = new QueryState.Builder().withHiveConf(new HiveConf(IDriver.class)).build(); conf = queryState.getConf(); - qMap = new TreeMap(); - qSortSet = new HashSet(); - qSortQuerySet = new HashSet(); - qHashQuerySet = new HashSet(); - qSortNHashQuerySet = new HashSet(); - qNoSessionReuseQuerySet = new HashSet(); - this.clusterType = testArgs.getClusterType(); - - HadoopShims shims = ShimLoader.getHadoopShims(); - setupFileSystem(shims); - setup = testArgs.getQTestSetup(); - setup.preTest(conf); - - setupMiniCluster(shims, testArgs.getConfDir()); + this.miniClusters.setup(testArgs, conf, getScriptsDir(), logDir); initConf(); - if (testArgs.isWithLlapIo() && (clusterType == MiniClusterType.none)) { - LOG.info("initializing llap IO"); - LlapProxy.initializeLlapIo(conf); - } - - // Use the current directory if it is not specified - String dataDir = conf.get("test.data.files"); - if (dataDir == null) { - dataDir = new File(".").getAbsolutePath() + "/data/files"; - } - testFiles = dataDir; - conf.set("test.data.dir", dataDir); - - // Use path relative to dataDir directory if it is not specified - datasetDir = - 
conf.get("test.data.set.files") == null ? - new File(new File(dataDir).getAbsolutePath() + "/datasets") : - new File(conf.get("test.data.set.files")); + datasetHandler = new QTestDatasetHandler(this, conf); + testFiles = datasetHandler.getDataDir(conf); + conf.set("test.data.dir", datasetHandler.getDataDir(conf)); String scriptsDir = getScriptsDir(); this.initScript = scriptsDir + File.separator + testArgs.getInitScript(); this.cleanupScript = scriptsDir + File.separator + testArgs.getCleanupScript(); - overWrite = shouldOverwriteResults(); - - init(); + postInit(); savedConf = new HiveConf(conf); } - private boolean shouldOverwriteResults() { - return "true".equalsIgnoreCase(System.getProperty("test.output.overwrite")); - } - private String getScriptsDir() { // Use the current directory if it is not specified String scriptsDir = conf.get("test.data.scripts"); @@ -542,277 +221,45 @@ private String getScriptsDir() { return scriptsDir; } - private void setupFileSystem(HadoopShims shims) throws IOException { - - if (fsType == FsType.local) { - fs = FileSystem.getLocal(conf); - } else if (fsType == FsType.hdfs || fsType == FsType.encrypted_hdfs || fsType == FsType.erasure_coded_hdfs) { - int numDataNodes = 4; - - // Setup before getting dfs - switch (fsType) { - case encrypted_hdfs: - // Set the security key provider so that the MiniDFS cluster is initialized - // with encryption - conf.set(SECURITY_KEY_PROVIDER_URI_NAME, getKeyProviderURI()); - conf.setInt("fs.trash.interval", 50); - break; - case erasure_coded_hdfs: - // We need more NameNodes for EC. - // To fully exercise hdfs code paths we need 5 NameNodes for the RS-3-2-1024k policy. - // With 6 NameNodes we can also run the RS-6-3-1024k policy. - numDataNodes = 6; - break; - default: - break; - } - - dfs = shims.getMiniDfs(conf, numDataNodes, true, null); - fs = dfs.getFileSystem(); - - // Setup after getting dfs - switch (fsType) { - case encrypted_hdfs: - // set up the java key provider for encrypted hdfs cluster - hes = shims.createHdfsEncryptionShim(fs, conf); - LOG.info("key provider is initialized"); - break; - case erasure_coded_hdfs: - // The Erasure policy can't be set in a q_test_init script as QTestUtil runs that code in - // a mode that disallows test-only CommandProcessors. - // Set the default policy on the root of the file system here. - HdfsErasureCodingShim erasureCodingShim = shims.createHdfsErasureCodingShim(fs, conf); - erasureCodingShim.enableErasureCodingPolicy(DEFAULT_TEST_EC_POLICY); - erasureCodingShim.setErasureCodingPolicy(new Path("hdfs:///"), DEFAULT_TEST_EC_POLICY); - break; - default: - break; - } - } else { - throw new IllegalArgumentException("Unknown or unhandled fsType [" + fsType + "]"); - } - } - - private void setupMiniCluster(HadoopShims shims, String confDir) throws IOException { - - String uriString = fs.getUri().toString(); - - if (clusterType == MiniClusterType.druidKafka - || clusterType == MiniClusterType.druidLocal - || clusterType == MiniClusterType.druid) { - final String tempDir = System.getProperty("test.tmp.dir"); - druidCluster = - new MiniDruidCluster(clusterType == MiniClusterType.druid ? 
"mini-druid" : "mini-druid-kafka", - logDir, - tempDir, - setup.zkPort, - Utilities.jarFinderGetJar(MiniDruidCluster.class)); - final Path druidDeepStorage = fs.makeQualified(new Path(druidCluster.getDeepStorageDir())); - fs.mkdirs(druidDeepStorage); - final Path scratchDir = fs.makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir")); - fs.mkdirs(scratchDir); - conf.set("hive.druid.working.directory", scratchDir.toUri().getPath()); - druidCluster.init(conf); - druidCluster.start(); - } - - if (clusterType == MiniClusterType.kafka || clusterType == MiniClusterType.druidKafka) { - kafkaCluster = - new SingleNodeKafkaCluster("kafka", - System.getProperty("test.tmp.dir") + "/kafka-cluster", - setup.zkPort, - clusterType == MiniClusterType.kafka ? 9093 : 9092); - kafkaCluster.init(conf); - kafkaCluster.start(); - kafkaCluster.createTopicWithData("test-topic", new File(getScriptsDir(), "kafka_init_data.json")); - kafkaCluster.createTopicWithData("wiki_kafka_csv", new File(getScriptsDir(), "kafka_init_data.csv")); - kafkaCluster.createTopicWithData("wiki_kafka_avro_table", getAvroRows()); - } - - if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) { - if (confDir != null && !confDir.isEmpty()) { - conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); - } - int numTrackers = 2; - if (EnumSet.of(MiniClusterType.llap, - MiniClusterType.llap_local, - MiniClusterType.druidLocal, - MiniClusterType.druidKafka, - MiniClusterType.druid, - MiniClusterType.kafka).contains(clusterType)) { - llapCluster = LlapItUtils.startAndGetMiniLlapCluster(conf, setup.zooKeeperCluster, confDir); - } - if (EnumSet.of(MiniClusterType.llap_local, MiniClusterType.tez_local, MiniClusterType.druidLocal) - .contains(clusterType)) { - mr = - shims.getLocalMiniTezCluster(conf, - clusterType == MiniClusterType.llap_local || clusterType == MiniClusterType.druidLocal); - } else { - mr = - shims.getMiniTezCluster(conf, - numTrackers, - uriString, - EnumSet.of(MiniClusterType.llap, - MiniClusterType.llap_local, - MiniClusterType.druidKafka, - MiniClusterType.druid, - MiniClusterType.kafka).contains(clusterType)); - } - } else if (clusterType == MiniClusterType.miniSparkOnYarn) { - mr = shims.getMiniSparkCluster(conf, 2, uriString, 1); - } else if (clusterType == MiniClusterType.mr) { - mr = shims.getMiniMrCluster(conf, 2, uriString, 1); - } - } - - private static List getAvroRows() { - int numRows = 10; - List events; - final DatumWriter writer = new SpecificDatumWriter<>(Wikipedia.getClassSchema()); - events = IntStream.rangeClosed(0, numRows).mapToObj(i -> Wikipedia.newBuilder() - // 1534736225090 -> 08/19/2018 20:37:05 - .setTimestamp(formatter.format(new Timestamp(1534736225090L + 1000 * 3600 * i))) - .setAdded(i * 300) - .setDeleted(-i) - .setIsrobot(i % 2 == 0) - .setChannel("chanel number " + i) - .setComment("comment number " + i) - .setCommentlength(i) - .setDiffurl(String.format("url %s", i)) - .setFlags("flag") - .setIsminor(i % 2 > 0) - .setIsanonymous(i % 3 != 0) - .setNamespace("namespace") - .setIsunpatrolled(new Boolean(i % 3 == 0)) - .setIsnew(new Boolean(i % 2 > 0)) - .setPage(String.format("page is %s", i * 100)) - .setDelta(i) - .setDeltabucket(i * 100.4) - .setUser("test-user-" + i) - .build()).map(genericRecord -> { - java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream(); - BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); - try { - writer.write(genericRecord, encoder); - encoder.flush(); - out.close(); - 
} catch (IOException e) { - throw new RuntimeException(e); - } - return out.toByteArray(); - }).collect(Collectors.toList()); - return events; - } - public void shutdown() throws Exception { if (System.getenv(QTEST_LEAVE_FILES) == null) { cleanUp(); } - if (clusterType.getCoreClusterType() == CoreClusterType.TEZ && SessionState.get().getTezSession() != null) { - SessionState.get().getTezSession().destroy(); - } - if (druidCluster != null) { - druidCluster.stop(); - druidCluster = null; - } + miniClusters.shutDown(); - if (kafkaCluster != null) { - kafkaCluster.stop(); - kafkaCluster = null; - } - setup.tearDown(); - if (sparkSession != null) { - try { - SparkSessionManagerImpl.getInstance().closeSession(sparkSession); - } catch (Exception ex) { - LOG.error("Error closing spark session.", ex); - } finally { - sparkSession = null; - } - } - if (mr != null) { - mr.shutdown(); - mr = null; - } - FileSystem.closeAll(); - if (dfs != null) { - dfs.shutdown(); - dfs = null; - } Hive.closeCurrent(); } - public String readEntireFileIntoString(File queryFile) throws IOException { - InputStreamReader - isr = - new InputStreamReader(new BufferedInputStream(new FileInputStream(queryFile)), QTestUtil.UTF_8); - StringWriter sw = new StringWriter(); - try { - IOUtils.copy(isr, sw); - } finally { - if (isr != null) { - isr.close(); - } - } - return sw.toString(); - } - public void addFile(String queryFile) throws IOException { addFile(new File(queryFile), false); } public void addFile(File qf, boolean partial) throws IOException { - String query = readEntireFileIntoString(qf); + String query = FileUtils.readFileToString(qf); qMap.put(qf.getName(), query); if (partial) { return; } - if (matches(SORT_BEFORE_DIFF, query)) { - qSortSet.add(qf.getName()); - } else if (matches(SORT_QUERY_RESULTS, query)) { - qSortQuerySet.add(qf.getName()); - } else if (matches(HASH_QUERY_RESULTS, query)) { - qHashQuerySet.add(qf.getName()); - } else if (matches(SORT_AND_HASH_QUERY_RESULTS, query)) { - qSortNHashQuerySet.add(qf.getName()); - } - if (matches(NO_SESSION_REUSE, query)) { - qNoSessionReuseQuerySet.add(qf.getName()); - } - + qTestResultProcessor.add(qf, query); qOutProcessor.initMasks(qf, query); } - private static final Pattern SORT_BEFORE_DIFF = Pattern.compile("-- SORT_BEFORE_DIFF"); - private static final Pattern SORT_QUERY_RESULTS = Pattern.compile("-- SORT_QUERY_RESULTS"); - private static final Pattern HASH_QUERY_RESULTS = Pattern.compile("-- HASH_QUERY_RESULTS"); - private static final Pattern SORT_AND_HASH_QUERY_RESULTS = Pattern.compile("-- SORT_AND_HASH_QUERY_RESULTS"); - private static final Pattern NO_SESSION_REUSE = Pattern.compile("-- NO_SESSION_REUSE"); - - private boolean matches(Pattern pattern, String query) { - Matcher matcher = pattern.matcher(query); - if (matcher.find()) { - return true; - } - return false; - } - /** * Clear out any side effects of running tests */ public void clearPostTestEffects() throws Exception { - setup.postTest(conf); + miniClusters.postTest(conf); } public void clearKeysCreatedInTests() { - if (hes == null) { + if (miniClusters.getHdfsEncryptionShim() == null) { return; } try { - for (String keyAlias : hes.getKeys()) { - hes.deleteKey(keyAlias); + for (String keyAlias : miniClusters.getHdfsEncryptionShim().getKeys()) { + miniClusters.getHdfsEncryptionShim().deleteKey(keyAlias); } } catch (IOException e) { LOG.error("Fail to clean the keys created in test due to the error", e); @@ -867,15 +314,14 @@ public void clearTablesCreatedDuringTests() throws Exception { for 
(String dbName : db.getAllDatabases()) { SessionState.get().setCurrentDatabase(dbName); for (String tblName : db.getAllTables()) { - if (!DEFAULT_DATABASE_NAME.equals(dbName) || !getSrcTables().contains(tblName)) { - Table tblObj = null; + if (!DEFAULT_DATABASE_NAME.equals(dbName) || !QTestDatasetHandler.isSourceTable(tblName)) { try { - tblObj = db.getTable(tblName); + db.getTable(tblName); } catch (InvalidTableException e) { LOG.warn("Trying to drop table " + e.getTableName() + ". But it does not exist."); continue; } - db.dropTable(dbName, tblName, true, true, fsNeedsPurge(fsType)); + db.dropTable(dbName, tblName, true, true, miniClusters.fsNeedsPurge(fsType)); } } if (!DEFAULT_DATABASE_NAME.equals(dbName)) { @@ -890,7 +336,7 @@ public void clearTablesCreatedDuringTests() throws Exception { FileSystem fileSystem = p.getFileSystem(conf); if (fileSystem.exists(p)) { for (FileStatus status : fileSystem.listStatus(p)) { - if (status.isDirectory() && !getSrcTables().contains(status.getPath().getName())) { + if (status.isDirectory() && !QTestDatasetHandler.isSourceTable(status.getPath().getName())) { fileSystem.delete(status.getPath(), true); } } @@ -929,7 +375,7 @@ public void newSession(boolean canReuseSession) throws Exception { ss.in = System.in; SessionState oldSs = SessionState.get(); - restartSessions(canReuseSession, ss, oldSs); + miniClusters.restartSessions(canReuseSession, ss, oldSs); closeSession(oldSs); SessionState.start(ss); @@ -964,7 +410,7 @@ public void clearTestSideEffects() throws Exception { } protected void initConfFromSetup() throws Exception { - setup.preTest(conf); + miniClusters.preTest(conf); } public void cleanUp() throws Exception { @@ -972,7 +418,7 @@ public void cleanUp() throws Exception { } public void cleanUp(String fileName) throws Exception { - boolean canReuseSession = (fileName == null) || !qNoSessionReuseQuerySet.contains(fileName); + boolean canReuseSession = (fileName == null) || !qTestResultProcessor.shouldNotReuseSession(fileName); if (!isSessionStateStarted) { startSessionState(canReuseSession); } @@ -1009,7 +455,7 @@ public void cleanUp(String fileName) throws Exception { private void cleanupFromFile() throws IOException { File cleanupFile = new File(cleanupScript); if (cleanupFile.isFile()) { - String cleanupCommands = readEntireFileIntoString(cleanupFile); + String cleanupCommands = FileUtils.readFileToString(cleanupFile); LOG.info("Cleanup (" + cleanupScript + "):\n" + cleanupCommands); int result = getCliDriver().processLine(cleanupCommands); @@ -1027,7 +473,7 @@ public void createSources() throws Exception { } public void createSources(String fileName) throws Exception { - boolean canReuseSession = (fileName == null) || !qNoSessionReuseQuerySet.contains(fileName); + boolean canReuseSession = (fileName == null) || !qTestResultProcessor.shouldNotReuseSession(fileName); if (!isSessionStateStarted) { startSessionState(canReuseSession); } @@ -1048,7 +494,7 @@ private void initFromScript() throws IOException { return; } - String initCommands = readEntireFileIntoString(scriptFile); + String initCommands = FileUtils.readFileToString(scriptFile); LOG.info("Initial setup (" + initScript + "):\n" + initCommands); int result = cliDriver.processLine(initCommands); @@ -1058,51 +504,8 @@ private void initFromScript() throws IOException { } } - private void initDataSetForTest(File file) throws Exception { - synchronized (QTestUtil.class) { - DatasetParser parser = new DatasetParser(); - parser.parse(file); - - DatasetCollection datasets = 
parser.getDatasets(); - - Set missingDatasets = datasets.getTables(); - missingDatasets.removeAll(getSrcTables()); - if (missingDatasets.isEmpty()) { - return; - } - newSession(true); - for (String table : missingDatasets) { - initDataset(table); - } - newSession(true); - } - } - - protected void initDataset(String table) throws Exception { - - File tableFile = new File(new File(datasetDir, table), Dataset.INIT_FILE_NAME); - String commands = null; - try { - commands = readEntireFileIntoString(tableFile); - } catch (IOException e) { - throw new RuntimeException(String.format("dataset file not found %s", tableFile), e); - } - - int result = getCliDriver().processLine(commands); - LOG.info("Result from cliDrriver.processLine in initFromDatasets=" + result); - if (result != 0) { - Assert.fail("Failed during initFromDatasets processLine with code=" + result); - } - - addSrcTable(table); - } - - public void init() throws Exception { - - // Create remote dirs once. - if (mr != null) { - createRemoteDirs(); - } + public void postInit() throws Exception { + miniClusters.postInit(conf); testWarehouse = conf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE); String execEngine = conf.get("hive.execution.engine"); @@ -1110,14 +513,20 @@ public void init() throws Exception { SessionState.start(conf); conf.set("hive.execution.engine", execEngine); db = Hive.get(conf); + // Create views registry + initMaterializedViews(); + + drv = DriverFactory.newDriver(conf); + pd = new ParseDriver(); + sem = new SemanticAnalyzer(queryState); + } + + private void initMaterializedViews() { String registryImpl = db.getConf().get("hive.server2.materializedviews.registry.impl"); db.getConf().set("hive.server2.materializedviews.registry.impl", "DUMMY"); HiveMaterializedViewsRegistry.get().init(db); db.getConf().set("hive.server2.materializedviews.registry.impl", registryImpl); - drv = DriverFactory.newDriver(conf); - pd = new ParseDriver(); - sem = new SemanticAnalyzer(queryState); } public void init(String fileName) throws Exception { @@ -1129,9 +538,9 @@ public void init(String fileName) throws Exception { public String cliInit(File file) throws Exception { String fileName = file.getName(); - initDataSetForTest(file); + datasetHandler.initDataSetForTest(file, getCliDriver()); - if (qNoSessionReuseQuerySet.contains(fileName)) { + if (qTestResultProcessor.shouldNotReuseSession(fileName)) { newSession(false); } @@ -1166,37 +575,14 @@ private void setSessionOutputs(String fileName, CliSessionState ss, File outf) t if (ss.err != null) { ss.out.flush(); } - if (qSortQuerySet.contains(fileName)) { - ss.out = new SortPrintStream(fo, "UTF-8"); - } else if (qHashQuerySet.contains(fileName)) { - ss.out = new DigestPrintStream(fo, "UTF-8"); - } else if (qSortNHashQuerySet.contains(fileName)) { - ss.out = new SortAndDigestPrintStream(fo, "UTF-8"); - } else { - ss.out = new SessionStream(fo, true, "UTF-8"); - } - ss.err = new CachingPrintStream(fo, true, "UTF-8"); - ss.setIsSilent(true); - } - private void restartSessions(boolean canReuseSession, CliSessionState ss, SessionState oldSs) throws IOException { - if (oldSs != null && canReuseSession && clusterType.getCoreClusterType() == CoreClusterType.TEZ) { - // Copy the tezSessionState from the old CliSessionState. 
- TezSessionState tezSessionState = oldSs.getTezSession(); - oldSs.setTezSession(null); - ss.setTezSession(tezSessionState); - oldSs.close(); - } + qTestResultProcessor.setOutputs(ss, fo, fileName); - if (oldSs != null && clusterType.getCoreClusterType() == CoreClusterType.SPARK) { - sparkSession = oldSs.getSparkSession(); - ss.setSparkSession(sparkSession); - oldSs.setSparkSession(null); - oldSs.close(); - } + ss.err = new CachingPrintStream(fo, true, "UTF-8"); + ss.setIsSilent(true); } - private CliSessionState startSessionState(boolean canReuseSession) throws IOException { + public CliSessionState startSessionState(boolean canReuseSession) throws IOException { HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, @@ -1212,7 +598,7 @@ private CliSessionState startSessionState(boolean canReuseSession) throws IOExce SessionState oldSs = SessionState.get(); - restartSessions(canReuseSession, ss, oldSs); + miniClusters.restartSessions(canReuseSession, ss, oldSs); closeSession(oldSs); SessionState.start(ss); @@ -1248,7 +634,7 @@ public int execute(String tname) { } public int executeClient(String tname1, String tname2) { - String commands = getCommand(tname1) + CRLF + getCommand(tname2); + String commands = getCommand(tname1) + System.getProperty("line.separator") + getCommand(tname2); return executeClientInternal(commands); } @@ -1424,9 +810,9 @@ public QTestProcessExecResult checkNegativeResults(String tname, Exception e) th outfd.write(e.getMessage()); outfd.close(); - QTestProcessExecResult result = executeDiffCommand(outf.getPath(), expf, false, qSortSet.contains(qf.getName())); - if (overWrite) { - overwriteResults(outf.getPath(), expf); + QTestProcessExecResult result = qTestResultProcessor.executeDiffCommand(outf.getPath(), expf, false, qf.getName()); + if (QTestSystemProperties.shouldOverwriteResults()) { + qTestResultProcessor.overwriteResults(outf.getPath(), expf); return QTestProcessExecResult.createWithoutOutput(0); } @@ -1455,9 +841,9 @@ public QTestProcessExecResult checkNegativeResults(String tname, Error e) throws + "\n"); outfd.close(); - QTestProcessExecResult result = executeDiffCommand(outf.getPath(), expf, false, qSortSet.contains(qf.getName())); - if (overWrite) { - overwriteResults(outf.getPath(), expf); + QTestProcessExecResult result = qTestResultProcessor.executeDiffCommand(outf.getPath(), expf, false, qf.getName()); + if (QTestSystemProperties.shouldOverwriteResults()) { + qTestResultProcessor.overwriteResults(outf.getPath(), expf); return QTestProcessExecResult.createWithoutOutput(0); } @@ -1476,7 +862,7 @@ public String outPath(String outDir, String testName) { String ret = (new File(outDir, testName)).getPath(); // List of configurations. 
Currently the list consists of hadoop version and execution mode only List configs = new ArrayList(); - configs.add(this.clusterType.toString()); + configs.add(miniClusters.getClusterType().toString()); Deque stack = new LinkedList(); StringBuilder sb = new StringBuilder(); @@ -1509,10 +895,10 @@ public QTestProcessExecResult checkCliDriverResults(String tname) throws Excepti File f = new File(logDir, tname + outFileExtension); qOutProcessor.maskPatterns(f.getPath(), tname); - QTestProcessExecResult exitVal = executeDiffCommand(f.getPath(), outFileName, false, qSortSet.contains(tname)); + QTestProcessExecResult exitVal = qTestResultProcessor.executeDiffCommand(f.getPath(), outFileName, false, tname); - if (overWrite) { - overwriteResults(f.getPath(), outFileName); + if (QTestSystemProperties.shouldOverwriteResults()) { + qTestResultProcessor.overwriteResults(f.getPath(), outFileName); return QTestProcessExecResult.createWithoutOutput(0); } @@ -1526,7 +912,7 @@ public QTestProcessExecResult checkCompareCliDriverResults(String tname, List diffCommandArgs = new ArrayList(); - diffCommandArgs.add("diff"); - - // Text file comparison - diffCommandArgs.add("-a"); - - // Ignore changes in the amount of white space - if (ignoreWhiteSpace) { - diffCommandArgs.add("-b"); - } - - // Add files to compare to the arguments list - diffCommandArgs.add(getQuotedString(inFileName)); - diffCommandArgs.add(getQuotedString(outFileName)); - - result = executeCmd(diffCommandArgs); - - if (sortResults) { - new File(inFileName).delete(); - new File(outFileName).delete(); - } - - return result; - } - - private static void sortFiles(String in, String out) throws Exception { - int result = executeCmd(new String[]{ - "sort", getQuotedString(in), }, out, null).getReturnCode(); - if (result != 0) { - throw new IllegalStateException("Unexpected error while sorting " + in); - } - } - - private static QTestProcessExecResult executeCmd(Collection args) throws Exception { - return executeCmd(args, null, null); - } - - private static QTestProcessExecResult executeCmd(String[] args) throws Exception { - return executeCmd(args, null, null); - } - - private static QTestProcessExecResult executeCmd(Collection args, String outFile, String errFile) - throws Exception { - String[] cmdArray = args.toArray(new String[args.size()]); - return executeCmd(cmdArray, outFile, errFile); - } - - private static QTestProcessExecResult executeCmd(String[] args, String outFile, String errFile) throws Exception { - System.out.println("Running: " + org.apache.commons.lang.StringUtils.join(args, ' ')); - - PrintStream - out = - outFile == null ? - SessionState.getConsole().getChildOutStream() : - new PrintStream(new FileOutputStream(outFile), true, "UTF-8"); - PrintStream - err = - errFile == null ? - SessionState.getConsole().getChildErrStream() : - new PrintStream(new FileOutputStream(errFile), true, "UTF-8"); - - Process executor = Runtime.getRuntime().exec(args); - - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - PrintStream str = new PrintStream(bos, true, "UTF-8"); - - StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, err); - StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, out, str); - - outPrinter.start(); - errPrinter.start(); - - int result = executor.waitFor(); - - outPrinter.join(); - errPrinter.join(); - - if (outFile != null) { - out.close(); - } - - if (errFile != null) { - err.close(); - } - - return QTestProcessExecResult. 
-
-  private static String getQuotedString(String str) {
-    return str;
-  }
-
   public ASTNode parseQuery(String tname) throws Exception {
     return pd.parse(qMap.get(tname));
   }
 
@@ -1682,280 +939,6 @@ public ASTNode parseQuery(String tname) throws Exception {
     return sem.getRootTasks();
   }
 
-  /**
-   * QTestSetup defines test fixtures which are reused across testcases,
-   * and are needed before any test can be run
-   */
-  public static class QTestSetup {
-    private MiniZooKeeperCluster zooKeeperCluster = null;
-    private int zkPort;
-    private ZooKeeper zooKeeper;
-
-    public QTestSetup() {
-    }
-
-    public void preTest(HiveConf conf) throws Exception {
-
-      if (zooKeeperCluster == null) {
-        //create temp dir
-        String tmpBaseDir = System.getProperty(TEST_TMP_DIR_PROPERTY);
-        File tmpDir = Files.createTempDirectory(Paths.get(tmpBaseDir), "tmp_").toFile();
-
-        zooKeeperCluster = new MiniZooKeeperCluster();
-        zkPort = zooKeeperCluster.startup(tmpDir);
-      }
-
-      if (zooKeeper != null) {
-        zooKeeper.close();
-      }
-
-      int sessionTimeout =
-          (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
-      zooKeeper = new ZooKeeper("localhost:" + zkPort, sessionTimeout, new Watcher() {
-        @Override public void process(WatchedEvent arg0) {
-        }
-      });
-
-      String zkServer = "localhost";
-      conf.set("hive.zookeeper.quorum", zkServer);
-      conf.set("hive.zookeeper.client.port", "" + zkPort);
-    }
-
-    public void postTest(HiveConf conf) throws Exception {
-      if (zooKeeperCluster == null) {
-        return;
-      }
-
-      if (zooKeeper != null) {
-        zooKeeper.close();
-      }
-
-      ZooKeeperHiveLockManager.releaseAllLocks(conf);
-    }
-
-    public void tearDown() throws Exception {
-      CuratorFrameworkSingleton.closeAndReleaseInstance();
-
-      if (zooKeeperCluster != null) {
-        zooKeeperCluster.shutdown();
-        zooKeeperCluster = null;
-      }
-    }
-  }
-
-  /**
-   * QTRunner: Runnable class for running a single query file.
-   **/
-  public static class QTRunner implements Runnable {
-    private final QTestUtil qt;
-    private final File file;
-
-    public QTRunner(QTestUtil qt, File file) {
-      this.qt = qt;
-      this.file = file;
-    }
-
-    @Override public void run() {
-      try {
-        qt.startSessionState(false);
-        // assumption is that environment has already been cleaned once globally
-        // hence each thread does not call cleanUp() and createSources() again
-        qt.cliInit(file);
-        qt.executeClient(file.getName());
-      } catch (Throwable e) {
-        System.err.println("Query file " + file.getName() + " failed with exception " + e.getMessage());
-        e.printStackTrace();
-        outputTestFailureHelpMessage();
-      }
-    }
-  }
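The batch-runner helpers that follow keep their shape as they migrate out of QTestUtil. For orientation, a minimal sketch of how they chain together, written against the static API as defined here (the query file path, resDir, and logDir are hypothetical placeholders):

// Set up one QTestUtil per query file, run them in sequence, and fail if any
// golden-file comparison returns a non-zero code.
File[] qfiles = { new File("ql/src/test/queries/example1.q") }; // hypothetical path
QTestUtil[] qt = QTestUtil.queryListRunnerSetup(qfiles, resDir, logDir, null, null); // nulls pick the default scripts
if (!QTestUtil.queryListRunnerSingleThreaded(qfiles, qt)) {
  throw new AssertionError("One or more queries failed");
}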
-  /**
-   * Setup to execute a set of query files. Uses QTestUtil to do so.
-   *
-   * @param qfiles array of input query files containing arbitrary number of hive
-   *          queries
-   * @param resDir output directory
-   * @param logDir log directory
-   * @return one QTestUtil for each query file
-   */
-  public static QTestUtil[] queryListRunnerSetup(File[] qfiles,
-      String resDir,
-      String logDir,
-      String initScript,
-      String cleanupScript) throws Exception {
-    QTestUtil[] qt = new QTestUtil[qfiles.length];
-    for (int i = 0; i < qfiles.length; i++) {
-      qt[i] =
-          new QTestUtil(QTestArguments.QTestArgumentsBuilder.instance()
-              .withOutDir(resDir)
-              .withLogDir(logDir)
-              .withClusterType(MiniClusterType.none)
-              .withConfDir(null)
-              .withInitScript(initScript == null ? defaultInitScript : initScript)
-              .withCleanupScript(cleanupScript == null ? defaultCleanupScript : cleanupScript)
-              .withLlapIo(false)
-              .build());
-
-      qt[i].addFile(qfiles[i], false);
-      qt[i].clearTestSideEffects();
-    }
-
-    return qt;
-  }
-
-  /**
-   * Executes a set of query files in sequence.
-   *
-   * @param qfiles array of input query files containing arbitrary number of hive
-   *          queries
-   * @param qt array of QTestUtils, one per qfile
-   * @return true if all queries passed, false otw
-   */
-  public static boolean queryListRunnerSingleThreaded(File[] qfiles, QTestUtil[] qt) throws Exception {
-    boolean failed = false;
-    qt[0].cleanUp();
-    qt[0].createSources();
-    for (int i = 0; i < qfiles.length && !failed; i++) {
-      qt[i].clearTestSideEffects();
-      qt[i].cliInit(qfiles[i]);
-      qt[i].executeClient(qfiles[i].getName());
-      QTestProcessExecResult result = qt[i].checkCliDriverResults(qfiles[i].getName());
-      if (result.getReturnCode() != 0) {
-        failed = true;
-        StringBuilder builder = new StringBuilder();
-        builder.append("Test ")
-            .append(qfiles[i].getName())
-            .append(" results check failed with error code ")
-            .append(result.getReturnCode());
-        if (Strings.isNotEmpty(result.getCapturedOutput())) {
-          builder.append(" and diff value ").append(result.getCapturedOutput());
-        }
-        System.err.println(builder.toString());
-        outputTestFailureHelpMessage();
-      }
-      qt[i].clearPostTestEffects();
-    }
-    return (!failed);
-  }
-
-  /**
-   * Executes a set of query files parallel.
-   *
-   * Each query file is run in a separate thread. The caller has to arrange
-   * that different query files do not collide (in terms of destination tables)
-   *
-   * @param qfiles array of input query files containing arbitrary number of hive
-   *          queries
-   * @param qt array of QTestUtils, one per qfile
-   * @return true if all queries passed, false otw
-   */
-  public static boolean queryListRunnerMultiThreaded(File[] qfiles, QTestUtil[] qt) throws Exception {
-    boolean failed = false;
-
-    // in multithreaded mode - do cleanup/initialization just once
-
-    qt[0].cleanUp();
-    qt[0].createSources();
-    qt[0].clearTestSideEffects();
-
-    QTRunner[] qtRunners = new QTRunner[qfiles.length];
-    Thread[] qtThread = new Thread[qfiles.length];
-
-    for (int i = 0; i < qfiles.length; i++) {
-      qtRunners[i] = new QTRunner(qt[i], qfiles[i]);
-      qtThread[i] = new Thread(qtRunners[i]);
-    }
-
-    for (int i = 0; i < qfiles.length; i++) {
-      qtThread[i].start();
-    }
-
-    for (int i = 0; i < qfiles.length; i++) {
-      qtThread[i].join();
-      QTestProcessExecResult result = qt[i].checkCliDriverResults(qfiles[i].getName());
-      if (result.getReturnCode() != 0) {
-        failed = true;
-        StringBuilder builder = new StringBuilder();
-        builder.append("Test ")
-            .append(qfiles[i].getName())
-            .append(" results check failed with error code ")
-            .append(result.getReturnCode());
-        if (Strings.isNotEmpty(result.getCapturedOutput())) {
-          builder.append(" and diff value ").append(result.getCapturedOutput());
-        }
-        System.err.println(builder.toString());
-        outputTestFailureHelpMessage();
-      }
-    }
-    return (!failed);
-  }
-
-  public static void outputTestFailureHelpMessage() {
-    System.err.println("See ./ql/target/tmp/log/hive.log or ./itests/qtest/target/tmp/log/hive.log, or check "
-        + "./ql/target/surefire-reports or ./itests/qtest/target/surefire-reports/ for specific "
-        + "test cases logs.");
-    System.err.flush();
-  }
-
-  private static String[] cachedQvFileList = null;
-  private static ImmutableList<String> cachedDefaultQvFileList = null;
-  private static Pattern qvSuffix = Pattern.compile("_[0-9]+.qv$", Pattern.CASE_INSENSITIVE);
-
-  public static List<String> getVersionFiles(String queryDir, String tname) {
-    ensureQvFileList(queryDir);
-    List<String> result = getVersionFilesInternal(tname);
-    if (result == null) {
-      result = cachedDefaultQvFileList;
-    }
-    return result;
-  }
-
-  private static void ensureQvFileList(String queryDir) {
-    if (cachedQvFileList != null) {
-      return;
-    }
-    // Not thread-safe.
-    System.out.println("Getting versions from " + queryDir);
-    cachedQvFileList = (new File(queryDir)).list(new FilenameFilter() {
-      @Override public boolean accept(File dir, String name) {
-        return name.toLowerCase().endsWith(".qv");
-      }
-    });
-    if (cachedQvFileList == null) {
-      return; // no files at all
-    }
-    Arrays.sort(cachedQvFileList, String.CASE_INSENSITIVE_ORDER);
-    List<String> defaults = getVersionFilesInternal("default");
-    cachedDefaultQvFileList = (defaults != null) ? ImmutableList.copyOf(defaults) : ImmutableList.of();
-  }
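The qv lookup implemented by the two helpers around this point is easiest to see with concrete values: the cached file list is sorted case-insensitively, so a binary search for the bare test name lands just before its versioned variants. A self-contained illustration (file names hypothetical):

String[] cached = { "default_1.qv", "join1.q_1.qv", "join1.q_2.qv", "join25.q_1.qv" }; // already sorted
int pos = java.util.Arrays.binarySearch(cached, "join1.q", String.CASE_INSENSITIVE_ORDER);
// No exact match, so pos is negative; (-pos - 1) == 1 points at "join1.q_1.qv".
// getVersionFilesInternal below then walks forward while the candidate keeps the
// "join1.q" prefix and a "_<n>.qv" suffix, collecting ["join1.q_1.qv", "join1.q_2.qv"].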
-  private static List<String> getVersionFilesInternal(String tname) {
-    if (cachedQvFileList == null) {
-      return new ArrayList<String>();
-    }
-    int pos = Arrays.binarySearch(cachedQvFileList, tname, String.CASE_INSENSITIVE_ORDER);
-    if (pos >= 0) {
-      throw new BuildException("Unexpected file list element: " + cachedQvFileList[pos]);
-    }
-    List<String> result = null;
-    for (pos = (-pos - 1); pos < cachedQvFileList.length; ++pos) {
-      String candidate = cachedQvFileList[pos];
-      if (candidate.length() <= tname.length()
-          || !tname.equalsIgnoreCase(candidate.substring(0, tname.length()))
-          || !qvSuffix.matcher(candidate.substring(tname.length())).matches()) {
-        break;
-      }
-      if (result == null) {
-        result = new ArrayList<String>();
-      }
-      result.add(candidate);
-    }
-    return result;
-  }
-
   public void failed(int ecode, String fname, String debugHint) {
     String command = SessionState.get() != null ? SessionState.get().getLastCommand() : null;
     String
@@ -2005,16 +988,4 @@ public QOutProcessor getQOutProcessor() {
 
   public static void initEventNotificationPoll() throws Exception {
     NotificationEventPoll.initialize(SessionState.get().getConf());
   }
-
-  /**
-   * Should deleted test tables have their data purged.
-   *
-   * @return true if data should be purged
-   */
-  private static boolean fsNeedsPurge(FsType type) {
-    if (type == FsType.encrypted_hdfs || type == FsType.erasure_coded_hdfs) {
-      return true;
-    }
-    return false;
-  }
 }
diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/dataset/QTestDatasetHandler.java itests/util/src/main/java/org/apache/hadoop/hive/ql/dataset/QTestDatasetHandler.java
new file mode 100644
index 0000000000..e7095d9fc6
--- /dev/null
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/dataset/QTestDatasetHandler.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.dataset;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hive.cli.CliDriver;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QTestSystemProperties;
+import org.apache.hadoop.hive.ql.QTestUtil;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
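The new class below centralizes the lazy, on-demand creation of shared dataset source tables. A sketch of the intended call pattern, inferred from the methods that follow (the qfile-side directive shown in the comment is an assumption, since DatasetParser itself is not part of this diff):

// Hypothetical harness usage: parse the qfile's dataset declarations and load
// any table not yet present, e.g. a header line such as
//   --! qt:dataset:src            (directive syntax assumed)
QTestDatasetHandler handler = new QTestDatasetHandler(qt, conf); // qt/conf supplied by the test harness
handler.initDataSetForTest(new File(queryDir, "example.q"), cliDriver); // hypothetical qfile
// Each missing table <t> is created by running <datasetDir>/<t>/<Dataset.INIT_FILE_NAME>
// through the CliDriver and then recorded via addSrcTable(t).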
new File(dataDir + "/datasets") + : new File(conf.get("test.data.set.files")); + this.qt = qTestUtil; + } + + public String getDataDir(HiveConf conf) { + String dataDir = conf.get("test.data.files"); + // Use the current directory if it is not specified + if (dataDir == null) { + dataDir = new File(".").getAbsolutePath() + "/data/files"; + } + + return dataDir; + } + + public void initDataSetForTest(File file, CliDriver cliDriver) throws Exception { + synchronized (QTestUtil.class) { + DatasetParser parser = new DatasetParser(); + parser.parse(file); + + DatasetCollection datasets = parser.getDatasets(); + + Set missingDatasets = datasets.getTables(); + missingDatasets.removeAll(getSrcTables()); + if (missingDatasets.isEmpty()) { + return; + } + qt.newSession(true); + for (String table : missingDatasets) { + if (initDataset(table, cliDriver)) { + addSrcTable(table); + } + } + qt.newSession(true); + } + } + + public boolean initDataset(String table, CliDriver cliDriver) throws Exception { + File tableFile = new File(new File(datasetDir, table), Dataset.INIT_FILE_NAME); + String commands = null; + try { + commands = FileUtils.readFileToString(tableFile); + } catch (IOException e) { + throw new RuntimeException(String.format("dataset file not found %s", tableFile), e); + } + + int result = cliDriver.processLine(commands); + LOG.info("Result from cliDrriver.processLine in initFromDatasets=" + result); + if (result != 0) { + Assert.fail("Failed during initFromDatasets processLine with code=" + result); + } + + return true; + } + + public static Set getSrcTables() { + if (srcTables == null) { + initSrcTables(); + } + return srcTables; + } + + public static void addSrcTable(String table) { + getSrcTables().add(table); + storeSrcTables(); + } + + public static Set initSrcTables() { + if (srcTables == null) { + initSrcTablesFromSystemProperty(); + storeSrcTables(); + } + + return srcTables; + } + + public static boolean isSourceTable(String name) { + return getSrcTables().contains(name); + } + + private static void storeSrcTables() { + QTestSystemProperties.setSrcTables(srcTables); + } + + private static void initSrcTablesFromSystemProperty() { + srcTables = new HashSet(); + // FIXME: moved default value to here...for now + // i think this features is never really used from the command line + for (String srcTable : QTestSystemProperties.getSrcTables()) { + srcTable = srcTable.trim(); + if (!srcTable.isEmpty()) { + srcTables.add(srcTable); + } + } + } +} diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/parse/CoreParseNegative.java itests/util/src/main/java/org/apache/hadoop/hive/ql/parse/CoreParseNegative.java index a7972ecea3..f4b9391969 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/ql/parse/CoreParseNegative.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/parse/CoreParseNegative.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.QTestArguments; import org.apache.hadoop.hive.ql.QTestProcessExecResult; import org.apache.hadoop.hive.ql.QTestUtil; -import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType; +import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -40,7 +40,6 @@ private static QTestUtil qt; private static CliConfigs.ParseNegativeConfig cliConfig = new CliConfigs.ParseNegativeConfig(); - private static boolean firstRun; public CoreParseNegative(AbstractCliConfig testCliConfig) { super(testCliConfig); @@ -52,7 +51,6 @@ public void beforeClass() { 
diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/parse/CoreParseNegative.java itests/util/src/main/java/org/apache/hadoop/hive/ql/parse/CoreParseNegative.java
index a7972ecea3..f4b9391969 100644
--- itests/util/src/main/java/org/apache/hadoop/hive/ql/parse/CoreParseNegative.java
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/parse/CoreParseNegative.java
@@ -28,7 +28,7 @@
 import org.apache.hadoop.hive.ql.QTestArguments;
 import org.apache.hadoop.hive.ql.QTestProcessExecResult;
 import org.apache.hadoop.hive.ql.QTestUtil;
-import org.apache.hadoop.hive.ql.QTestUtil.MiniClusterType;
+import org.apache.hadoop.hive.ql.QTestMiniClusters.MiniClusterType;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -40,7 +40,6 @@
 
   private static QTestUtil qt;
   private static CliConfigs.ParseNegativeConfig cliConfig = new CliConfigs.ParseNegativeConfig();
-  private static boolean firstRun;
 
   public CoreParseNegative(AbstractCliConfig testCliConfig) {
     super(testCliConfig);
@@ -52,7 +51,6 @@ public void beforeClass() {
     MiniClusterType miniMR = cliConfig.getClusterType();
     String initScript = cliConfig.getInitScript();
     String cleanupScript = cliConfig.getCleanupScript();
-    firstRun = true;
 
     try {
       qt = new QTestUtil(
@@ -67,6 +65,8 @@ public void beforeClass() {
               .build());
 
       qt.newSession();
+      qt.cleanUp();
+      qt.createSources();
     } catch (Exception e) {
       System.err.println("Exception: " + e.getMessage());
@@ -102,9 +102,6 @@ public void shutdown() {
     }
   }
 
-  private static String debugHint = "\nSee ./ql/target/tmp/log/hive.log or ./itests/qtest/target/tmp/log/hive.log, "
-      + "or check ./ql/target/surefire-reports or ./itests/qtest/target/surefire-reports/ for specific test cases logs.";
-
   @Override
   public void runTest(String tname, String fname, String fpath) throws Exception {
     long startTime = System.currentTimeMillis();
@@ -112,33 +109,26 @@ public void runTest(String tname, String fname, String fpath) throws Exception {
       System.err.println("Begin query: " + fname);
 
       qt.addFile(fpath);
 
-      if (firstRun) {
-        qt.init(fname);
-        firstRun = false;
-      }
-
       qt.cliInit(new File(fpath));
 
       ASTNode tree = qt.parseQuery(fname);
       qt.analyzeAST(tree);
 
-      fail("Unexpected success for query: " + fname + debugHint);
-    }
-    catch (ParseException pe) {
+      fail("Unexpected success for query: " + fname + QTestUtil.DEBUG_HINT);
+    } catch (ParseException pe) {
       QTestProcessExecResult result = qt.checkNegativeResults(fname, pe);
       if (result.getReturnCode() != 0) {
-        qt.failed(result.getReturnCode(), fname, result.getCapturedOutput() + "\r\n" + debugHint);
+        qt.failed(result.getReturnCode(), fname,
+            result.getCapturedOutput() + "\r\n" + QTestUtil.DEBUG_HINT);
       }
-    }
-    catch (SemanticException se) {
+    } catch (SemanticException se) {
       QTestProcessExecResult result = qt.checkNegativeResults(fname, se);
       if (result.getReturnCode() != 0) {
-        String message = Strings.isNullOrEmpty(result.getCapturedOutput()) ?
-            debugHint : "\r\n" + result.getCapturedOutput();
+        String message = Strings.isNullOrEmpty(result.getCapturedOutput()) ? QTestUtil.DEBUG_HINT
+            : "\r\n" + result.getCapturedOutput();
         qt.failedDiff(result.getReturnCode(), fname, message);
       }
-    }
-    catch (Exception e) {
-      qt.failed(e, fname, debugHint);
+    } catch (Exception e) {
+      qt.failed(e, fname, QTestUtil.DEBUG_HINT);
     }
 
     long elapsedTime = System.currentTimeMillis() - startTime;
diff --git itests/util/src/main/java/org/apache/hive/beeline/QFile.java itests/util/src/main/java/org/apache/hive/beeline/QFile.java
index 6e64c53d32..34e7113683 100644
--- itests/util/src/main/java/org/apache/hive/beeline/QFile.java
+++ itests/util/src/main/java/org/apache/hive/beeline/QFile.java
@@ -20,7 +20,7 @@
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hive.ql.QTestProcessExecResult;
-import org.apache.hadoop.hive.ql.QTestUtil;
+import org.apache.hadoop.hive.ql.dataset.QTestDatasetHandler;
 import org.apache.hadoop.util.Shell;
 import org.apache.hive.common.util.StreamPrinter;
 import org.apache.hive.beeline.ConvertedOutputFile.Converter;
@@ -43,7 +43,7 @@
  * input and output files, and provides methods for filtering the output of the runs.
  */
 public final class QFile {
-  private static final Set<String> srcTables = QTestUtil.getSrcTables();
+  private static final Set<String> srcTables = QTestDatasetHandler.getSrcTables();
 
   private static final String DEBUG_HINT =
       "The following files can help you identifying the problem:%n"
           + " - Query file: %1s%n"
diff --git itests/util/src/main/java/org/apache/hive/beeline/QFileBeeLineClient.java itests/util/src/main/java/org/apache/hive/beeline/QFileBeeLineClient.java
index be4c6e89cd..2a1bb2cb28 100644
--- itests/util/src/main/java/org/apache/hive/beeline/QFileBeeLineClient.java
+++ itests/util/src/main/java/org/apache/hive/beeline/QFileBeeLineClient.java
@@ -20,6 +20,7 @@
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.hive.ql.QTestUtil;
+import org.apache.hadoop.hive.ql.dataset.QTestDatasetHandler;
 import org.apache.hive.beeline.ConvertedOutputFile.Converter;
 
 import java.io.File;
@@ -152,7 +153,7 @@ private void beforeExecute(QFile qFile) throws Exception {
         .map(database -> "DROP DATABASE `" + database + "` CASCADE;")
         .collect(Collectors.toSet());
 
-    Set<String> srcTables = QTestUtil.getSrcTables();
+    Set<String> srcTables = QTestDatasetHandler.getSrcTables();
     dropCommands.addAll(getTables().stream()
         .filter(table -> !srcTables.contains(table))
         .map(table -> "DROP TABLE `" + table + "` PURGE;")
diff --git itests/util/src/test/java/org/apache/hadoop/hive/ql/TestQOutProcessor.java itests/util/src/test/java/org/apache/hadoop/hive/ql/TestQOutProcessor.java
index 9e19f27b7f..7242aec566 100644
--- itests/util/src/test/java/org/apache/hadoop/hive/ql/TestQOutProcessor.java
+++ itests/util/src/test/java/org/apache/hadoop/hive/ql/TestQOutProcessor.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hive.ql;
 
-import org.apache.hadoop.hive.ql.QTestUtil.FsType;
+import org.apache.hadoop.hive.ql.QTestMiniClusters.FsType;
 import org.junit.Assert;
 import org.junit.Test;
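Both BeeLine-side call sites above now read the shared source-table registry from QTestDatasetHandler. As a self-contained illustration of the filtering idiom they rely on (table names hypothetical; in the real code the sets come from the handler and the BeeLine client):

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class DropFilterSketch {
  // Mirrors the beforeExecute logic above: drop everything except the shared
  // source tables, so dataset tables survive between qfile runs.
  static List<String> dropCommands(List<String> tables, Set<String> srcTables) {
    return tables.stream()
        .filter(table -> !srcTables.contains(table))
        .map(table -> "DROP TABLE `" + table + "` PURGE;")
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(dropCommands(List.of("src", "tmp_out"), Set.of("src")));
    // prints: [DROP TABLE `tmp_out` PURGE;]
  }
}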