diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
index 5d4414a..93674c1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
@@ -34,7 +35,8 @@
   private final Set<SparkTran> leafTrans = new HashSet<SparkTran>();
   private final Map<SparkTran, List<SparkTran>> transGraph = new HashMap<SparkTran, List<SparkTran>>();
   private final Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>();
-  private final Map<SparkTran, List<JavaPairRDD<BytesWritable, BytesWritable>>> unionInputs = new HashMap<SparkTran, List<JavaPairRDD<BytesWritable, BytesWritable>>>();
+  private final Map<SparkTran, List<JavaPairRDD<HiveKey, BytesWritable>>> unionInputs =
+      new HashMap<SparkTran, List<JavaPairRDD<HiveKey, BytesWritable>>>();
   private final Map<SparkTran, JavaPairRDD<BytesWritable, BytesWritable>> mapInputs = new HashMap<SparkTran, JavaPairRDD<BytesWritable, BytesWritable>>();
 
   public void addRootTranWithInput(SparkTran tran, JavaPairRDD<BytesWritable, BytesWritable> input) {
@@ -50,7 +52,8 @@ public void addRootTranWithInput(SparkTran tran, JavaPairRDD
   }
 
   public void execute() throws IllegalStateException {
-    Map<SparkTran, JavaPairRDD<BytesWritable, BytesWritable>> resultRDDs = new HashMap<SparkTran, JavaPairRDD<BytesWritable, BytesWritable>>();
+    Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> resultRDDs =
+        new HashMap<SparkTran, JavaPairRDD<HiveKey, BytesWritable>>();
     for (SparkTran tran : rootTrans) {
       // make sure all the root trans are MapTran
       if (!(tran instanceof MapTran)) {
@@ -60,16 +63,16 @@ public void execute() throws IllegalStateException {
       if (input == null) {
         throw new IllegalStateException("input is missing for transformation!");
       }
-      JavaPairRDD<BytesWritable, BytesWritable> rdd = tran.transform(input);
+      JavaPairRDD<HiveKey, BytesWritable> rdd = tran.transform(input);
 
       while (getChildren(tran).size() > 0) {
         SparkTran childTran = getChildren(tran).get(0);
         if (childTran instanceof UnionTran) {
-          List<JavaPairRDD<BytesWritable, BytesWritable>> unionInputList = unionInputs
+          List<JavaPairRDD<HiveKey, BytesWritable>> unionInputList = unionInputs
              .get(childTran);
          if (unionInputList == null) {
            // process the first union input RDD, cache it in the hash map
-            unionInputList = new LinkedList<JavaPairRDD<BytesWritable, BytesWritable>>();
+            unionInputList = new LinkedList<JavaPairRDD<HiveKey, BytesWritable>>();
            unionInputList.add(rdd);
            unionInputs.put(childTran, unionInputList);
            break;
@@ -79,7 +82,7 @@ public void execute() throws IllegalStateException {
             break;
           } else if (unionInputList.size() == this.getParents(childTran).size() - 1) { // process
             // process the last input RDD
-            for (JavaPairRDD<BytesWritable, BytesWritable> inputRDD : unionInputList) {
+            for (JavaPairRDD<HiveKey, BytesWritable> inputRDD : unionInputList) {
               ((UnionTran) childTran).setOtherInput(inputRDD);
               rdd = childTran.transform(rdd);
             }
@@ -94,7 +97,7 @@ public void execute() throws IllegalStateException {
         resultRDDs.put(tran, rdd);
       }
     }
-    for (JavaPairRDD<BytesWritable, BytesWritable> resultRDD : resultRDDs.values()) {
+    for (JavaPairRDD<HiveKey, BytesWritable> resultRDD : resultRDDs.values()) {
       resultRDD.foreach(HiveVoidFunction.getInstance());
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
index 3d06275..ecdac92 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
@@ -1,13 +1,14 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
 public class GroupByShuffler implements SparkShuffler {
 
   @Override
-  public JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
-      JavaPairRDD<BytesWritable, BytesWritable> input, int numPartitions) {
+  public JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+      JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
     if (numPartitions > 0) {
       return input.groupByKey(numPartitions);
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
index 89b5462..0df2580 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
@@ -19,6 +19,7 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.OutputCollector;
 import scala.Tuple2;
 
@@ -38,7 +39,7 @@
  * through Iterator interface.
  */
 public abstract class HiveBaseFunctionResultList<T> implements
-    Iterable<Tuple2<BytesWritable, BytesWritable>>, OutputCollector<BytesWritable, BytesWritable>, Serializable {
+    Iterable<Tuple2<HiveKey, BytesWritable>>, OutputCollector<HiveKey, BytesWritable>, Serializable {
   private final Iterator<T> inputIterator;
   private boolean isClosed = false;
 
@@ -60,8 +61,16 @@ public Iterator iterator() {
   }
 
   @Override
-  public void collect(BytesWritable key, BytesWritable value) throws IOException {
-    lastRecordOutput.add(copyBytesWritable(key), copyBytesWritable(value));
+  public void collect(HiveKey key, BytesWritable value) throws IOException {
+    lastRecordOutput.add(copyHiveKey(key), copyBytesWritable(value));
+  }
+
+  private static HiveKey copyHiveKey(HiveKey key) {
+    HiveKey copy = new HiveKey();
+    copy.setDistKeyLength(key.getDistKeyLength());
+    copy.setHashCode(key.hashCode());
+    copy.set(key);
+    return copy;
   }
 
   private static BytesWritable copyBytesWritable(BytesWritable bw) {
@@ -125,7 +134,7 @@ public boolean hasNext(){
   }
 
   @Override
-  public Tuple2<BytesWritable, BytesWritable> next() {
+  public Tuple2<HiveKey, BytesWritable> next() {
     if (hasNext()) {
       return lastRecordOutput.next();
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
index 9725c4f..a6b9037 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -72,14 +73,16 @@ private void initRowContainer(Configuration conf) {
     }
   }
 
-  public void add(BytesWritable key, BytesWritable value) {
+  public void add(HiveKey key, BytesWritable value) {
+    byte[] hiveKeyBytes = KryoSerializer.serialize(key);
+    BytesWritable wrappedHiveKey = new BytesWritable(hiveKeyBytes);
     List<BytesWritable> row = new ArrayList<BytesWritable>(2);
-    row.add(key);
+    row.add(wrappedHiveKey);
     row.add(value);
 
     try {
       container.addRow(row);
-    } catch(HiveException ex) {
+    } catch (HiveException ex) {
       throw new RuntimeException("Failed to add KV pair to RowContainer", ex);
     }
   }
@@ -97,7 +100,7 @@ public boolean hasNext() {
     return container.rowCount() > 0 && cursor < container.rowCount();
   }
 
-  public Tuple2<BytesWritable, BytesWritable> next() {
+  public Tuple2<HiveKey, BytesWritable> next() {
     Preconditions.checkState(hasNext());
 
     try {
@@ -108,8 +111,9 @@ public boolean hasNext() {
         row = container.next();
       }
       cursor++;
-      return new Tuple2<BytesWritable, BytesWritable>(row.get(0), row.get(1));
-    } catch(HiveException ex) {
+      HiveKey key = KryoSerializer.deserialize(row.get(0).getBytes(), HiveKey.class);
+      return new Tuple2<HiveKey, BytesWritable>(key, row.get(1));
+    } catch (HiveException ex) {
       throw new RuntimeException("Failed to get row from RowContainer", ex);
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
index 4d6e197..5078a3a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
@@ -20,6 +20,7 @@
 
 import java.util.Iterator;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
@@ -28,7 +29,7 @@
 import scala.Tuple2;
 
 public class HiveMapFunction implements PairFlatMapFunction<Iterator<Tuple2<BytesWritable, BytesWritable>>,
-BytesWritable, BytesWritable> {
+  HiveKey, BytesWritable> {
 
   private static final long serialVersionUID = 1L;
   private transient JobConf jobConf;
@@ -40,7 +41,7 @@ public HiveMapFunction(byte[] buffer) {
   }
 
   @Override
-  public Iterable<Tuple2<BytesWritable, BytesWritable>>
+  public Iterable<Tuple2<HiveKey, BytesWritable>>
   call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
     if (jobConf == null) {
       jobConf = KryoSerializer.deserializeJobConf(this.buffer);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index 1dd5a93..0b8b7c9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -21,6 +21,7 @@
 import java.util.Iterator;
 
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
@@ -28,8 +29,8 @@
 
 import scala.Tuple2;
 
-public class HiveReduceFunction implements PairFlatMapFunction<Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>>,
-BytesWritable, BytesWritable> {
+public class HiveReduceFunction implements PairFlatMapFunction<
+  Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>, HiveKey, BytesWritable> {
 
   private static final long serialVersionUID = 1L;
   private transient JobConf jobConf;
@@ -41,14 +42,15 @@ public HiveReduceFunction(byte[] buffer) {
   }
 
   @Override
-  public Iterable<Tuple2<BytesWritable, BytesWritable>>
-  call(Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> it) throws Exception {
+  public Iterable<Tuple2<HiveKey, BytesWritable>>
+  call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception {
     if (jobConf == null) {
       jobConf = KryoSerializer.deserializeJobConf(this.buffer);
     }
 
     SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();
-    HiveReduceFunctionResultList result = new HiveReduceFunctionResultList(jobConf, it, reducerRecordhandler);
+    HiveReduceFunctionResultList result =
+        new HiveReduceFunctionResultList(jobConf, it, reducerRecordhandler);
     reducerRecordhandler.init(jobConf, result, Reporter.NULL);
 
     return result;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
index c33bd1e..c153ad8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import scala.Tuple2;
 
@@ -25,7 +26,7 @@
 import java.util.Iterator;
 
 public class HiveReduceFunctionResultList extends
-    HiveBaseFunctionResultList<Tuple2<BytesWritable, Iterable<BytesWritable>>> {
+    HiveBaseFunctionResultList<Tuple2<HiveKey, Iterable<BytesWritable>>> {
   private final SparkReduceRecordHandler reduceRecordHandler;
 
   /**
@@ -35,14 +36,14 @@
    * @param reducer Initialized {@link org.apache.hadoop.hive.ql.exec.mr.ExecReducer} instance.
    */
   public HiveReduceFunctionResultList(Configuration conf,
-      Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> inputIterator,
+      Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> inputIterator,
       SparkReduceRecordHandler reducer) {
     super(conf, inputIterator);
     this.reduceRecordHandler = reducer;
   }
 
   @Override
-  protected void processNextRecord(Tuple2<BytesWritable, Iterable<BytesWritable>> inputRecord)
+  protected void processNextRecord(Tuple2<HiveKey, Iterable<BytesWritable>> inputRecord)
       throws IOException {
     reduceRecordHandler.processRow(inputRecord._1(), inputRecord._2().iterator());
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java
index c66cd80..67c567f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.function.VoidFunction;
 
@@ -25,9 +26,8 @@
 /**
  * Implementation of a voidFunction that does nothing.
- *
  */
-public class HiveVoidFunction implements VoidFunction<Tuple2<BytesWritable, BytesWritable>> {
+public class HiveVoidFunction implements VoidFunction<Tuple2<HiveKey, BytesWritable>> {
 
   private static final long serialVersionUID = 1L;
 
   private static HiveVoidFunction instance = new HiveVoidFunction();
@@ -40,7 +40,7 @@ private HiveVoidFunction() {
   }
 
   @Override
-  public void call(Tuple2<BytesWritable, BytesWritable> t) throws Exception {
+  public void call(Tuple2<HiveKey, BytesWritable> t) throws Exception {
   }
 
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
index 55cc5bf..6288ff2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
@@ -35,25 +35,21 @@
 import com.esotericsoftware.kryo.io.Output;
 
 public class KryoSerializer {
-  private static final Log LOG = LogFactory.getLog("KryoSerializer");
-  private static final Kryo kryo = Utilities.runtimeSerializationKryo.get();
-
-  static {
-    kryo.register(ExecMapper.class);
-  }
+  private static final Log LOG = LogFactory.getLog(KryoSerializer.class);
 
   public static byte[] serialize(Object object) {
     ByteArrayOutputStream stream = new ByteArrayOutputStream();
     Output output = new Output(stream);
 
-    kryo.writeObject(output, object);
+    Utilities.runtimeSerializationKryo.get().writeObject(output, object);
     output.close(); // close() also calls flush()
 
     return stream.toByteArray();
   }
 
-  public static <T> T deserialize(byte[] buffer,Class<T> clazz) {
-    return kryo.readObject(new Input(new ByteArrayInputStream(buffer)), clazz);
+  public static <T> T deserialize(byte[] buffer, Class<T> clazz) {
+    return Utilities.runtimeSerializationKryo.get().readObject(
+        new Input(new ByteArrayInputStream(buffer)), clazz);
   }
 
   public static byte[] serializeJobConf(JobConf jobConf) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
index b03a51c..e62527c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
@@ -18,14 +18,15 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public class MapTran implements SparkTran {
+public class MapTran implements SparkTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> {
   private HiveMapFunction mapFunc;
 
   @Override
-  public JavaPairRDD<BytesWritable, BytesWritable> transform(
+  public JavaPairRDD<HiveKey, BytesWritable> transform(
      JavaPairRDD<BytesWritable, BytesWritable> input) {
     return input.mapPartitionsToPair(mapFunc);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index 76b74e7..52ac724 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -18,17 +18,18 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public class ReduceTran implements SparkTran {
+public class ReduceTran implements SparkTran<HiveKey, BytesWritable, HiveKey, BytesWritable> {
   private SparkShuffler shuffler;
   private HiveReduceFunction reduceFunc;
   private int numPartitions;
 
   @Override
-  public JavaPairRDD<BytesWritable, BytesWritable> transform(
-      JavaPairRDD<BytesWritable, BytesWritable> input) {
+  public JavaPairRDD<HiveKey, BytesWritable> transform(
+      JavaPairRDD<HiveKey, BytesWritable> input) {
     return shuffler.shuffle(input, numPartitions).mapPartitionsToPair(reduceFunc);
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index 70e20b0..446e3cc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -28,9 +29,9 @@
 public class SortByShuffler implements SparkShuffler {
 
   @Override
-  public JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
-      JavaPairRDD<BytesWritable, BytesWritable> input, int numPartitions) {
-    JavaPairRDD<BytesWritable, BytesWritable> rdd;
+  public JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+      JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
+    JavaPairRDD<HiveKey, BytesWritable> rdd;
     if (numPartitions > 0) {
       rdd = input.sortByKey(true, numPartitions);
     } else {
@@ -40,64 +41,64 @@
     }
   }
 
   private static class ShuffleFunction implements
-      PairFlatMapFunction<Iterator<Tuple2<BytesWritable, BytesWritable>>,
-      BytesWritable, Iterable<BytesWritable>> {
+      PairFlatMapFunction<Iterator<Tuple2<HiveKey, BytesWritable>>,
+      HiveKey, Iterable<BytesWritable>> {
     // make eclipse happy
     private static final long serialVersionUID = 1L;
 
     @Override
-    public Iterable<Tuple2<BytesWritable, Iterable<BytesWritable>>> call(
-        final Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
+    public Iterable<Tuple2<HiveKey, Iterable<BytesWritable>>> call(
+        final Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
       // Use input iterator to back returned iterable object.
-      final Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> resultIt =
-          new Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>>() {
-        BytesWritable curKey = null;
-        List<BytesWritable> curValues = new ArrayList<BytesWritable>();
+      final Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> resultIt =
+          new Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
+            HiveKey curKey = null;
+            List<BytesWritable> curValues = new ArrayList<BytesWritable>();
 
-        @Override
-        public boolean hasNext() {
-          return it.hasNext() || curKey != null;
-        }
+            @Override
+            public boolean hasNext() {
+              return it.hasNext() || curKey != null;
+            }
 
-        @Override
-        public Tuple2<BytesWritable, Iterable<BytesWritable>> next() {
-          // TODO: implement this by accumulating rows with the same key into a list.
-          // Note that this list needs to improved to prevent excessive memory usage, but this
-          // can be done in later phase.
-          while (it.hasNext()) {
-            Tuple2<BytesWritable, BytesWritable> pair = it.next();
-            if (curKey != null && !curKey.equals(pair._1())) {
-              BytesWritable key = curKey;
-              List<BytesWritable> values = curValues;
-              curKey = pair._1();
-              curValues = new ArrayList<BytesWritable>();
-              curValues.add(pair._2());
-              return new Tuple2<BytesWritable, Iterable<BytesWritable>>(key, values);
+            @Override
+            public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
+              // TODO: implement this by accumulating rows with the same key into a list.
+              // Note that this list needs to improved to prevent excessive memory usage, but this
+              // can be done in later phase.
+              while (it.hasNext()) {
+                Tuple2<HiveKey, BytesWritable> pair = it.next();
+                if (curKey != null && !curKey.equals(pair._1())) {
+                  HiveKey key = curKey;
+                  List<BytesWritable> values = curValues;
+                  curKey = pair._1();
+                  curValues = new ArrayList<BytesWritable>();
+                  curValues.add(pair._2());
+                  return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, values);
+                }
+                curKey = pair._1();
+                curValues.add(pair._2());
+              }
+              if (curKey == null) {
+                throw new NoSuchElementException();
+              }
+              // if we get here, this should be the last element we have
+              HiveKey key = curKey;
+              curKey = null;
+              return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, curValues);
             }
-            curKey = pair._1();
-            curValues.add(pair._2());
-          }
-          if (curKey == null) {
-            throw new NoSuchElementException();
-          }
-          // if we get here, this should be the last element we have
-          BytesWritable key = curKey;
-          curKey = null;
-          return new Tuple2<BytesWritable, Iterable<BytesWritable>>(key, curValues);
-        }
 
-        @Override
-        public void remove() {
-          // Not implemented.
-          // throw Unsupported Method Invocation Exception.
-          throw new UnsupportedOperationException();
-        }
+            @Override
+            public void remove() {
+              // Not implemented.
+              // throw Unsupported Method Invocation Exception.
+              throw new UnsupportedOperationException();
+            }
 
-      };
+          };
 
-      return new Iterable<Tuple2<BytesWritable, Iterable<BytesWritable>>>() {
+      return new Iterable<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
         @Override
-        public Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> iterator() {
+        public Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> iterator() {
           return resultIt;
         }
       };
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
index bb4465a..3eea26a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
@@ -33,7 +33,7 @@
 import java.util.Iterator;
 
 public abstract class SparkRecordHandler {
-  private static final Log LOG = LogFactory.getLog(SparkRecordHandler.class);
+  private final Log LOG = LogFactory.getLog(this.getClass());
 
   // used to log memory usage periodically
   protected final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
@@ -83,7 +83,7 @@ protected void logMemoryInfo() {
     rowNumber++;
     if (rowNumber == nextLogThreshold) {
       long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-      LOG.info("ExecReducer: processing " + rowNumber
+      LOG.info("processing " + rowNumber
           + " rows: used memory = " + used_memory);
       nextLogThreshold = getNextLogThreshold(rowNumber);
     }
@@ -96,7 +96,7 @@ protected void logMemoryInfo() {
    */
   protected void logCloseInfo() {
     long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-    LOG.info("ExecMapper: processed " + rowNumber + " rows: used memory = "
+    LOG.info("processed " + rowNumber + " rows: used memory = "
         + used_memory);
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
index 2475359..53845a0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
@@ -18,12 +18,13 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
 public interface SparkShuffler {
 
-  JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
-      JavaPairRDD<BytesWritable, BytesWritable> input, int numPartitions);
+  JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+      JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions);
 
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
index 19894b0..e770158 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
@@ -21,7 +21,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public interface SparkTran {
-  JavaPairRDD<BytesWritable, BytesWritable> transform(
-      JavaPairRDD<BytesWritable, BytesWritable> input);
+public interface SparkTran<KI, VI, KO, VO> {
+  JavaPairRDD<KO, VO> transform(
+      JavaPairRDD<KI, VI> input);
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
index 5ec7d0f..40f22a0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
@@ -18,23 +18,24 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public class UnionTran implements SparkTran {
-  JavaPairRDD<BytesWritable, BytesWritable> otherInput;
+public class UnionTran implements SparkTran<HiveKey, BytesWritable, HiveKey, BytesWritable> {
+  JavaPairRDD<HiveKey, BytesWritable> otherInput;
 
   @Override
-  public JavaPairRDD<BytesWritable, BytesWritable> transform(
-      JavaPairRDD<BytesWritable, BytesWritable> input) {
+  public JavaPairRDD<HiveKey, BytesWritable> transform(
+      JavaPairRDD<HiveKey, BytesWritable> input) {
     return input.union(otherInput);
   }
 
-  public void setOtherInput(JavaPairRDD<BytesWritable, BytesWritable> otherInput) {
+  public void setOtherInput(JavaPairRDD<HiveKey, BytesWritable> otherInput) {
     this.otherInput = otherInput;
   }
 
-  public JavaPairRDD<BytesWritable, BytesWritable> getOtherInput() {
+  public JavaPairRDD<HiveKey, BytesWritable> getOtherInput() {
     return this.otherInput;
   }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
index 64a4c5a..496a11f 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.junit.Test;
 import scala.Tuple2;
@@ -31,13 +32,13 @@ public void testSimple() throws Exception {
     HiveConf conf = new HiveConf();
     HiveKVResultCache cache = new HiveKVResultCache(conf);
 
-    BytesWritable key = new BytesWritable("key".getBytes());
+    HiveKey key = new HiveKey("key".getBytes(), "key".hashCode());
     BytesWritable value = new BytesWritable("value".getBytes());
 
     cache.add(key, value);
 
     assertTrue("KV result cache should have at least one element", cache.hasNext());
 
-    Tuple2<BytesWritable, BytesWritable> row = cache.next();
+    Tuple2<HiveKey, BytesWritable> row = cache.next();
     assertTrue("Incorrect key", row._1().equals(key));
     assertTrue("Incorrect value", row._2().equals(value));
@@ -64,7 +65,7 @@ private void testSpillingHelper(HiveKVResultCache cache, int numRecords) {
     for(int i=0; i<numRecords; i++) {
-      Tuple2<BytesWritable, BytesWritable> row = cache.next();
+      Tuple2<HiveKey, BytesWritable> row = cache.next();
       assertTrue("Unexpected key at position: " + recordsSeen,
           new String(row._1().getBytes()).equals(key));
       assertTrue("Unexpected value at position: " + recordsSeen,