diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bd85577..0ae163a 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3417,6 +3417,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "If this is set to true, mapjoin optimization in Hive/Spark will use statistics from\n" +
         "TableScan operators at the root of operator tree, instead of parent ReduceSink\n" +
         "operators of the Join operator."),
+    SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", true,
+        "If this is set to true, Hive on Spark will register custom serializers for data types\n" +
+        "in shuffle. This should result in less shuffled data."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
       "60s", new TimeValidator(TimeUnit.SECONDS),
       "Timeout for requests from Hive client to remote Spark driver."),
diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 8d92da3..9459e77 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -1439,7 +1439,8 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\
   spark_vectorized_dynamic_partition_pruning.q,\
   spark_use_ts_stats_for_mapjoin.q,\
   spark_use_op_stats.q,\
-  spark_explain_groupbyshuffle.q
+  spark_explain_groupbyshuffle.q,\
+  spark_opt_shuffle_serde.q
 
 miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
   bucket4.q,\
diff --git kryo-registrator/pom.xml kryo-registrator/pom.xml
new file mode 100644
index 0000000..a5279fa
--- /dev/null
+++ kryo-registrator/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>hive</artifactId>
+    <groupId>org.apache.hive</groupId>
+    <version>3.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>hive-kryo-registrator</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive Kryo Registrator</name>
+
+  <properties>
+    <hive.path.to.root>..</hive.path.to.root>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <optional>true</optional>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
diff --git kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
new file mode 100644
index 0000000..62ba0eb
--- /dev/null
+++ kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.serializer.KryoRegistrator;
+
+/**
+ * Kryo registrator for shuffle data, i.e. HiveKey and BytesWritable.
+ *
+ * Active use (e.g. reflection to get a class instance) of this class on hive side can cause
+ * problems because kryo is relocated in hive-exec.
+ */
+public class HiveKryoRegistrator implements KryoRegistrator {
+  @Override
+  public void registerClasses(Kryo kryo) {
+    kryo.register(HiveKey.class, new HiveKeySerializer());
+    kryo.register(BytesWritable.class, new BytesWritableSerializer());
+  }
+
+  private static class HiveKeySerializer extends Serializer<HiveKey> {
+
+    public void write(Kryo kryo, Output output, HiveKey object) {
+      output.writeVarInt(object.getLength(), true);
+      output.write(object.getBytes(), 0, object.getLength());
+      output.writeVarInt(object.hashCode(), false);
+    }
+
+    public HiveKey read(Kryo kryo, Input input, Class<HiveKey> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      return new HiveKey(bytes, input.readVarInt(false));
+    }
+  }
+
+  private static class BytesWritableSerializer extends Serializer<BytesWritable> {
+
+    public void write(Kryo kryo, Output output, BytesWritable object) {
+      output.writeVarInt(object.getLength(), true);
+      output.write(object.getBytes(), 0, object.getLength());
+    }
+
+    public BytesWritable read(Kryo kryo, Input input, Class<BytesWritable> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      return new BytesWritable(bytes);
+    }
+
+  }
+}
diff --git packaging/pom.xml packaging/pom.xml
index beddd1c..52ad6a2 100644
--- packaging/pom.xml
+++ packaging/pom.xml
@@ -287,6 +287,11 @@
       <artifactId>hive-webhcat-java-client</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-kryo-registrator</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git pom.xml pom.xml
index b521cc4..64f4b92 100644
--- pom.xml
+++ pom.xml
@@ -55,6 +55,7 @@
     <module>llap-server</module>
    <module>shims</module>
    <module>spark-client</module>
+    <module>kryo-registrator</module>
    <module>storage-api</module>
    <module>testutils</module>
    <module>packaging</module>
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 597fcab..eaffaff 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -70,7 +70,7 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
     String master = sparkConf.get("spark.master");
     if (master.equals("local") || master.startsWith("local[")) {
       // With local spark context, all user sessions share the same spark context.
-      return LocalHiveSparkClient.getInstance(generateSparkConf(sparkConf));
+      return LocalHiveSparkClient.getInstance(generateSparkConf(sparkConf), hiveconf);
     } else {
       return new RemoteHiveSparkClient(hiveconf, sparkConf);
     }
@@ -200,13 +200,20 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
       }
     }
 
+    final boolean optShuffleSerDe = hiveConf.getBoolVar(
+        HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE);
+
     Set<String> classes = Sets.newHashSet(
-      Splitter.on(",").trimResults().omitEmptyStrings().split(
-        Strings.nullToEmpty(sparkConf.get("spark.kryo.classesToRegister"))));
+        Splitter.on(",").trimResults().omitEmptyStrings().split(
+            Strings.nullToEmpty(sparkConf.get("spark.kryo.classesToRegister"))));
     classes.add(Writable.class.getName());
     classes.add(VectorizedRowBatch.class.getName());
-    classes.add(BytesWritable.class.getName());
-    classes.add(HiveKey.class.getName());
+    if (!optShuffleSerDe) {
+      classes.add(HiveKey.class.getName());
+      classes.add(BytesWritable.class.getName());
+    } else {
+      sparkConf.put("spark.kryo.registrator", SparkClientUtilities.HIVE_KRYO_REG_NAME);
+    }
     sparkConf.put("spark.kryo.classesToRegister", Joiner.on(",").join(classes));
 
     // set yarn queue name
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index 72f2f91..8d71a9b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -18,10 +18,14 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.net.MalformedURLException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hive.spark.client.SparkClientUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -66,9 +70,10 @@
 
   private static LocalHiveSparkClient client;
 
-  public static synchronized LocalHiveSparkClient getInstance(SparkConf sparkConf) {
+  public static synchronized LocalHiveSparkClient getInstance(
+      SparkConf sparkConf, HiveConf hiveConf) throws FileNotFoundException, MalformedURLException {
     if (client == null) {
-      client = new LocalHiveSparkClient(sparkConf);
+      client = new LocalHiveSparkClient(sparkConf, hiveConf);
     }
     return client;
   }
@@ -81,8 +86,21 @@ public static synchronized LocalHiveSparkClient getInstance(SparkConf sparkConf)
 
   private final JobMetricsListener jobMetricsListener;
 
-  private LocalHiveSparkClient(SparkConf sparkConf) {
+  private LocalHiveSparkClient(SparkConf sparkConf, HiveConf hiveConf)
+      throws FileNotFoundException, MalformedURLException {
+    String regJar = null;
+    // the registrator jar should already be in CP when not in test mode
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      String kryoReg = sparkConf.get("spark.kryo.registrator", "");
+      if (kryoReg != null && kryoReg.contains(SparkClientUtilities.HIVE_KRYO_REG_NAME)) {
+        regJar = SparkClientUtilities.findKryoRegistratorJar(hiveConf);
+        SparkClientUtilities.addJarToContextLoader(new File(regJar));
+      }
+    }
     sc = new JavaSparkContext(sparkConf);
+    if (regJar != null) {
+      sc.addJar(regJar);
+    }
     jobMetricsListener = new JobMetricsListener();
     sc.sc().listenerBus().addListener(jobMetricsListener);
   }
diff --git ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q
new file mode 100644
index 0000000..2c4691a
--- /dev/null
+++ ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q
@@ -0,0 +1,7 @@
+set hive.spark.optimize.shuffle.serde=true;
+
+set hive.spark.use.groupby.shuffle=true;
+select key, count(*) from src group by key order by key limit 100;
+
+set hive.spark.use.groupby.shuffle=false;
+select key, count(*) from src group by key order by key limit 100;
diff --git ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out
new file mode 100644
index 0000000..cd9c7bc
--- /dev/null
+++ ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out
@@ -0,0 +1,216 @@
+PREHOOK: query: select key, count(*) from src group by key order by key limit 100
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select key, count(*) from src group by key order by key limit 100
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+0	3
+10	1
+100	2
+103	2
+104	2
+105	1
+11	1
+111	1
+113	2
+114	1
+116	1
+118	2
+119	3
+12	2
+120	2
+125	2
+126	1
+128	3
+129	2
+131	1
+133	1
+134	2
+136	1
+137	2
+138	4
+143	1
+145	1
+146	2
+149	2
+15	2
+150	1
+152	2
+153	1
+155	1
+156	1
+157	1
+158	1
+160	1
+162	1
+163	1
+164	2
+165	2
+166	1
+167	3
+168	1
+169	4
+17	1
+170	1
+172	2
+174	2
+175	2
+176	2
+177	1
+178	1
+179	2
+18	2
+180	1
+181	1
+183	1
+186	1
+187	3
+189	1
+19	1
+190	1
+191	2
+192	1
+193	3
+194	1
+195	2
+196	1
+197	2
+199	3
+2	1
+20	1
+200	2
+201	1
+202	1
+203	2
+205	2
+207	2
+208	3
+209	2
+213	2
+214	1
+216	2
+217	2
+218	1
+219	2
+221	2
+222	1
+223	2
+224	2
+226	1
+228	1
+229	2
+230	5
+233	2
+235	1
+237	2
+238	2
+PREHOOK: query: select key, count(*) from src group by key order by key limit 100
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select key, count(*) from src group by key order by key limit 100
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+0	3
+10	1
+100	2
+103	2
+104	2
+105	1
+11	1
+111	1
+113	2
+114	1
+116	1
+118	2
+119	3
+12	2
+120	2
+125	2
+126	1
+128	3
+129	2
+131	1
+133	1
+134	2
+136	1
+137	2
+138	4
+143	1
+145	1
+146	2
+149	2
+15	2
+150	1
+152	2
+153	1
+155	1
+156	1
+157	1
+158	1
+160	1
+162	1
+163	1
+164	2
+165	2
+166	1
+167	3
+168	1
+169	4
+17	1
+170	1
+172	2
+174	2
+175	2
+176	2
+177	1
+178	1
+179	2
+18	2
+180	1
+181	1
+183	1
+186	1
+187	3
+189	1
+19	1
+190	1
+191	2
+192	1
+193	3
+194	1
+195	2
+196	1
+197	2
+199	3
+2	1
+20	1
+200	2
+201	1
+202	1
+203	2
+205	2
+207	2
+208	3
+209	2
+213	2
+214	1
+216	2
+217	2
+218	1
+219	2
+221	2
+222	1
+223	2
+224	2
+226	1
+228	1
+229	2
+230	5
+233	2
+235	1
+237	2
+238	2
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index e0ec3b7..00687d5 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hive.spark.client;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.hive.spark.client.SparkClientUtilities.HIVE_KRYO_REG_NAME;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
@@ -442,6 +443,12 @@ public void run() {
         }
       }
 
+      String regStr = conf.get("spark.kryo.registrator");
+      if (regStr != null && regStr.contains(HIVE_KRYO_REG_NAME)) {
+        argv.add("--jars");
+        argv.add(SparkClientUtilities.findKryoRegistratorJar(hiveConf));
+      }
+
       argv.add("--properties-file");
       argv.add(properties.getAbsolutePath());
       argv.add("--class");
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index 210da2a..27bc198 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,9 +18,12 @@
 package org.apache.hive.spark.client;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
@@ -29,17 +32,24 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.MutableURLClassLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import scala.Option;
 
 public class SparkClientUtilities {
   protected static final transient Logger LOG =
       LoggerFactory.getLogger(SparkClientUtilities.class);
 
   private static final Map<String, Long> downloadedFiles = new ConcurrentHashMap<>();
 
+  public static final String HIVE_KRYO_REG_NAME = "org.apache.hive.spark.HiveKryoRegistrator";
+  private static final String HIVE_KRYO_REG_JAR_NAME = "hive-kryo-registrator";
+
   /**
    * Add new elements to the classpath.
    *
@@ -74,7 +84,8 @@
   /**
    * Create a URL from a string representing a path to a local file.
    * The path string can be just a path, or can start with file:/, file:///
-   * @param path  path string
+   *
+   * @param path path string
    * @return
    */
   private static URL urlFromPathString(String path, Long timeStamp,
@@ -136,4 +147,43 @@ public static String getDeployModeFromMaster(String master) {
     }
     return null;
   }
+
+  public static String findKryoRegistratorJar(HiveConf conf) throws FileNotFoundException {
+    // find the jar in local maven repo for testing
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      String repo = System.getProperty("maven.local.repository");
+      String version = System.getProperty("hive.version");
+      String jarName = HIVE_KRYO_REG_JAR_NAME + "-" + version + ".jar";
+      String[] parts = new String[]{repo, "org", "apache", "hive",
+          HIVE_KRYO_REG_JAR_NAME, version, jarName};
+      String jar = Joiner.on(File.separator).join(parts);
+      if (!new File(jar).exists()) {
+        throw new FileNotFoundException(jar + " doesn't exist.");
+      }
+      return jar;
+    }
+    Option<String> option = SparkContext.jarOfClass(SparkClientUtilities.class);
+    if (!option.isDefined()) {
+      throw new FileNotFoundException("Cannot find the path to hive-exec.jar");
+    }
+    File path = new File(option.get());
+    File[] jars = path.getParentFile().listFiles((dir, name) ->
+        name.startsWith(HIVE_KRYO_REG_JAR_NAME));
+    if (jars != null && jars.length > 0) {
+      return jars[0].getAbsolutePath();
+    }
+    throw new FileNotFoundException("Cannot find the " + HIVE_KRYO_REG_JAR_NAME
+        + " jar under " + path.getParent());
+  }
+
+  public static void addJarToContextLoader(File jar) throws MalformedURLException {
+    ClassLoader loader = Thread.currentThread().getContextClassLoader();
+    if (loader instanceof MutableURLClassLoader) {
+      ((MutableURLClassLoader) loader).addURL(jar.toURI().toURL());
+    } else {
+      URLClassLoader newLoader =
+          new URLClassLoader(new URL[]{jar.toURI().toURL()}, loader);
+      Thread.currentThread().setContextClassLoader(newLoader);
+    }
+  }
 }
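
Editor's note, not part of the patch: the sketch below is a minimal, standalone illustration of what the registered serializers do at shuffle time. It round-trips a HiveKey through a plain Kryo instance configured by HiveKryoRegistrator. It assumes an unshaded kryo plus the new hive-kryo-registrator classes are on the classpath (the patch itself wires the registrator into executors via spark.kryo.registrator, as HiveKryoRegistrator's javadoc warns against using the class directly inside hive-exec, where kryo is relocated). The class name HiveKryoRegistratorDemo is made up for the example.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hive.spark.HiveKryoRegistrator;

// Hypothetical demo class, not part of the patch.
public class HiveKryoRegistratorDemo {
  public static void main(String[] args) {
    Kryo kryo = new Kryo();
    // Same registration the Spark executors perform through spark.kryo.registrator.
    new HiveKryoRegistrator().registerClasses(kryo);

    HiveKey key = new HiveKey("key-1".getBytes(), 42);

    // Serialize: the custom serializer writes a varint length, the key bytes,
    // and the partitioning hash code, instead of Kryo's generic field layout.
    Output out = new Output(64, -1);
    kryo.writeObject(out, key);
    out.close();

    // Deserialize and check that payload and hash code survive the round trip.
    Input in = new Input(out.toBytes());
    HiveKey copy = kryo.readObject(in, HiveKey.class);
    in.close();

    assert copy.getLength() == key.getLength();
    assert copy.hashCode() == key.hashCode();
  }
}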