commit 15532fb25f73ee96243753f383d4d94dac657d33
Author: Sahil Takiar
Date:   Thu Jun 28 17:50:41 2018 -0700

    HIVE-20032: Don't serialize hashCode when groupByShuffle and RDD caching are disabled

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e630e8819a..c1ad480983 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4235,7 +4235,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         "If this is set to true, mapjoin optimization in Hive/Spark will use statistics from\n" +
         "TableScan operators at the root of operator tree, instead of parent ReduceSink\n" +
         "operators of the Join operator."),
-    SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", false,
+    SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", true,
         "If this is set to true, Hive on Spark will register custom serializers for data types\n" +
         "in shuffle. This should result in less shuffled data."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
index d383873acf..191d5f5a8a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
@@ -33,6 +33,9 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
@@ -41,13 +44,10 @@ public class TestSparkStatistics {
 
   @Test
-  public void testSparkStatistics() {
+  public void testSparkStatistics() throws MalformedURLException {
+    String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
     HiveConf conf = new HiveConf();
-    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
-        SQLStdHiveAuthorizerFactory.class.getName());
-    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
-    conf.set("spark.master", "local-cluster[1,2,1024]");
     conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
         "TestSparkStatistics-local-dir").toString());
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
index fe8a32f801..341da33f1a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
@@ -22,6 +22,8 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
+import java.net.MalformedURLException;
 import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -66,12 +68,10 @@ public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLExcepti
   private Connection hs2Conn = null;
   private Statement stmt;
 
-  private static HiveConf createHiveConf() {
+  private static HiveConf createHiveConf() throws MalformedURLException {
+    String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
     HiveConf conf = new HiveConf();
-    conf.set("hive.execution.engine", "spark");
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    conf.set("spark.master", "local-cluster[2,2,1024]");
-    conf.set("hive.spark.client.connect.timeout", "30000ms");
     // FIXME: Hadoop3 made the incompatible change for dfs.client.datanode-restart.timeout
     // while spark2 is still using Hadoop2.
     // Spark requires Hive to support Hadoop3 first then Spark can start
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
index b0a0145a4e..d025b9d39e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
@@ -18,7 +18,9 @@
 package org.apache.hive.jdbc;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.MalformedURLException;
 import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -30,7 +32,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsErasureCodingShim;
 import org.apache.hadoop.hive.shims.HadoopShims.MiniDFSShim;
@@ -55,19 +56,17 @@
   private static HiveConf conf;
   private Connection hs2Conn = null;
 
-  private static HiveConf createHiveOnSparkConf() {
+  private static HiveConf createHiveOnSparkConf() throws MalformedURLException {
+    String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
     HiveConf hiveConf = new HiveConf();
     // Tell dfs not to consider load when choosing a datanode as this can cause failure as
     // in a test we do not have spare datanode capacity.
     hiveConf.setBoolean("dfs.namenode.redundancy.considerLoad", false);
-    hiveConf.set("hive.execution.engine", "spark");
-    hiveConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    hiveConf.set("spark.master", "local-cluster[2,2,1024]");
     hiveConf.set("hive.spark.client.connect.timeout", "30000ms");
     hiveConf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
         "TestJdbcWithMiniHS2ErasureCoding-local-dir")
         .toString());
-    hiveConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); // avoid ZK errors
     return hiveConf;
   }
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
index 79d56f5633..f7586c108d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
@@ -18,6 +18,8 @@
 package org.apache.hive.jdbc;
 
+import java.io.File;
+import java.net.MalformedURLException;
 import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -73,14 +75,12 @@ public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLExcepti
   private ExecutorService pool = null;
 
-  private static HiveConf createHiveConf() {
+  private static HiveConf createHiveConf() throws MalformedURLException {
+    String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
     HiveConf conf = new HiveConf();
     conf.set("hive.exec.parallel", "true");
-    conf.set("hive.execution.engine", "spark");
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    conf.set("spark.master", "local-cluster[2,2,1024]");
     conf.set("spark.deploy.defaultCores", "2");
-    conf.set("hive.spark.client.connect.timeout", "30000ms");
     // FIXME: Hadoop3 made the incompatible change for dfs.client.datanode-restart.timeout
     // while spark2 is still using Hadoop2.
     // Spark requires Hive to support Hadoop3 first then Spark can start
diff --git a/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java b/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java
new file mode 100644
index 0000000000..366c74cd05
--- /dev/null
+++ b/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+
+
+/**
+ * A {@link KryoSerializer} that does not serialize hash codes while serializing a
+ * {@link HiveKey}. This decreases the amount of data to be shuffled during a Spark shuffle.
+ */
+public class NoHashCodeKryoSerializer extends KryoSerializer {
+
+  private static final long serialVersionUID = 3350910170041648022L;
+
+  public NoHashCodeKryoSerializer(SparkConf conf) {
+    super(conf);
+  }
+
+  @Override
+  public Kryo newKryo() {
+    Kryo kryo = super.newKryo();
+    kryo.register(HiveKey.class, new HiveKeySerializer());
+    kryo.register(BytesWritable.class, new BytesWritableSerializer());
+    return kryo;
+  }
+
+  private static class HiveKeySerializer extends Serializer<HiveKey> {
+
+    public void write(Kryo kryo, Output output, HiveKey object) {
+      output.writeVarInt(object.getLength(), true);
+      output.write(object.getBytes(), 0, object.getLength());
+    }
+
+    public HiveKey read(Kryo kryo, Input input, Class<HiveKey> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      return new HiveKey(bytes);
+    }
+  }
+
+  static class BytesWritableSerializer extends Serializer<BytesWritable> {
+
+    public void write(Kryo kryo, Output output, BytesWritable object) {
+      output.writeVarInt(object.getLength(), true);
+      output.write(object.getBytes(), 0, object.getLength());
+    }
+
+    public BytesWritable read(Kryo kryo, Input input, Class<BytesWritable> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      return new BytesWritable(bytes);
+    }
+  }
+}
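The following round-trip harness is illustrative only and is not part of the patch. It shows that, with the serializers registered above, a HiveKey is written as just a varint length plus the key bytes, and comes back without a stored hash code. It assumes Spark and the kryo-registrator jar are on the classpath; the class name HiveKeyRoundTrip and the sample key are made up for the example.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hive.spark.NoHashCodeKryoSerializer;
import org.apache.spark.SparkConf;

public class HiveKeyRoundTrip {
  public static void main(String[] args) {
    // Build a Kryo instance the way an executor would, with the HiveKey and
    // BytesWritable serializers from NoHashCodeKryoSerializer registered.
    Kryo kryo = new NoHashCodeKryoSerializer(new SparkConf()).newKryo();

    byte[] keyBytes = "example-row-key".getBytes();
    HiveKey original = new HiveKey(keyBytes, 42);  // 42 stands in for a computed hash code

    // Serialize: only a varint length and the raw key bytes go on the wire;
    // the hash code is dropped, since partitioning has already happened by then.
    Output output = new Output(64, -1);
    kryo.writeObject(output, original);
    byte[] onTheWire = output.toBytes();

    // Deserialize: same key bytes, no stored hash code.
    HiveKey copy = kryo.readObject(new Input(onTheWire), HiveKey.class);
    System.out.println("wire bytes = " + onTheWire.length + ", key length = " + copy.getLength());
  }
}

Run with the jars above on the classpath, this should print a wire size of the key length plus a few varint bytes, without the extra hash-code int that Kryo's default field-based serialization of HiveKey would have carried.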
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index 72ff53e3bd..3349797d6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -90,18 +90,16 @@ public static synchronized LocalHiveSparkClient getInstance(
   private LocalHiveSparkClient(SparkConf sparkConf, HiveConf hiveConf)
       throws FileNotFoundException, MalformedURLException {
-    String regJar = null;
+    File regJar = null;
     // the registrator jar should already be in CP when not in test mode
-    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST)) {
-      String kryoReg = sparkConf.get("spark.kryo.registrator", "");
-      if (SparkClientUtilities.HIVE_KRYO_REG_NAME.equals(kryoReg)) {
-        regJar = SparkClientUtilities.findKryoRegistratorJar(hiveConf);
-        SparkClientUtilities.addJarToContextLoader(new File(regJar));
-      }
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST) &&
+        HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE)) {
+      regJar = SparkClientUtilities.findKryoRegistratorJar(hiveConf);
+      SparkClientUtilities.addJarToContextLoader(regJar);
     }
     sc = new JavaSparkContext(sparkConf);
     if (regJar != null) {
-      sc.addJar(regJar);
+      sc.addJar(regJar.getAbsolutePath());
     }
     jobMetricsListener = new JobMetricsListener();
     sc.sc().addSparkListener(jobMetricsListener);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index 22b598f0b4..1bf5a562c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -23,19 +23,24 @@
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.rdd.ShuffledRDD;
+import org.apache.spark.serializer.KryoSerializer;
 import org.apache.spark.storage.StorageLevel;
 
+
 public class SortByShuffler implements SparkShuffler {
 
   private final boolean totalOrder;
   private final SparkPlan sparkPlan;
+  private final KryoSerializer shuffleSerializer;
 
   /**
    * @param totalOrder whether this shuffler provides total order shuffle.
    */
-  public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
+  public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan, KryoSerializer shuffleSerializer) {
     this.totalOrder = totalOrder;
     this.sparkPlan = sparkPlan;
+    this.shuffleSerializer = shuffleSerializer;
   }
 
   @Override
@@ -56,6 +61,11 @@ public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
       Partitioner partitioner = new HashPartitioner(numPartitions);
       rdd = input.repartitionAndSortWithinPartitions(partitioner);
     }
+    if (shuffleSerializer != null) {
+      if (rdd.rdd() instanceof ShuffledRDD) {
+        ((ShuffledRDD) rdd.rdd()).setSerializer(shuffleSerializer);
+      }
+    }
     return rdd;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index d71d705c78..e021093c86 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -18,13 +18,19 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.hive.spark.client.SparkClientUtilities;
+import org.apache.spark.SparkConf;
 import org.apache.spark.util.CallSite;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,10 +74,12 @@
 @SuppressWarnings("rawtypes")
 public class SparkPlanGenerator {
+
   private static final String CLASS_NAME = SparkPlanGenerator.class.getName();
-  private final PerfLogger perfLogger = SessionState.getPerfLogger();
   private static final Logger LOG = LoggerFactory.getLogger(SparkPlanGenerator.class);
+  private static final String HIVE_SHUFFLE_KRYO_SERIALIZER = "org.apache.hive.spark.NoHashCodeKryoSerializer";
 
+  private final PerfLogger perfLogger = SessionState.getPerfLogger();
   private final JavaSparkContext sc;
   private final JobConf jobConf;
   private final Context context;
@@ -82,6 +90,7 @@
   private final Map workToParentWorkTranMap;
   // a map from each BaseWork to its cloned JobConf
   private final Map workToJobConf;
+  private final org.apache.spark.serializer.KryoSerializer shuffleSerializer;
 
   public SparkPlanGenerator(
       JavaSparkContext sc,
@@ -98,6 +107,7 @@ public SparkPlanGenerator(
     this.workToParentWorkTranMap = new HashMap();
     this.sparkReporter = sparkReporter;
     this.workToJobConf = new HashMap();
+    this.shuffleSerializer = getKyroSerializerForShuffle(sc.getConf());
   }
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
@@ -227,9 +237,9 @@ private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolea
         "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
     SparkShuffler shuffler;
     if (edge.isMRShuffle()) {
-      shuffler = new SortByShuffler(false, sparkPlan);
+      shuffler = new SortByShuffler(false, sparkPlan, shuffleSerializer);
     } else if (edge.isShuffleSort()) {
-      shuffler = new SortByShuffler(true, sparkPlan);
+      shuffler = new SortByShuffler(true, sparkPlan, shuffleSerializer);
     } else {
       shuffler = new GroupByShuffler();
     }
@@ -375,4 +385,32 @@ private void initStatsPublisher(BaseWork work) throws HiveException {
     }
   }
 
+  private org.apache.spark.serializer.KryoSerializer getKyroSerializerForShuffle(
+      SparkConf sparkConf) {
+    if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE)) {
+      try {
+        String sparkMaster = sc.getConf().get("spark.master");
+
+        // In local mode we can't rely on using spark.driver.extraClassPath to add the
+        // hive-kryo-registrator jar to the classpath since everything is run in the same
+        // process, so instead we manually add it to the current classloader
+        if ("local".equals(sparkMaster) || sparkMaster.startsWith("local[")) {
+          try {
+            SparkClientUtilities.addJarToContextLoader(
+                SparkClientUtilities.findKryoRegistratorJar(this.jobConf));
+          } catch (MalformedURLException | FileNotFoundException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        return (org.apache.spark.serializer.KryoSerializer) Thread.currentThread().getContextClassLoader().loadClass(
+            HIVE_SHUFFLE_KRYO_SERIALIZER).getConstructor(SparkConf.class).newInstance(
+            sparkConf);
+      } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
+        throw new IllegalStateException("Unable to create kryo serializer for shuffle RDDs using " +
+            "class " + HIVE_SHUFFLE_KRYO_SERIALIZER, e);
+      }
+    } else {
+      return null;
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
index a1f944621b..b6f78d8abc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
@@ -38,6 +38,10 @@ public HiveKey() {
     hashCodeValid = false;
   }
 
+  public HiveKey(byte[] bytes) {
+    super(bytes);
+  }
+
   public HiveKey(byte[] bytes, int hashcode) {
     super(bytes);
     hashCode = hashcode;
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
index b2b5201c98..c897310a6e 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
@@ -305,6 +305,20 @@ public void cancel(String jobId) {
       }
     }
 
+    String deployMode = conf.get(SPARK_DEPLOY_MODE);
+
+    if (hiveConf.getBoolVar(ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE)) {
+      File hiveKryoJar = SparkClientUtilities.findKryoRegistratorJar(hiveConf);
+      addJars(hiveKryoJar.getAbsolutePath());
+      if ("cluster".equals(deployMode)) {
+        allProps.put(DRIVER_EXTRA_CLASSPATH, allProps.get(DRIVER_EXTRA_CLASSPATH) + ":" +
+            hiveKryoJar.getName());
+      } else {
+        allProps.put(DRIVER_EXTRA_CLASSPATH, allProps.get(DRIVER_EXTRA_CLASSPATH) + ":" +
+            hiveKryoJar.getAbsolutePath());
+      }
+    }
+
     Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
     try {
       allProps.store(writer, "Spark Context configuration");
@@ -317,7 +331,6 @@ public void cancel(String jobId) {
     // SparkSubmit will take care of that for us.
     String master = conf.get("spark.master");
     Preconditions.checkArgument(master != null, "spark.master is not defined.");
-    String deployMode = conf.get(SPARK_DEPLOY_MODE);
 
     if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) {
       String executorCores = conf.get("spark.executor.cores");
@@ -364,11 +377,6 @@ public void cancel(String jobId) {
       }
     }
 
-    String regStr = conf.get("spark.kryo.registrator");
-    if (HIVE_KRYO_REG_NAME.equals(regStr)) {
-      addJars(SparkClientUtilities.findKryoRegistratorJar(hiveConf));
-    }
-
     addPropertiesFile(properties.getAbsolutePath());
     addClass(RemoteDriver.class.getName());
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index d3cb3dd7a1..3e0766a1f6 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -148,7 +148,7 @@ public static String getDeployModeFromMaster(String master) {
     return null;
   }
 
-  public static String findKryoRegistratorJar(HiveConf conf) throws FileNotFoundException {
+  public static File findKryoRegistratorJar(Configuration conf) throws FileNotFoundException {
     // find the jar in local maven repo for testing
     if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
       String repo = System.getProperty("maven.local.repository");
@@ -160,7 +160,7 @@ public static String findKryoRegistratorJar(HiveConf conf) throws FileNotFoundEx
       if (!new File(jar).exists()) {
         throw new FileNotFoundException(jar + " doesn't exist.");
       }
-      return jar;
+      return new File(jar);
     }
     Option option = SparkContext.jarOfClass(SparkClientUtilities.class);
     if (!option.isDefined()) {
@@ -170,7 +170,7 @@ public static String findKryoRegistratorJar(HiveConf conf) throws FileNotFoundEx
     File[] jars = path.getParentFile().listFiles((dir, name) ->
         name.startsWith(HIVE_KRYO_REG_JAR_NAME));
     if (jars != null && jars.length > 0) {
-      return jars[0].getAbsolutePath();
+      return jars[0];
     }
     throw new FileNotFoundException("Cannot find the " + HIVE_KRYO_REG_JAR_NAME
         + " jar under " + path.getParent());
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index 681463e405..d7380038fa 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -35,6 +35,7 @@
 import java.io.FileOutputStream;
 import java.io.InputStream;
 import java.io.Serializable;
+import java.net.MalformedURLException;
 import java.net.URI;
 import java.nio.file.Paths;
 import java.util.Arrays;
@@ -70,7 +71,13 @@
   private static final HiveConf HIVECONF = new HiveConf();
 
   static {
-    HIVECONF.set("hive.spark.client.connect.timeout", "30000ms");
+    String confDir = "../data/conf/spark/standalone/hive-site.xml";
+    try {
+      HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+    } catch (MalformedURLException e) {
+      throw new RuntimeException(e);
+    }
+    HIVECONF.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, true);
     HIVECONF.setVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE, HiveConf.HIVE_SPARK_LAUNCHER_CLIENT);
   }
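Because this patch flips the default of hive.spark.optimize.shuffle.serde from false to true, deployments that do not ship the hive-kryo-registrator jar may want to turn the optimization back off (for a single session, set hive.spark.optimize.shuffle.serde=false; works as well). A minimal programmatic sketch, where the class name DisableShuffleSerde is made up for illustration:

import org.apache.hadoop.hive.conf.HiveConf;

public class DisableShuffleSerde {
  public static HiveConf withoutOptimizedShuffleSerde() {
    HiveConf conf = new HiveConf();
    // Fall back to Spark's stock shuffle serialization instead of the
    // NoHashCodeKryoSerializer registered by this patch.
    conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
    return conf;
  }
}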