diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
index dc5d148..a84660e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
@@ -63,15 +63,8 @@ public Iterator iterator() {
 
   @Override
   public void collect(HiveKey key, BytesWritable value) throws IOException {
-    lastRecordOutput.add(copyHiveKey(key), SparkUtilities.copyBytesWritable(value));
-  }
-
-  private static HiveKey copyHiveKey(HiveKey key) {
-    HiveKey copy = new HiveKey();
-    copy.setDistKeyLength(key.getDistKeyLength());
-    copy.setHashCode(key.hashCode());
-    copy.set(key);
-    return copy;
+    lastRecordOutput.add(SparkUtilities.copyHiveKey(key),
+        SparkUtilities.copyBytesWritable(value));
   }
 
   /** Process the given record. */
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
index 9849b49..79baea7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
@@ -18,24 +18,26 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.spark.api.java.JavaPairRDD;
-
 import com.google.common.base.Preconditions;
 import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
 
-public class MapInput implements SparkTran {
-  private JavaPairRDD hadoopRDD;
+
+public class MapInput implements SparkTran<WritableComparable, Writable,
+    WritableComparable, Writable> {
+  private JavaPairRDD<WritableComparable, Writable> hadoopRDD;
   private boolean toCache;
 
-  public MapInput(JavaPairRDD hadoopRDD) {
-    this.hadoopRDD = hadoopRDD;
+  public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD) {
+    this(hadoopRDD, false);
   }
 
-  public MapInput(JavaPairRDD hadoopRDD, boolean toCache) {
+  public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache) {
     this.hadoopRDD = hadoopRDD;
     this.toCache = toCache;
   }
@@ -45,28 +47,28 @@ public void setToCache(boolean toCache) {
   }
 
   @Override
-  public JavaPairRDD transform(
-      JavaPairRDD input) {
+  public JavaPairRDD<WritableComparable, Writable> transform(
+      JavaPairRDD<WritableComparable, Writable> input) {
     Preconditions.checkArgument(input == null,
         "AssertionError: MapInput doesn't take any input");
-    JavaPairRDD result = hadoopRDD;
-    if (toCache) {
-      result = result.mapToPair(new CopyFunction());
-      return result.cache();
-    } else {
-      return result;
-    }
+    return toCache ?
+        hadoopRDD.mapToPair(new CopyFunction()).cache() : hadoopRDD;
   }
 
-  private static class CopyFunction implements PairFunction,
-      BytesWritable, BytesWritable> {
+  private static class CopyFunction implements PairFunction<Tuple2<WritableComparable, Writable>,
+      WritableComparable, Writable> {
+
+    private transient Configuration conf;
 
     @Override
-    public Tuple2 call(Tuple2 tup) throws Exception {
-      // no need to copy key since it never get used in HiveMapFunction
-      BytesWritable value = SparkUtilities.copyBytesWritable(tup._2());
-      return new Tuple2(tup._1(), value);
+    public Tuple2<WritableComparable, Writable>
+        call(Tuple2<WritableComparable, Writable> tuple) throws Exception {
+      if (conf == null) {
+        conf = new Configuration();
+      }
+
+      return new Tuple2<WritableComparable, Writable>(tuple._1(),
+          WritableUtils.clone(tuple._2(), conf));
-    }
+    }
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 25a4515..00a6f3d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -39,7 +39,6 @@
 import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
@@ -57,7 +56,6 @@
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
@@ -294,9 +292,11 @@ private MapInput generateMapInput(MapWork mapWork)
     JobConf jobConf = cloneJobConf(mapWork);
     Class ifClass = getInputFormat(jobConf, mapWork);
 
-    JavaPairRDD hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
+    JavaPairRDD<WritableComparable, Writable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
         WritableComparable.class, Writable.class);
-    return new MapInput(hadoopRDD, false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
+    MapInput result = new MapInput(hadoopRDD,
+        false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
+    return result;
   }
 
   private ShuffleTran generate(SparkEdgeProperty edge, boolean needCache) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
index 8a3dbf2..c413952 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
@@ -18,10 +18,10 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public interface SparkTran<KI extends BytesWritable, VI,
-    KO extends BytesWritable, VO> {
+public interface SparkTran<KI extends WritableComparable, VI,
+    KO extends WritableComparable, VO> {
   JavaPairRDD<KO, VO> transform(
       JavaPairRDD<KI, VI> input);
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index 0f21b46..4de3ad4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 
 /**
@@ -24,6 +25,14 @@
  */
 public class SparkUtilities {
 
+  public static HiveKey copyHiveKey(HiveKey key) {
+    HiveKey copy = new HiveKey();
+    copy.setDistKeyLength(key.getDistKeyLength());
+    copy.setHashCode(key.hashCode());
+    copy.set(key);
+    return copy;
+  }
+
   public static BytesWritable copyBytesWritable(BytesWritable bw) {
     BytesWritable copy = new BytesWritable();
     copy.set(bw);
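
Reviewer note (illustrative, not from the patch): the reason MapInput must copy records before caching is that Hadoop record readers typically reuse the same Writable key/value instances across calls, so a cached RDD built without copies would hold many references to one repeatedly mutated object. The new CopyFunction handles this with Hadoop's WritableUtils.clone(Writable, Configuration), which deep-copies a Writable by serializing and deserializing it. A minimal standalone sketch of that behavior follows; the WritableCloneDemo class, the Text values, and the reused/snapshot names are assumptions made for the example, not code from this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableUtils;

    public class WritableCloneDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        Text reused = new Text("row-1");                    // stands in for a value object a record reader reuses
        Text snapshot = WritableUtils.clone(reused, conf);  // deep copy via Writable serialization
        reused.set("row-2");                                // the reader overwrites the shared instance
        System.out.println(snapshot);                       // prints "row-1": the cached copy is unaffected
      }
    }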