diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 73e0290..2f1d787 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3306,6 +3306,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "If this is set to true, mapjoin optimization in Hive/Spark will use statistics from\n" +
         "TableScan operators at the root of operator tree, instead of parent ReduceSink\n" +
         "operators of the Join operator."),
+    SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", false,
+        "If this is set to true, Hive on Spark will register custom serializers for data types\n" +
+        "in shuffle. This should result in less shuffled data."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
         "60s", new TimeValidator(TimeUnit.SECONDS),
         "Timeout for requests from Hive client to remote Spark driver."),
diff --git a/ql/pom.xml b/ql/pom.xml
index 40a216b..07fd3cf 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -891,16 +891,6 @@
                   <include>joda-time:joda-time</include>
                 </includes>
               </artifactSet>
-              <relocations>
-                <relocation>
-                  <pattern>com.esotericsoftware</pattern>
-                  <shadedPattern>org.apache.hive.com.esotericsoftware</shadedPattern>
-                </relocation>
-                <relocation>
-                  <pattern>org.objenesis</pattern>
-                  <shadedPattern>org.apache.hive.org.objenesis</shadedPattern>
-                </relocation>
-              </relocations>
             </configuration>
           </execution>
         </executions>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 6e9ba7c..6fca8e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -21,17 +21,23 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.apache.commons.compress.utils.CharsetNames;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hive.common.LogUtils;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.spark.client.SparkClientUtilities;
+import org.apache.spark.serializer.KryoRegistrator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -198,15 +204,31 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
       }
     }
 
+    final boolean optShuffleSerDe = hiveConf.getBoolVar(
+        HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE);
+
     Set<String> classes = Sets.newHashSet(
-      Splitter.on(",").trimResults().omitEmptyStrings().split(
-        Strings.nullToEmpty(sparkConf.get("spark.kryo.classesToRegister"))));
+        Splitter.on(",").trimResults().omitEmptyStrings().split(
+            Strings.nullToEmpty(sparkConf.get("spark.kryo.classesToRegister"))));
     classes.add(Writable.class.getName());
     classes.add(VectorizedRowBatch.class.getName());
-    classes.add(BytesWritable.class.getName());
-    classes.add(HiveKey.class.getName());
+    if (!optShuffleSerDe) {
+      classes.add(HiveKey.class.getName());
+      classes.add(BytesWritable.class.getName());
+    }
     sparkConf.put("spark.kryo.classesToRegister", Joiner.on(",").join(classes));
 
+    if (optShuffleSerDe) {
+      List<String> registrators = new ArrayList<>(2);
+      if (hiveConf.getBoolVar(HiveConf.ConfVars.SPARK_USE_GROUPBY_SHUFFLE)) {
+        registrators.add(HiveKeyRegistrator.class.getName());
+      } else {
+        registrators.add(HiveKeyRegistratorNoGroup.class.getName());
+      }
+      registrators.add(BytesWritableRegistrator.class.getName());
+      sparkConf.put("spark.kryo.registrator", Joiner.on(",").join(registrators));
+    }
+
     // set yarn queue name
     final String sparkQueueNameKey = "spark.yarn.queue";
     if (SparkClientUtilities.isYarnMaster(sparkMaster) && hiveConf.get(sparkQueueNameKey) == null) {
@@ -245,4 +267,76 @@ static SparkConf generateSparkConf(Map<String, String> conf) {
     }
     return sparkConf;
   }
+
+  public static class HiveKeyRegistrator implements KryoRegistrator {
+
+    @Override
+    public void registerClasses(Kryo kryo) {
+      kryo.register(HiveKey.class, new HiveKeySerializer(true));
+    }
+  }
+
+  public static class HiveKeyRegistratorNoGroup implements KryoRegistrator {
+
+    @Override
+    public void registerClasses(Kryo kryo) {
+      kryo.register(HiveKey.class, new HiveKeySerializer(false));
+    }
+  }
+
+  public static class BytesWritableRegistrator implements KryoRegistrator {
+
+    @Override
+    public void registerClasses(Kryo kryo) {
+      kryo.register(BytesWritable.class, new BytesWritableSerializer());
+    }
+  }
+
+  private static class HiveKeySerializer extends Serializer<HiveKey> {
+
+    // in case of group by shuffle, the hash code is still needed after deserialization
+    private final boolean useGroupBy;
+
+    HiveKeySerializer(boolean useGroupBy) {
+      this.useGroupBy = useGroupBy;
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, HiveKey object) {
+      output.writeVarInt(object.getLength(), true);
+      output.write(object.getBytes(), 0, object.getLength());
+      if (useGroupBy) {
+        output.writeVarInt(object.hashCode(), false);
+      }
+    }
+
+    @Override
+    public HiveKey read(Kryo kryo, Input input, Class<HiveKey> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      HiveKey hiveKey = new HiveKey(bytes);
+      if (useGroupBy) {
+        hiveKey.setHashCode(input.readVarInt(false));
+      }
+      return hiveKey;
+    }
+  }
+
+  private static class BytesWritableSerializer extends Serializer<BytesWritable> {
+
+    @Override
+    public void write(Kryo kryo, Output output, BytesWritable object) {
+      output.writeVarInt(object.getLength(), true);
+      output.write(object.getBytes(), 0, object.getLength());
+    }
+
+    @Override
+    public BytesWritable read(Kryo kryo, Input input, Class<BytesWritable> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      return new BytesWritable(bytes);
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
index f9cf2bd..052381e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
@@ -39,11 +39,16 @@ public HiveKey() {
   }
 
   public HiveKey(byte[] bytes, int hashcode) {
-    super(bytes);
+    this(bytes);
     hashCode = hashcode;
     hashCodeValid = true;
   }
 
+  public HiveKey(byte[] bytes) {
+    super(bytes);
+    hashCodeValid = false;
+  }
+
   public void setHashCode(int myHashCode) {
     hashCodeValid = true;
     hashCode = myHashCode;
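Note: the sketch below is illustrative only and not part of the patch. It round-trips a HiveKey through the serializer that HiveKeyRegistrator (added above) installs, showing that the varint-length encoding and the hash code survive deserialization. It assumes a Kryo 3.x-compatible API on the classpath; the class name HiveKeySerdeSketch is hypothetical.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
import org.apache.hadoop.hive.ql.io.HiveKey;

// Hypothetical harness: in production Spark performs this registration
// itself via the spark.kryo.registrator property set in createHiveSparkClient.
public class HiveKeySerdeSketch {
  public static void main(String[] args) {
    Kryo kryo = new Kryo();
    // Group-by variant: the hash code is serialized alongside the key bytes.
    new HiveSparkClientFactory.HiveKeyRegistrator().registerClasses(kryo);

    HiveKey key = new HiveKey("row-key".getBytes(StandardCharsets.UTF_8), 42);

    Output out = new Output(64, -1);  // growable buffer, no size limit
    kryo.writeObject(out, key);       // varint length + raw bytes + varint hash
    byte[] wire = out.toBytes();      // what would actually cross the shuffle

    HiveKey back = kryo.readObject(new Input(wire), HiveKey.class);
    assert back.getLength() == key.getLength();
    assert back.hashCode() == 42;     // preserved only by the group-by variant
  }
}

Enabling the optimization end to end takes just the HiveConf flags: set hive.spark.optimize.shuffle.serde=true, while hive.spark.use.groupby.shuffle (default true) selects HiveKeyRegistrator over HiveKeyRegistratorNoGroup, which skips the hash code to shuffle even fewer bytes.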