commit 3b83677d6771eeb838ba1dc91ba6809fb4102450
Author: Sahil Takiar
Date:   Thu Jun 28 17:50:41 2018 -0700

    HIVE-20032: Don't serialize hashCode when groupByShuffle and RDD caching is disabled

diff --git a/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoNoHashCodeRegistrator.java b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoNoHashCodeRegistrator.java
new file mode 100644
index 0000000000..ddb6d56091
--- /dev/null
+++ b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoNoHashCodeRegistrator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+
+import org.apache.spark.serializer.KryoRegistrator;
+
+
+/**
+ * Similar to {@link HiveKryoRegistrator} except it does not serialize hash codes.
+ */
+public class HiveKryoNoHashCodeRegistrator implements KryoRegistrator {
+
+  @Override
+  public void registerClasses(Kryo kryo) {
+    kryo.register(HiveKey.class, new HiveKeyNoHashCodeSerializer());
+    kryo.register(BytesWritable.class, new HiveKryoRegistrator.BytesWritableSerializer());
+  }
+
+  static class HiveKeyNoHashCodeSerializer extends Serializer<HiveKey> {
+
+    public void write(Kryo kryo, Output output, HiveKey object) {
+      output.writeVarInt(object.getLength(), true);
+      output.write(object.getBytes(), 0, object.getLength());
+    }
+
+    public HiveKey read(Kryo kryo, Input input, Class<HiveKey> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      return new HiveKey(bytes);
+    }
+  }
+}
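For illustration, here is a minimal sketch of what the new serializer changes (not part of the patch; the class name NoHashCodeRoundTrip and the sample key bytes are invented). A HiveKey round-trips with its payload intact, but the explicit hash code set on the producer side is never written to the stream; the single-argument HiveKey constructor, added later in this patch, is what lets read() rebuild the key without fabricating a hash code.

import java.nio.charset.StandardCharsets;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hive.spark.HiveKryoNoHashCodeRegistrator;

public class NoHashCodeRoundTrip {
  public static void main(String[] args) {
    Kryo kryo = new Kryo();
    new HiveKryoNoHashCodeRegistrator().registerClasses(kryo);

    // Producer side: a key carrying an explicit partitioning hash code.
    HiveKey key = new HiveKey("k1".getBytes(StandardCharsets.UTF_8), 42);

    // HiveKeyNoHashCodeSerializer writes only length + bytes; the 42 above
    // never reaches the stream.
    Output out = new Output(64, -1);
    kryo.writeObject(out, key);

    // The payload survives the round trip; the hash code does not.
    HiveKey copy = kryo.readObject(new Input(out.toBytes()), HiveKey.class);
    System.out.println(
        new String(copy.getBytes(), 0, copy.getLength(), StandardCharsets.UTF_8)); // prints: k1
  }
}

Under HiveKryoRegistrator the same round trip would also carry the hash code, which is what the group-by shuffle and dynamic RDD caching paths rely on.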
diff --git a/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
index 838ad99982..599014b360 100644
--- a/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
+++ b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
@@ -32,13 +32,14 @@
  * problems because kryo is relocated in hive-exec.
  */
 public class HiveKryoRegistrator implements KryoRegistrator {
+  @Override
   public void registerClasses(Kryo kryo) {
     kryo.register(HiveKey.class, new HiveKeySerializer());
     kryo.register(BytesWritable.class, new BytesWritableSerializer());
   }
 
-  private static class HiveKeySerializer extends Serializer<HiveKey> {
+  static class HiveKeySerializer extends Serializer<HiveKey> {
 
     public void write(Kryo kryo, Output output, HiveKey object) {
       output.writeVarInt(object.getLength(), true);
@@ -54,7 +55,7 @@ public HiveKey read(Kryo kryo, Input input, Class<HiveKey> type) {
     }
   }
 
-  private static class BytesWritableSerializer extends Serializer<BytesWritable> {
+  static class BytesWritableSerializer extends Serializer<BytesWritable> {
 
     public void write(Kryo kryo, Output output, BytesWritable object) {
       output.writeVarInt(object.getLength(), true);
@@ -67,6 +68,5 @@ public BytesWritable read(Kryo kryo, Input input, Class<BytesWritable> type) {
       input.readBytes(bytes);
       return new BytesWritable(bytes);
     }
-
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 5ed5d4214e..b236c47160 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -212,7 +212,13 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String se
       classes.add(HiveKey.class.getName());
       classes.add(BytesWritable.class.getName());
     } else {
-      sparkConf.put("spark.kryo.registrator", SparkClientUtilities.HIVE_KRYO_REG_NAME);
+      if (!hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION) &&
+          !hiveConf.getBoolVar(HiveConf.ConfVars.SPARK_USE_GROUPBY_SHUFFLE)) {
+        sparkConf.put("spark.kryo.registrator",
+            SparkClientUtilities.HIVE_KRYO_NO_HASH_CODE_REG_NAME);
+      } else {
+        sparkConf.put("spark.kryo.registrator", SparkClientUtilities.HIVE_KRYO_REG_NAME);
+      }
     }
 
     sparkConf.put("spark.kryo.classesToRegister", Joiner.on(",").join(classes));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index 72ff53e3bd..37bb33c2e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -94,7 +94,8 @@ private LocalHiveSparkClient(SparkConf sparkConf, HiveConf hiveConf)
     // the registrator jar should already be in CP when not in test mode
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST)) {
       String kryoReg = sparkConf.get("spark.kryo.registrator", "");
-      if (SparkClientUtilities.HIVE_KRYO_REG_NAME.equals(kryoReg)) {
+      if (SparkClientUtilities.HIVE_KRYO_REG_NAME.equals(kryoReg) ||
+          SparkClientUtilities.HIVE_KRYO_NO_HASH_CODE_REG_NAME.equals(kryoReg)) {
         regJar = SparkClientUtilities.findKryoRegistratorJar(hiveConf);
         SparkClientUtilities.addJarToContextLoader(new File(regJar));
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
index a1f944621b..b6f78d8abc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
@@ -38,6 +38,10 @@ public HiveKey() {
     hashCodeValid = false;
   }
 
+  public HiveKey(byte[] bytes) {
+    super(bytes);
+  }
+
   public HiveKey(byte[] bytes, int hashcode) {
     super(bytes);
     hashCode = hashcode;
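The conditional added to HiveSparkClientFactory above is the heart of the change, and can be read as a small decision function (the helper name chooseKryoRegistrator below is hypothetical, for illustration only): the hash code may be dropped from the wire only when neither the group-by shuffle nor dynamic RDD caching can ask for it again after deserialization.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.spark.client.SparkClientUtilities;

final class RegistratorChoice {
  // Hypothetical helper mirroring the HiveSparkClientFactory logic above.
  static String chooseKryoRegistrator(HiveConf hiveConf) {
    boolean rddCaching =
        hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION);
    boolean groupByShuffle =
        hiveConf.getBoolVar(HiveConf.ConfVars.SPARK_USE_GROUPBY_SHUFFLE);
    // Only when both features are off is the hash code dead weight on the wire.
    return (!rddCaching && !groupByShuffle)
        ? SparkClientUtilities.HIVE_KRYO_NO_HASH_CODE_REG_NAME
        : SparkClientUtilities.HIVE_KRYO_REG_NAME;
  }
}

Both flags default to true, so out of the box Hive keeps the full serializer; the leaner one only kicks in when a user opts out of both features, as the new test below does.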
diff --git a/ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q b/ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q
index 94d4d7a8a2..c208e29cf4 100644
--- a/ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q
+++ b/ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q
@@ -6,3 +6,8 @@ select key, count(*) from src group by key order by key limit 100;
 set hive.spark.use.groupby.shuffle=false;
 
 select key, count(*) from src group by key order by key limit 100;
+
+-- Disable dynamic RDD caching, which will trigger a custom Kryo Registrator
+-- that doesn't serialize hash codes
+set hive.combine.equivalent.work.optimization=false;
+select key, count(*) from src group by key order by key limit 100;
diff --git a/ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out b/ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out
index ba07b44bcb..059ee85aca 100644
--- a/ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out
@@ -214,3 +214,111 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 235	1
 237	2
 238	2
+PREHOOK: query: select key, count(*) from src group by key order by key limit 100
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select key, count(*) from src group by key order by key limit 100
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+0	3
+10	1
+100	2
+103	2
+104	2
+105	1
+11	1
+111	1
+113	2
+114	1
+116	1
+118	2
+119	3
+12	2
+120	2
+125	2
+126	1
+128	3
+129	2
+131	1
+133	1
+134	2
+136	1
+137	2
+138	4
+143	1
+145	1
+146	2
+149	2
+15	2
+150	1
+152	2
+153	1
+155	1
+156	1
+157	1
+158	1
+160	1
+162	1
+163	1
+164	2
+165	2
+166	1
+167	3
+168	1
+169	4
+17	1
+170	1
+172	2
+174	2
+175	2
+176	2
+177	1
+178	1
+179	2
+18	2
+180	1
+181	1
+183	1
+186	1
+187	3
+189	1
+19	1
+190	1
+191	2
+192	1
+193	3
+194	1
+195	2
+196	1
+197	2
+199	3
+2	1
+20	1
+200	2
+201	1
+202	1
+203	2
+205	2
+207	2
+208	3
+209	2
+213	2
+214	1
+216	2
+217	2
+218	1
+219	2
+221	2
+222	1
+223	2
+224	2
+226	1
+228	1
+229	2
+230	5
+233	2
+235	1
+237	2
+238	2
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
index ed9222cfec..36c7fbe2cb 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
@@ -18,6 +18,7 @@
 package org.apache.hive.spark.client;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.hive.spark.client.SparkClientUtilities.HIVE_KRYO_NO_HASH_CODE_REG_NAME;
 import static org.apache.hive.spark.client.SparkClientUtilities.HIVE_KRYO_REG_NAME;
 
 import com.google.common.base.Charsets;
@@ -366,7 +367,7 @@ public void cancel(String jobId) {
     }
 
     String regStr = conf.get("spark.kryo.registrator");
-    if (HIVE_KRYO_REG_NAME.equals(regStr)) {
+    if (HIVE_KRYO_REG_NAME.equals(regStr) || HIVE_KRYO_NO_HASH_CODE_REG_NAME.equals(regStr)) {
       addJars(SparkClientUtilities.findKryoRegistratorJar(hiveConf));
     }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index d3cb3dd7a1..f0db90bf79 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -48,6 +48,7 @@
   private static final Map<String, String> downloadedFiles = new ConcurrentHashMap<>();
 
   public static final String HIVE_KRYO_REG_NAME = "org.apache.hive.spark.HiveKryoRegistrator";
+  public static final String HIVE_KRYO_NO_HASH_CODE_REG_NAME = "org.apache.hive.spark.HiveKryoNoHashCodeRegistrator";
 
   private static final String HIVE_KRYO_REG_JAR_NAME = "hive-kryo-registrator";
 
   /**
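For completeness, a sketch of how the new constant reaches Spark (a standalone SparkConf is shown for illustration; inside Hive this is done by HiveSparkClientFactory above). Spark reads spark.kryo.registrator and reflectively instantiates the named KryoRegistrator on each executor, which is why AbstractSparkClient and LocalHiveSparkClient both have to ship the kryo-registrator jar whenever either registrator name is configured.

import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.spark.SparkConf;

public class RegistratorConfExample {
  public static void main(String[] args) {
    // Spark instantiates the named KryoRegistrator reflectively on each
    // executor, so the kryo-registrator jar must be on the executor classpath.
    SparkConf conf = new SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", SparkClientUtilities.HIVE_KRYO_NO_HASH_CODE_REG_NAME);
    System.out.println(conf.get("spark.kryo.registrator"));
  }
}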