commit d859de430ff4a4bba9ae88a39fe079a3c2bf1a59
Author: Sahil Takiar
Date:   Thu Jun 28 17:50:41 2018 -0700

    HIVE-20032: Don't serialize hashCode when groupByShuffle and RDD cacheing is disabled

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4ed1636591..c9446c9562 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4233,7 +4233,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "If this is set to true, mapjoin optimization in Hive/Spark will use statistics from\n" +
         "TableScan operators at the root of operator tree, instead of parent ReduceSink\n" +
         "operators of the Join operator."),
-    SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", false,
+    SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", true,
        "If this is set to true, Hive on Spark will register custom serializers for data types\n" +
        "in shuffle. This should result in less shuffled data."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
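The hunk above flips the default of hive.spark.optimize.shuffle.serde to true. A minimal sketch of how a caller reads the flag — the wrapper class and main() are illustrative only, while HiveConf.getBoolVar and ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE are the APIs this hunk touches:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class ShuffleSerdeFlagCheck {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // true by default after this change; still overridable per session with:
        //   SET hive.spark.optimize.shuffle.serde=false;
        boolean optimized =
            HiveConf.getBoolVar(conf, HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE);
        System.out.println("hive.spark.optimize.shuffle.serde = " + optimized);
      }
    }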
diff --git a/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java b/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java
new file mode 100644
index 0000000000..366c74cd05
--- /dev/null
+++ b/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+
+
+/**
+ * A {@link KryoSerializer} that does not serialize hash codes while serializing a
+ * {@link HiveKey}. This decreases the amount of data to be shuffled during a Spark shuffle.
+ */
+public class NoHashCodeKryoSerializer extends KryoSerializer {
+
+  private static final long serialVersionUID = 3350910170041648022L;
+
+  public NoHashCodeKryoSerializer(SparkConf conf) {
+    super(conf);
+  }
+
+  @Override
+  public Kryo newKryo() {
+    Kryo kryo = super.newKryo();
+    kryo.register(HiveKey.class, new HiveKeySerializer());
+    kryo.register(BytesWritable.class, new BytesWritableSerializer());
+    return kryo;
+  }
+
+  private static class HiveKeySerializer extends Serializer<HiveKey> {
+
+    public void write(Kryo kryo, Output output, HiveKey object) {
+      output.writeVarInt(object.getLength(), true);
+      output.write(object.getBytes(), 0, object.getLength());
+    }
+
+    public HiveKey read(Kryo kryo, Input input, Class<HiveKey> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      return new HiveKey(bytes);
+    }
+  }
+
+  static class BytesWritableSerializer extends Serializer<BytesWritable> {
+
+    public void write(Kryo kryo, Output output, BytesWritable object) {
+      output.writeVarInt(object.getLength(), true);
+      output.write(object.getBytes(), 0, object.getLength());
+    }
+
+    public BytesWritable read(Kryo kryo, Input input, Class<BytesWritable> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      return new BytesWritable(bytes);
+    }
+  }
+}
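Both serializers write only a varint length prefix followed by the raw bytes, so the HiveKey hash code is never put on the wire — that is where the shuffle savings come from. A self-contained sketch of that round trip against Kryo's Output/Input API (the class name, buffer sizes, and sample data are illustrative):

    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class VarIntRoundTrip {
      public static void main(String[] args) {
        byte[] key = {1, 2, 3, 4, 5};

        // Write: varint length prefix (true = optimize for positive values), then raw bytes.
        Output output = new Output(64, -1);
        output.writeVarInt(key.length, true);
        output.write(key, 0, key.length);
        byte[] onWire = output.toBytes();

        // Read: the length first, then exactly that many bytes. No hash code is ever
        // read back, which is why the new HiveKey(byte[]) constructor below is needed.
        Input input = new Input(onWire);
        int len = input.readVarInt(true);
        byte[] bytes = new byte[len];
        input.readBytes(bytes);

        System.out.println(len + " payload bytes, " + onWire.length + " bytes on the wire");
      }
    }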
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index 22b598f0b4..1bf5a562c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -23,19 +23,24 @@
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.rdd.ShuffledRDD;
+import org.apache.spark.serializer.KryoSerializer;
 import org.apache.spark.storage.StorageLevel;
 
+
 public class SortByShuffler implements SparkShuffler {
 
   private final boolean totalOrder;
   private final SparkPlan sparkPlan;
+  private final KryoSerializer shuffleSerializer;
 
   /**
    * @param totalOrder whether this shuffler provides total order shuffle.
    */
-  public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
+  public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan, KryoSerializer shuffleSerializer) {
     this.totalOrder = totalOrder;
     this.sparkPlan = sparkPlan;
+    this.shuffleSerializer = shuffleSerializer;
   }
 
   @Override
@@ -56,6 +61,11 @@ public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
       Partitioner partitioner = new HashPartitioner(numPartitions);
       rdd = input.repartitionAndSortWithinPartitions(partitioner);
     }
+    if (shuffleSerializer != null) {
+      if (rdd.rdd() instanceof ShuffledRDD) {
+        ((ShuffledRDD) rdd.rdd()).setSerializer(shuffleSerializer);
+      }
+    }
     return rdd;
   }
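The cast is guarded because a per-shuffle serializer can only be attached when the transformation actually produced a ShuffledRDD (both sortByKey and repartitionAndSortWithinPartitions do). A runnable sketch of the same hook against plain Spark Java APIs — the local-mode setup and sample data are illustrative:

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.rdd.ShuffledRDD;
    import org.apache.spark.serializer.KryoSerializer;

    import scala.Tuple2;

    public class ShuffleSerializerDemo {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("demo");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
          JavaPairRDD<Integer, String> pairs = sc.parallelizePairs(
              Arrays.asList(new Tuple2<>(2, "b"), new Tuple2<>(1, "a")));

          // sortByKey shuffles, so the underlying RDD is a ShuffledRDD and a
          // per-shuffle serializer can be set on it, as in the patch above.
          JavaPairRDD<Integer, String> sorted = pairs.sortByKey();
          if (sorted.rdd() instanceof ShuffledRDD) {
            ((ShuffledRDD) sorted.rdd()).setSerializer(new KryoSerializer(conf));
          }
          System.out.println(sorted.collect());
        }
      }
    }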
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index d71d705c78..927091edfd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -19,12 +19,15 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.hive.spark.client.SparkClientUtilities;
+import org.apache.spark.SparkConf;
 import org.apache.spark.util.CallSite;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,10 +71,12 @@
 @SuppressWarnings("rawtypes")
 public class SparkPlanGenerator {
 
+  private static final String CLASS_NAME = SparkPlanGenerator.class.getName();
-  private final PerfLogger perfLogger = SessionState.getPerfLogger();
   private static final Logger LOG = LoggerFactory.getLogger(SparkPlanGenerator.class);
+  private static final String HIVE_SHUFFLE_KRYO_SERIALIZER = "org.apache.hive.spark.NoHashCodeKryoSerializer";
 
+  private final PerfLogger perfLogger = SessionState.getPerfLogger();
   private final JavaSparkContext sc;
   private final JobConf jobConf;
   private final Context context;
@@ -82,6 +87,7 @@
   private final Map<BaseWork, SparkTran> workToParentWorkTranMap;
   // a map from each BaseWork to its cloned JobConf
   private final Map<BaseWork, JobConf> workToJobConf;
+  private final org.apache.spark.serializer.KryoSerializer shuffleSerializer;
 
   public SparkPlanGenerator(
       JavaSparkContext sc,
@@ -98,6 +104,7 @@ public SparkPlanGenerator(
     this.workToParentWorkTranMap = new HashMap<>();
     this.sparkReporter = sparkReporter;
     this.workToJobConf = new HashMap<>();
+    this.shuffleSerializer = getKyroSerializerForShuffle(sc.getConf());
   }
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
@@ -227,9 +234,9 @@ private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolea
         "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
     SparkShuffler shuffler;
     if (edge.isMRShuffle()) {
-      shuffler = new SortByShuffler(false, sparkPlan);
+      shuffler = new SortByShuffler(false, sparkPlan, shuffleSerializer);
     } else if (edge.isShuffleSort()) {
-      shuffler = new SortByShuffler(true, sparkPlan);
+      shuffler = new SortByShuffler(true, sparkPlan, shuffleSerializer);
     } else {
       shuffler = new GroupByShuffler();
     }
@@ -375,4 +382,19 @@ private void initStatsPublisher(BaseWork work) throws HiveException {
     }
   }
 
+  private org.apache.spark.serializer.KryoSerializer getKyroSerializerForShuffle(
+      SparkConf sparkConf) {
+    if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE)) {
+      try {
+        return (org.apache.spark.serializer.KryoSerializer) Class.forName(
+            HIVE_SHUFFLE_KRYO_SERIALIZER).getConstructor(
+            SparkConf.class).newInstance(sparkConf);
+      } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
+        throw new IllegalStateException("Unable to create kryo serializer for shuffle RDDs using " +
+            "class " + HIVE_SHUFFLE_KRYO_SERIALIZER, e);
+      }
+    } else {
+      return null;
+    }
+  }
 }
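getKyroSerializerForShuffle refers to NoHashCodeKryoSerializer only by its string name, presumably so that ql/ carries no compile-time dependency on the new kryo-registrator module; the jar is instead put on the driver classpath (see the AbstractSparkClient hunk below). A stripped-down sketch of the same pattern — the factory class and constant name are illustrative:

    import java.lang.reflect.InvocationTargetException;

    import org.apache.spark.SparkConf;
    import org.apache.spark.serializer.KryoSerializer;

    public class ReflectiveSerializerFactory {
      // Hypothetical constant mirroring HIVE_SHUFFLE_KRYO_SERIALIZER in the patch.
      private static final String IMPL = "org.apache.hive.spark.NoHashCodeKryoSerializer";

      static KryoSerializer create(SparkConf conf) {
        try {
          // Look the class up by name at runtime and invoke its (SparkConf)
          // constructor; nothing here links against the implementing module.
          return (KryoSerializer) Class.forName(IMPL)
              .getConstructor(SparkConf.class).newInstance(conf);
        } catch (InstantiationException | IllegalAccessException | InvocationTargetException
            | NoSuchMethodException | ClassNotFoundException e) {
          throw new IllegalStateException("Unable to instantiate " + IMPL, e);
        }
      }
    }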
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
index a1f944621b..b6f78d8abc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
@@ -38,6 +38,10 @@ public HiveKey() {
     hashCodeValid = false;
   }
 
+  public HiveKey(byte[] bytes) {
+    super(bytes);
+  }
+
   public HiveKey(byte[] bytes, int hashcode) {
     super(bytes);
     hashCode = hashcode;
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
index b2b5201c98..da74faf4c2 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
@@ -305,6 +305,8 @@ public void cancel(String jobId) {
       }
     }
 
+    allProps.put(DRIVER_EXTRA_CLASSPATH, allProps.get(DRIVER_EXTRA_CLASSPATH) + ":" + "hive-kryo-registrator-4.0.0-SNAPSHOT.jar");
+
     Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
     try {
       allProps.store(writer, "Spark Context configuration");
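One catch in the classpath append above: when no spark.driver.extraClassPath was configured, allProps.get(DRIVER_EXTRA_CLASSPATH) returns null and the driver classpath ends up with a literal "null:" prefix. A null-safe variant, sketched under the assumption (suggested by allProps.store(...) just below) that allProps is a java.util.Properties:

    // Hypothetical replacement for the single put(...) call above.
    Object existing = allProps.get(DRIVER_EXTRA_CLASSPATH);
    String registratorJar = "hive-kryo-registrator-4.0.0-SNAPSHOT.jar";
    allProps.put(DRIVER_EXTRA_CLASSPATH,
        existing == null ? registratorJar : existing + ":" + registratorJar);

The hard-coded SNAPSHOT jar name would also need to track the build version rather than being fixed at 4.0.0-SNAPSHOT.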