diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 20ea977..0bb554a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
+import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -88,6 +89,14 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) {
        mo = new MapOperator();
      }
      mo.setConf(mrwork);
+
+     // If the current thread's IOContext is not initialized (because it's reading from a
+     // cached input HadoopRDD), initialize it from the saved copy.
+     IOContext ioContext = IOContext.get();
+     if (ioContext.getInputPath() == null) {
+       IOContext.copy(ioContext, IOContext.getMap().get(SparkUtilities.MAP_IO_CONTEXT));
+     }
+
      // initialize map operator
      mo.setChildren(job);
      l4j.info(mo.dump(0));
@@ -199,6 +208,10 @@ public void close() {
     } finally {
       MapredContext.close();
       Utilities.clearWorkMap();
+
+      // It's possible that a thread gets reused for different queries, so we need to
+      // reset the input path.
+      IOContext.get().setInputPath(null);
     }
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 00a6f3d..b2c6cf3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -294,8 +294,7 @@ private MapInput generateMapInput(MapWork mapWork)
     JavaPairRDD hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
         WritableComparable.class, Writable.class);
 
-    MapInput result = new MapInput(hadoopRDD,
-        false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
+    MapInput result = new MapInput(hadoopRDD, cloneToWork.containsKey(mapWork));
     return result;
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index 4de3ad4..37761c9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -25,6 +25,9 @@
  */
 public class SparkUtilities {
 
+  // Used to save and retrieve the IOContext for multi-insert queries.
+  public static final String MAP_IO_CONTEXT = "MAP_IO_CONTEXT";
+
   public static HiveKey copyHiveKey(HiveKey key) {
     HiveKey copy = new HiveKey();
     copy.setDistKeyLength(key.getDistKeyLength());
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index 58e1ceb..b4c2c1f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -27,9 +27,11 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.FooterBuffer;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.io.IOContext.Comparison;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -171,6 +173,18 @@ private void initIOContext(long startPos, boolean isBlockPointer,
     ioCxtRef.isBlockPointer = isBlockPointer;
     ioCxtRef.inputPath = inputPath;
     LOG.info("Processing file " + inputPath);
+
+    // In Spark, with multi-insert, an input HadoopRDD may be shared by multiple
+    // mappers, and if it is cached, only the first thread will have its thread-local
+    // IOContext initialized, while the rest will not.
+    // To solve this, we save a copy of the initialized IOContext, so that it can
+    // later be used by the other threads.
+    if (HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      IOContext iocontext = new IOContext();
+      IOContext.copy(iocontext, ioCxtRef);
+      IOContext.getMap().put(SparkUtilities.MAP_IO_CONTEXT, iocontext);
+    }
+
     initDone = true;
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
index 5fb3b13..7b6df27 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
@@ -112,6 +112,27 @@ public IOContext() {
     this.ioExceptions = false;
   }
 
+  /**
+   * Copy all field values from orig to dest; any existing values in dest will be overwritten.
+   *
+   * @param dest the IOContext to copy to
+   * @param orig the IOContext to copy from
+   */
+  public static void copy(IOContext dest, IOContext orig) {
+    dest.currentBlockStart = orig.currentBlockStart;
+    dest.nextBlockStart = orig.nextBlockStart;
+    dest.currentRow = orig.currentRow;
+    dest.isBlockPointer = orig.isBlockPointer;
+    dest.ioExceptions = orig.ioExceptions;
+    dest.useSorted = orig.useSorted;
+    dest.isBinarySearching = orig.isBinarySearching;
+    dest.endBinarySearch = orig.endBinarySearch;
+    dest.comparison = orig.comparison;
+    dest.genericUDFClassName = orig.genericUDFClassName;
+    dest.ri = orig.ri;
+    dest.inputPath = orig.inputPath;
+  }
+
   public long getCurrentBlockStart() {
     return currentBlockStart;
   }
@@ -224,4 +245,5 @@ public void resetSortingValues() {
     this.comparison = null;
     this.genericUDFClassName = null;
   }
+
 }
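
The patch boils down to a save/restore pattern across threads: the thread that actually reads the input split publishes a copy of its thread-local IOContext into a shared map, and any mapper thread that finds its own IOContext uninitialized (because its input came from a cached RDD rather than a record reader) adopts that copy. Below is a minimal, self-contained sketch of that pattern, not Hive code: Ctx, LOCAL, SHARED, initAndPublish, and restoreIfMissing are hypothetical stand-ins for IOContext, IOContext.get(), IOContext.getMap(), initIOContext, and SparkMapRecordHandler.init; only the MAP_IO_CONTEXT key mirrors an identifier from the patch.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CtxDemo {
  // Simplified stand-in for IOContext: only the field the patch keys on.
  static class Ctx {
    String inputPath;

    // Mirrors IOContext.copy(dest, orig): overwrite dest's fields with orig's.
    static void copy(Ctx dest, Ctx orig) {
      dest.inputPath = orig.inputPath;
    }
  }

  // Per-thread context, as IOContext.get() returns thread-local state.
  static final ThreadLocal<Ctx> LOCAL = ThreadLocal.withInitial(Ctx::new);

  // Shared map, as IOContext.getMap(), keyed by the MAP_IO_CONTEXT constant.
  static final Map<String, Ctx> SHARED = new ConcurrentHashMap<>();
  static final String MAP_IO_CONTEXT = "MAP_IO_CONTEXT";

  // Models initIOContext: the thread reading the split initializes its own
  // context, then publishes a copy for threads that will skip the reader.
  static void initAndPublish(String path) {
    Ctx local = LOCAL.get();
    local.inputPath = path;
    Ctx saved = new Ctx();
    Ctx.copy(saved, local);
    SHARED.put(MAP_IO_CONTEXT, saved);
  }

  // Models SparkMapRecordHandler.init: a thread whose context was never
  // initialized (its input came from a cached RDD) adopts the saved copy.
  static void restoreIfMissing() {
    Ctx local = LOCAL.get();
    if (local.inputPath == null) {
      Ctx saved = SHARED.get(MAP_IO_CONTEXT);
      if (saved != null) {
        Ctx.copy(local, saved);
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Hypothetical path, for illustration only.
    Thread reader = new Thread(() -> initAndPublish("/warehouse/t1/part-00000"));
    reader.start();
    reader.join();

    Thread mapper = new Thread(() -> {
      restoreIfMissing();
      // Prints the path published by the reader thread.
      System.out.println("mapper sees: " + LOCAL.get().inputPath);
    });
    mapper.start();
    mapper.join();
  }
}

The null check before copying is a safety measure added in the sketch, not something the patch does. The essential design point is that IOContext is thread-confined, so an explicitly published copy is the only way a thread that never ran the record reader can learn the input path; it is also why close() resets the path, so a reused thread does not carry a stale one into the next query.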