diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
index dc5d148..a84660e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
@@ -63,15 +63,8 @@ public Iterator iterator() {
 
   @Override
   public void collect(HiveKey key, BytesWritable value) throws IOException {
-    lastRecordOutput.add(copyHiveKey(key), SparkUtilities.copyBytesWritable(value));
-  }
-
-  private static HiveKey copyHiveKey(HiveKey key) {
-    HiveKey copy = new HiveKey();
-    copy.setDistKeyLength(key.getDistKeyLength());
-    copy.setHashCode(key.hashCode());
-    copy.set(key);
-    return copy;
+    lastRecordOutput.add(SparkUtilities.copyHiveKey(key),
+        SparkUtilities.copyBytesWritable(value));
   }
 
   /** Process the given record. */
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveCopyFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveCopyFunction.java
new file mode 100644
index 0000000..9516959
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveCopyFunction.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+public class HiveCopyFunction implements PairFunction<Tuple2<WritableComparable, Writable>,
+    WritableComparable, Writable> {
+
+  private transient JobConf jobConf;
+  private byte[] buffer;
+
+  public HiveCopyFunction(byte[] jobConfBuffer) {
+    this.buffer = jobConfBuffer;
+  }
+
+  @Override
+  public Tuple2<WritableComparable, Writable>
+      call(Tuple2<WritableComparable, Writable> tuple) throws Exception {
+    if (jobConf == null) {
+      jobConf = KryoSerializer.deserializeJobConf(buffer);
+    }
+
+    return new Tuple2<WritableComparable, Writable>(tuple._1(),
+        WritableUtils.clone(tuple._2(), jobConf));
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
index 9849b49..5ac5137 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
@@ -18,24 +18,23 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.spark.api.java.JavaPairRDD;
-
 import com.google.common.base.Preconditions;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
 
-public class MapInput implements SparkTran<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
-  private JavaPairRDD<BytesWritable, BytesWritable> hadoopRDD;
+
+public class MapInput implements SparkTran<WritableComparable, Writable,
+    WritableComparable, Writable> {
+  private JavaPairRDD<WritableComparable, Writable> hadoopRDD;
   private boolean toCache;
+  private HiveCopyFunction copyFunction;
 
-  public MapInput(JavaPairRDD<BytesWritable, BytesWritable> hadoopRDD) {
-    this.hadoopRDD = hadoopRDD;
+  public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD) {
+    this(hadoopRDD, false);
   }
 
-  public MapInput(JavaPairRDD<BytesWritable, BytesWritable> hadoopRDD, boolean toCache) {
+  public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache) {
     this.hadoopRDD = hadoopRDD;
     this.toCache = toCache;
   }
@@ -44,29 +43,24 @@ public void setToCache(boolean toCache) {
     this.toCache = toCache;
   }
 
+  public void setCopyFunction(HiveCopyFunction copyFunction) {
+    this.copyFunction = copyFunction;
+  }
+
   @Override
-  public JavaPairRDD<BytesWritable, BytesWritable> transform(
-      JavaPairRDD<BytesWritable, BytesWritable> input) {
+  public JavaPairRDD<WritableComparable, Writable> transform(
+      JavaPairRDD<WritableComparable, Writable> input) {
     Preconditions.checkArgument(input == null,
        "AssertionError: MapInput doesn't take any input");
-    JavaPairRDD<BytesWritable, BytesWritable> result = hadoopRDD;
+    JavaPairRDD<WritableComparable, Writable> result = hadoopRDD;
     if (toCache) {
-      result = result.mapToPair(new CopyFunction());
+      Preconditions.checkArgument(copyFunction != null,
+          "AssertionError: when toCache is true, copyFunction shouldn't be null");
+      result = result.mapToPair(copyFunction);
       return result.cache();
    } else {
      return result;
    }
  }
 
-  private static class CopyFunction implements PairFunction<Tuple2<BytesWritable, BytesWritable>,
-      BytesWritable, BytesWritable> {
-
-    @Override
-    public Tuple2<BytesWritable, BytesWritable> call(Tuple2<BytesWritable, BytesWritable> tup) throws Exception {
-      // no need to copy key since it never get used in HiveMapFunction
-      BytesWritable value = SparkUtilities.copyBytesWritable(tup._2());
-      return new Tuple2<BytesWritable, BytesWritable>(tup._1(), value);
-    }
-  }
-
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 25a4515..b1bf4ab 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -39,7 +39,6 @@
 import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
@@ -57,7 +56,6 @@
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
@@ -294,9 +292,17 @@ private MapInput generateMapInput(MapWork mapWork)
     JobConf jobConf = cloneJobConf(mapWork);
     Class ifClass = getInputFormat(jobConf, mapWork);
 
-    JavaPairRDD<BytesWritable, BytesWritable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
+    JavaPairRDD<WritableComparable, Writable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
         WritableComparable.class, Writable.class);
-    return new MapInput(hadoopRDD, false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
+    boolean needCache = cloneToWork.containsKey(mapWork);
+    MapInput result = new MapInput(hadoopRDD,
+        false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
+    if (needCache) {
+      byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+      HiveCopyFunction copyFunc = new HiveCopyFunction(confBytes);
+      result.setCopyFunction(copyFunc);
+    }
+    return result;
   }
 
   private ShuffleTran generate(SparkEdgeProperty edge, boolean needCache) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
index 8a3dbf2..c413952 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
@@ -18,10 +18,10 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public interface SparkTran<KI extends BytesWritable, VI, KO extends BytesWritable, VO> {
+public interface SparkTran<KI extends WritableComparable, VI, KO extends WritableComparable, VO> {
   JavaPairRDD<KO, VO> transform(
       JavaPairRDD<KI, VI> input);
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index 0f21b46..4de3ad4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 
 /**
@@ -24,6 +25,14 @@
  */
 public class SparkUtilities {
 
+  public static HiveKey copyHiveKey(HiveKey key) {
+    HiveKey copy = new HiveKey();
+    copy.setDistKeyLength(key.getDistKeyLength());
+    copy.setHashCode(key.hashCode());
+    copy.set(key);
+    return copy;
+  }
+
   public static BytesWritable copyBytesWritable(BytesWritable bw) {
     BytesWritable copy = new BytesWritable();
     copy.set(bw);
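
For context, a brief, self-contained illustration (not part of the patch) of why HiveCopyFunction deep-copies each record with WritableUtils.clone before the RDD is cached: Hadoop record readers typically reuse a single Writable instance for every record, so caching the bare reference would leave every cached entry pointing at the bytes of the last record read. The class name WritableCloneDemo is illustrative only and assumes nothing beyond hadoop-common on the classpath.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableUtils;

public class WritableCloneDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Simulate a record reader that reuses one Writable instance per record.
    BytesWritable reused = new BytesWritable();
    List<BytesWritable> cachedRefs = new ArrayList<>();
    List<BytesWritable> cachedClones = new ArrayList<>();
    for (String record : new String[] {"first", "second", "third-and-longest"}) {
      byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
      reused.set(bytes, 0, bytes.length);
      cachedRefs.add(reused);                              // same object buffered three times
      cachedClones.add(WritableUtils.clone(reused, conf)); // independent copy, as HiveCopyFunction.call does
    }
    // Without cloning, every entry reflects the last record read (prints "17 17 17");
    // with cloning, each entry keeps its own contents (prints "5 6 17").
    cachedRefs.forEach(bw -> System.out.print(bw.getLength() + " "));
    System.out.println();
    cachedClones.forEach(bw -> System.out.print(bw.getLength() + " "));
    System.out.println();
  }
}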