commit e15b10ccc6ecac6ec8c81aa4cf2895200f997ad8
Author: Sahil Takiar
Date:   Thu Jun 28 17:50:41 2018 -0700

    HIVE-20032: Don't serialize hashCode when groupByShuffle and RDD caching is disabled

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6ea68c3500..5bc751a1c0 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4241,7 +4241,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "If this is set to true, mapjoin optimization in Hive/Spark will use statistics from\n" +
         "TableScan operators at the root of operator tree, instead of parent ReduceSink\n" +
         "operators of the Join operator."),
-    SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", false,
+    SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", true,
         "If this is set to true, Hive on Spark will register custom serializers for data types\n" +
         "in shuffle. This should result in less shuffled data."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
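Note: the only functional change in the hunk above is the default flip, so hive.spark.optimize.shuffle.serde is now on unless a user turns it off. As a rough illustration (not part of the patch), the same toggle is reachable through the HiveConf API; the session-level `set` statements in the qfile further down do the equivalent thing. The class name below is hypothetical.

    import org.apache.hadoop.hive.conf.HiveConf;

    // Hypothetical scratch class, for illustration only.
    public class ShuffleSerdeFlagSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Defaults to true after this change.
        System.out.println(conf.getBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE));
        // Equivalent to "set hive.spark.optimize.shuffle.serde=false;" in a session.
        conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
      }
    }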
diff --git a/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
index 838ad99982..758f0812b8 100644
--- a/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
+++ b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
@@ -32,6 +32,7 @@
  * problems because kryo is relocated in hive-exec.
  */
 public class HiveKryoRegistrator implements KryoRegistrator {
+  @Override
   public void registerClasses(Kryo kryo) {
     kryo.register(HiveKey.class, new HiveKeySerializer());
diff --git a/ql/pom.xml b/ql/pom.xml
index 0c181e515c..6c83102caa 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -948,16 +948,6 @@
                   <include>joda-time:joda-time</include>
                 </includes>
               </artifactSet>
-              <relocations>
-                <relocation>
-                  <pattern>com.esotericsoftware</pattern>
-                  <shadedPattern>org.apache.hive.com.esotericsoftware</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.objenesis</pattern>
-                  <shadedPattern>org.apache.hive.org.objenesis</shadedPattern>
-                </relocation>
-              </relocations>
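Note: HiveKryoRegistrator lives in the separate kryo-registrator module because hive-exec used to relocate Kryo (see the class comment above); with the com.esotericsoftware and org.objenesis relocations dropped from ql/pom.xml, the hive-exec jar and Spark now agree on the Kryo package names. For context only, this is the standard Spark way a KryoRegistrator gets wired in; the exact plumbing Hive on Spark uses is not shown in this diff, and the class name below is hypothetical.

    import org.apache.spark.SparkConf;

    // Hypothetical illustration of plain Spark registrator wiring.
    public class RegistratorWiringSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryo.registrator", "org.apache.hive.spark.HiveKryoRegistrator");
        System.out.println(conf.get("spark.kryo.registrator"));
      }
    }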
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java
new file mode 100644
index 0000000000..ee329b0fa9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+
+class ShuffleKryoSerializer extends KryoSerializer {
+
+  private static final long serialVersionUID = 3350910170041648022L;
+
+  ShuffleKryoSerializer(SparkConf conf) {
+    super(conf);
+  }
+
+  @Override
+  public Kryo newKryo() {
+    Kryo kryo = super.newKryo();
+    kryo.register(HiveKey.class, new HiveKeySerializer());
+    kryo.register(BytesWritable.class, new BytesWritableSerializer());
+    return kryo;
+  }
+
+  private static class HiveKeySerializer extends Serializer<HiveKey> {
+
+    public void write(Kryo kryo, Output output, HiveKey object) {
+      output.writeVarInt(object.getLength(), true);
+      output.write(object.getBytes(), 0, object.getLength());
+    }
+
+    public HiveKey read(Kryo kryo, Input input, Class<HiveKey> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      return new HiveKey(bytes);
+    }
+  }
+
+  static class BytesWritableSerializer extends Serializer<BytesWritable> {
+
+    public void write(Kryo kryo, Output output, BytesWritable object) {
+      output.writeVarInt(object.getLength(), true);
+      output.write(object.getBytes(), 0, object.getLength());
+    }
+
+    public BytesWritable read(Kryo kryo, Input input, Class<BytesWritable> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      return new BytesWritable(bytes);
+    }
+  }
+}
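Note: both serializers above write nothing but a varint length and the raw payload, so HiveKey's cached hash code (and BytesWritable's buffer padding) never hit the shuffle files. A minimal round-trip sketch of that effect follows; it assumes a hypothetical scratch class placed in the same package, since ShuffleKryoSerializer is package-private, and is not part of the patch.

    package org.apache.hadoop.hive.ql.exec.spark;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import org.apache.hadoop.hive.ql.io.HiveKey;
    import org.apache.spark.SparkConf;

    // Hypothetical scratch class, for illustration only.
    public class ShuffleSerdeRoundTripSketch {
      public static void main(String[] args) {
        Kryo kryo = new ShuffleKryoSerializer(new SparkConf(false)).newKryo();

        HiveKey key = new HiveKey("238".getBytes(), 42); // 42: precomputed distribution hash
        Output out = new Output(64, -1);                 // small initial buffer, no max limit
        kryo.writeObject(out, key);

        HiveKey copy = kryo.readObject(new Input(out.toBytes()), HiveKey.class);
        // Only length + payload were written: the bytes survive, the hash code does not
        // (the copy comes back through the new HiveKey(byte[]) constructor below).
        System.out.println(new String(copy.getBytes(), 0, copy.getLength()));
      }
    }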
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index 22b598f0b4..51a2ac8ac0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -22,20 +22,25 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.rdd.ShuffledRDD;
 import org.apache.spark.storage.StorageLevel;
 
 public class SortByShuffler implements SparkShuffler {
 
   private final boolean totalOrder;
   private final SparkPlan sparkPlan;
+  private final SparkConf sparkConf;
 
   /**
    * @param totalOrder whether this shuffler provides total order shuffle.
+   * @param sparkConf
    */
-  public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
+  public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan, SparkConf sparkConf) {
     this.totalOrder = totalOrder;
     this.sparkPlan = sparkPlan;
+    this.sparkConf = sparkConf;
   }
 
   @Override
@@ -56,6 +61,9 @@ public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
       Partitioner partitioner = new HashPartitioner(numPartitions);
       rdd = input.repartitionAndSortWithinPartitions(partitioner);
     }
+    if (rdd.rdd() instanceof ShuffledRDD) {
+      ((ShuffledRDD) rdd.rdd()).setSerializer(new ShuffleKryoSerializer(this.sparkConf));
+    }
     return rdd;
   }
 
@@ -63,5 +71,4 @@ public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
   public String getName() {
     return "SortBy";
   }
-
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index d71d705c78..e1c640db41 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -227,9 +227,9 @@ private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolea
         "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
     SparkShuffler shuffler;
     if (edge.isMRShuffle()) {
-      shuffler = new SortByShuffler(false, sparkPlan);
+      shuffler = new SortByShuffler(false, sparkPlan, sc.getConf());
     } else if (edge.isShuffleSort()) {
-      shuffler = new SortByShuffler(true, sparkPlan);
+      shuffler = new SortByShuffler(true, sparkPlan, sc.getConf());
     } else {
       shuffler = new GroupByShuffler();
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
index a1f944621b..b6f78d8abc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
@@ -38,6 +38,10 @@ public HiveKey() {
     hashCodeValid = false;
   }
 
+  public HiveKey(byte[] bytes) {
+    super(bytes);
+  }
+
   public HiveKey(byte[] bytes, int hashcode) {
     super(bytes);
     hashCode = hashcode;
diff --git a/ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q b/ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q
index 94d4d7a8a2..c208e29cf4 100644
--- a/ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q
+++ b/ql/src/test/queries/clientpositive/spark_opt_shuffle_serde.q
@@ -6,3 +6,8 @@ select key, count(*) from src group by key order by key limit 100;
 set hive.spark.use.groupby.shuffle=false;
 
 select key, count(*) from src group by key order by key limit 100;
+
+-- Disable dynamic RDD caching, which will trigger a custom Kryo Registrator
+-- that doesn't serialize hash codes
+set hive.combine.equivalent.work.optimization=false;
+select key, count(*) from src group by key order by key limit 100;
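Note: the hash carried by HiveKey is consumed by the HashPartitioner when the map output is partitioned, which happens before the key is serialized for the shuffle, so the reduce side does not need it again; it only matters later if the shuffled RDD is cached and fed into another shuffle. That appears to be why, per the commit title, the custom serializer is installed only on the sort-by path (guarded by the instanceof ShuffledRDD check) and the new qfile case also disables hive.combine.equivalent.work.optimization. A rough sketch of where that hash is actually used, illustrative only and not from the patch, with a hypothetical class name:

    import org.apache.hadoop.hive.ql.io.HiveKey;
    import org.apache.spark.HashPartitioner;

    // Hypothetical illustration of the consumer of HiveKey's hash code.
    public class HiveKeyPartitionSketch {
      public static void main(String[] args) {
        HiveKey key = new HiveKey("238".getBytes(), 42);  // 42: distribution hash set upstream
        HashPartitioner partitioner = new HashPartitioner(8);
        // getPartition() uses key.hashCode() on the map side, before the key is
        // serialized for the shuffle, so the serialized form can drop the hash.
        System.out.println(partitioner.getPartition(key)); // 42 % 8 = 2
      }
    }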
diff --git a/ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out b/ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out
index ba07b44bcb..059ee85aca 100644
--- a/ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_opt_shuffle_serde.q.out
@@ -214,3 +214,111 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 235	1
 237	2
 238	2
+PREHOOK: query: select key, count(*) from src group by key order by key limit 100
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select key, count(*) from src group by key order by key limit 100
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+0	3
+10	1
+100	2
+103	2
+104	2
+105	1
+11	1
+111	1
+113	2
+114	1
+116	1
+118	2
+119	3
+12	2
+120	2
+125	2
+126	1
+128	3
+129	2
+131	1
+133	1
+134	2
+136	1
+137	2
+138	4
+143	1
+145	1
+146	2
+149	2
+15	2
+150	1
+152	2
+153	1
+155	1
+156	1
+157	1
+158	1
+160	1
+162	1
+163	1
+164	2
+165	2
+166	1
+167	3
+168	1
+169	4
+17	1
+170	1
+172	2
+174	2
+175	2
+176	2
+177	1
+178	1
+179	2
+18	2
+180	1
+181	1
+183	1
+186	1
+187	3
+189	1
+19	1
+190	1
+191	2
+192	1
+193	3
+194	1
+195	2
+196	1
+197	2
+199	3
+2	1
+20	1
+200	2
+201	1
+202	1
+203	2
+205	2
+207	2
+208	3
+209	2
+213	2
+214	1
+216	2
+217	2
+218	1
+219	2
+221	2
+222	1
+223	2
+224	2
+226	1
+228	1
+229	2
+230	5
+233	2
+235	1
+237	2
+238	2