diff --git itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index adb8a71..f54735d 100644 --- itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -59,7 +59,7 @@ private static final AtomicLong hs2Counter = new AtomicLong(); private MiniMrShim mr; private MiniDFSShim dfs; - private FileSystem localFS; + private final FileSystem localFS; private boolean useMiniKdc = false; private final String serverPrincipal; private final boolean isMetastoreRemote; @@ -181,7 +181,7 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM // Initialize the execution engine based on cluster type switch (miniClusterType) { case TEZ: - mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString, 1); + mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString, 1, false); break; case MR: mr = ShimLoader.getHadoopShims().getMiniMrCluster(hiveConf, 4, uriString, 1); diff --git itests/qtest/pom.xml itests/qtest/pom.xml index b3028bc..27f8b82 100644 --- itests/qtest/pom.xml +++ itests/qtest/pom.xml @@ -103,6 +103,13 @@ test + org.apache.hive + hive-llap-server + ${project.version} + test-jar + test + + org.apache.hadoop hadoop-yarn-registry ${hadoop-23.version} @@ -206,12 +213,6 @@ - org.apache.hive - hive-llap-server - ${project.version} - test - - com.sun.jersey jersey-servlet ${jersey.version} @@ -516,7 +517,7 @@ clusterMode="tez" runDisabled="${run_disabled}" hiveConfDir="${basedir}/${hive.path.to.root}/data/conf/tez" - resultsDirectory="${basedir}/${hive.path.to.root}/ql/src/test/results/clientpositive/tez" + resultsDirectory="${basedir}/${hive.path.to.root}/ql/src/test/results/clientpositive/tez" className="TestMiniTezCliDriver" logFile="${project.build.directory}/testminitezclidrivergen.log" logDirectory="${project.build.directory}/qfile-results/clientpositive/" @@ -529,6 +530,24 @@ templatePath="${basedir}/${hive.path.to.root}/ql/src/test/templates/" template="TestCliDriver.vm" queryDirectory="${basedir}/${hive.path.to.root}/ql/src/test/queries/clientpositive/" queryFile="${qfile}" + includeQueryFile="${minitez.query.files},${minitez.query.files.shared}" + queryFileRegex="${qfile_regex}" + clusterMode="llap" + runDisabled="${run_disabled}" + hiveConfDir="${basedir}/${hive.path.to.root}/data/conf/llap" + resultsDirectory="${basedir}/${hive.path.to.root}/ql/src/test/results/clientpositive/llap" + className="TestMiniLlapCliDriver" + logFile="${project.build.directory}/testminitezclidrivergen.log" + logDirectory="${project.build.directory}/qfile-results/clientpositive/" + hadoopVersion="${active.hadoop.version}" + initScript="q_test_init.sql" + cleanupScript="q_test_cleanup.sql"/> + + hive-common ${project.version} + + + org.apache.hive + hive-llap-server + ${project.version} + + + + org.apache.hive + hive-llap-server + test-jar + ${project.version} + + org.apache.hive hive-cli diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index c9ed0d4..a1c7925 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -46,8 +46,10 @@ import java.util.Collection; import java.util.Deque; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; @@ -62,6 +64,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -82,6 +85,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; +import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; import org.apache.hadoop.hive.ql.lockmgr.zookeeper.CuratorFrameworkSingleton; import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -150,7 +154,7 @@ private HadoopShims.MiniMrShim mr = null; private HadoopShims.MiniDFSShim dfs = null; private HadoopShims.HdfsEncryptionShim hes = null; - private boolean miniMr = false; + private final boolean miniMr = false; private String hadoopVer = null; private QTestSetup setup = null; private SparkSession sparkSession = null; @@ -311,6 +315,7 @@ public void initConf() throws Exception { spark, encrypted, miniSparkOnYarn, + llap, none; public static MiniClusterType valueForString(String type) { @@ -324,6 +329,8 @@ public static MiniClusterType valueForString(String type) { return encrypted; } else if (type.equals("miniSparkOnYarn")) { return miniSparkOnYarn; + } else if (type.equals("llap")) { + return llap; } else { return none; } @@ -400,8 +407,24 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType, } String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString()); + boolean isLlap = false; if (clusterType == MiniClusterType.tez) { - mr = shims.getMiniTezCluster(conf, 4, uriString, 1); + mr = shims.getMiniTezCluster(conf, 4, uriString, 1, isLlap); + } else if (clusterType == MiniClusterType.llap) { + mr = shims.getMiniTezCluster(conf, 2, uriString, 1, isLlap); + if (isLlap) { + MiniLlapCluster llapCluster = + MiniLlapCluster.create("llap", 1, 1000000000, true, true, 500000000, 2); + conf = (HiveConf) mr.getConf(); + llapCluster.init(conf); + llapCluster.start(); + Configuration llapConf = llapCluster.getClusterSpecificConfiguration(); + Iterator> confIter = llapConf.iterator(); + while (confIter.hasNext()) { + Entry entry = confIter.next(); + conf.set(entry.getKey(), entry.getValue()); + } + } } else if (clusterType == MiniClusterType.miniSparkOnYarn) { mr = shims.getMiniSparkCluster(conf, 4, uriString, 1); } else { diff --git llap-server/pom.xml llap-server/pom.xml index 1d64992..a4889e7 100644 --- llap-server/pom.xml +++ llap-server/pom.xml @@ -266,6 +266,18 @@ + + org.apache.maven.plugins + maven-jar-plugin + ${maven.jar.plugin.version} + + + + test-jar + + + + diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java index d49d83e..8717b96 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java @@ -254,7 +254,7 @@ private boolean checkExpression(ExprNodeDesc expr) { private boolean checkAggregator(AggregationDesc agg) throws SemanticException { if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Checking '%s'", agg.getExprString())); + LOG.debug(String.format("Checking '%s'", agg.getExprString())); } boolean result = checkExpressions(agg.getParameters()); @@ -277,12 +277,12 @@ private boolean checkExpressions(Collection exprs) { private boolean checkAggregators(Collection aggs) { boolean result = true; try { - for (AggregationDesc agg: aggs) { - result = result && checkAggregator(agg); - } + for (AggregationDesc agg : aggs) { + result = result && checkAggregator(agg); + } } catch (SemanticException e) { - LOG.warn("Exception testing aggregators.",e); - result = false; + LOG.warn("Exception testing aggregators.", e); + result = false; } return result; } diff --git shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java index 574461b..8e1179f 100644 --- shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java +++ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java @@ -231,7 +231,7 @@ public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers, @Override public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers, - String nameNode, int numDir) throws IOException { + String nameNode, int numDir, boolean isLlap) throws IOException { throw new IOException("Cannot run tez on current hadoop, Version: " + VersionInfo.getVersion()); } @@ -278,6 +278,11 @@ public void shutdown() throws IOException { public void setupConfiguration(Configuration conf) { setJobLauncherRpcAddress(conf, "localhost:" + mr.getJobTrackerPort()); } + + @Override + public Configuration getConf() { + throw new RuntimeException("Invalid call for hadoop-1"); + } } // Don't move this code to the parent class. There's a binary diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 21dde51..ff06d5b 100644 --- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -121,7 +121,7 @@ public Hadoop23Shims() { zcr = true; } catch (ClassNotFoundException ce) { } - + if (zcr) { // in-memory HDFS is only available after zcr try { @@ -349,6 +349,11 @@ public void setupConfiguration(Configuration conf) { conf.set(pair.getKey(), pair.getValue()); } } + + @Override + public Configuration getConf() { + return this.conf; + } } /** @@ -356,8 +361,8 @@ public void setupConfiguration(Configuration conf) { */ @Override public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers, - String nameNode, int numDir) throws IOException { - return new MiniTezShim(conf, numberOfTaskTrackers, nameNode, numDir); + String nameNode, int numDir, boolean isLlap) throws IOException { + return new MiniTezShim(conf, numberOfTaskTrackers, nameNode, numDir, isLlap); } /** @@ -368,8 +373,8 @@ public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers private final MiniTezCluster mr; private final Configuration conf; - public MiniTezShim(Configuration conf, int numberOfTaskTrackers, - String nameNode, int numDir) throws IOException { + public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode, int numDir, + boolean isLlap) throws IOException { mr = new MiniTezCluster("hive", numberOfTaskTrackers); conf.set("fs.defaultFS", nameNode); @@ -397,6 +402,10 @@ public void shutdown() throws IOException { mr.stop(); } + public Configuration getClusterConfiguration() { + return this.conf; + } + @Override public void setupConfiguration(Configuration conf) { Configuration config = mr.getConfig(); @@ -913,6 +922,7 @@ public LocatedFileStatus next() throws IOException { * Cannot add Override annotation since FileSystem.access() may not exist in * the version of hadoop used to build Hive. */ + @Override public void access(Path path, FsAction action) throws AccessControlException, FileNotFoundException, IOException { Path underlyingFsPath = swizzleParamPath(path); @@ -1179,7 +1189,7 @@ public void setStoragePolicy(Path path, StoragePolicyValue policy) } } } - + @Override public HadoopShims.StoragePolicyShim getStoragePolicyShim(FileSystem fs) { @@ -1250,7 +1260,7 @@ public static boolean isHdfsEncryptionSupported() { */ private KeyProvider keyProvider = null; - private Configuration conf; + private final Configuration conf; public HdfsEncryptionShim(URI uri, Configuration conf) throws IOException { DistributedFileSystem dfs = (DistributedFileSystem)FileSystem.get(uri, conf); @@ -1409,6 +1419,7 @@ public int readByteBuffer(FSDataInputStream file, ByteBuffer dest) throws IOExce } return result; } + @Override public void addDelegationTokens(FileSystem fs, Credentials cred, String uname) throws IOException { // Use method addDelegationTokens instead of getDelegationToken to get all the tokens including KMS. fs.addDelegationTokens(uname, cred); diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index 2b6f322..4d6da21 100644 --- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -95,7 +95,7 @@ public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers, String nameNode, int numDir) throws IOException; public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers, - String nameNode, int numDir) throws IOException; + String nameNode, int numDir, boolean isLlap) throws IOException; public MiniMrShim getMiniSparkCluster(Configuration conf, int numberOfTaskTrackers, String nameNode, int numDir) throws IOException; @@ -107,6 +107,8 @@ public MiniMrShim getMiniSparkCluster(Configuration conf, int numberOfTaskTracke public int getJobTrackerPort() throws UnsupportedOperationException; public void shutdown() throws IOException; public void setupConfiguration(Configuration conf); + + public Configuration getConf(); } /** @@ -416,11 +418,11 @@ public void setFullFileStatus(Configuration conf, HdfsFileStatus sourceStatus, public FileSystem createProxyFileSystem(FileSystem fs, URI uri); public Map getHadoopConfNames(); - + /** * Create a shim for DFS storage policy. */ - + public enum StoragePolicyValue { MEMORY, /* 1-replica memory */ SSD, /* 3-replica ssd */ @@ -433,11 +435,11 @@ public static StoragePolicyValue lookup(String name) { return StoragePolicyValue.valueOf(name.toUpperCase().trim()); } }; - + public interface StoragePolicyShim { void setStoragePolicy(Path path, StoragePolicyValue policy) throws IOException; } - + /** * obtain a storage policy shim associated with the filesystem. * Returns null when the filesystem has no storage policies.