diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 20ea977..8f26d99 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -20,6 +20,8 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
@@ -30,6 +32,7 @@
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
+import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -88,6 +91,17 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) {
         mo = new MapOperator();
       }
       mo.setConf(mrwork);
+
+      // If the current thread's IOContext is not initialized (because it's reading from a
+      // cached input HadoopRDD), retrieve it from the saved copy.
+      if (HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+        IOContext ioContext = IOContext.get();
+        if (ioContext.getInputPath() == null) {
+          IOContext ioc = IOContext.getMap().get("SPARK");
+          ioContext.setInputPath(ioc.getInputPath());
+        }
+      }
+
       // initialize map operator
       mo.setChildren(job);
       l4j.info(mo.dump(0));
@@ -199,6 +213,12 @@ public void close() {
     } finally {
       MapredContext.close();
       Utilities.clearWorkMap();
+
+      // It's possible that a thread gets reused for different queries, so we need to
+      // clear the IOContext at the end.
+      if (HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+        IOContext.get().setInputPath(null);
+      }
     }
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 00a6f3d..b2c6cf3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -294,8 +294,7 @@ private MapInput generateMapInput(MapWork mapWork)
 
     JavaPairRDD<WritableComparable, Writable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
         WritableComparable.class, Writable.class);
-    MapInput result = new MapInput(hadoopRDD,
-        false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
+    MapInput result = new MapInput(hadoopRDD, cloneToWork.containsKey(mapWork));
     return result;
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index 58e1ceb..1036e25 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -27,6 +27,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.FooterBuffer;
@@ -171,6 +172,18 @@ private void initIOContext(long startPos, boolean isBlockPointer,
     ioCxtRef.isBlockPointer = isBlockPointer;
     ioCxtRef.inputPath = inputPath;
     LOG.info("Processing file " + inputPath);
+
+    // In Spark, with multi-insert an input HadoopRDD may be shared by multiple downstream
+    // mappers, and if we cache the input RDD, only the first thread that asks for the
+    // iterator will get its associated thread-local IOContext initialized, while the
+    // rest will not. To solve this, we save a copy of the initialized IOContext, which
+    // can later be used by the other threads.
+    if (HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      IOContext copy = new IOContext();
+      copy.setInputPath(inputPath);
+      IOContext.getMap().put("SPARK", copy);
+    }
+
     initDone = true;
   }
 
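Note on the pattern: the patch works because the record-reader thread that actually opens the file publishes a copy of its thread-local IOContext under a well-known key, mapper threads that find their own thread-local uninitialized restore the input path from that copy, and each mapper clears its thread-local on close since executor threads are reused across queries. The sketch below models that save/restore/clear life cycle outside of Hive; it is a minimal illustration, not Hive code, and the SharedInputContexts class, its Context stand-in for org.apache.hadoop.hive.ql.io.IOContext, and its method names are all hypothetical.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SharedInputContexts {
  /** Simplified stand-in for Hive's IOContext: only the input path matters here. */
  static class Context {
    private volatile String inputPath;
    String getInputPath() { return inputPath; }
    void setInputPath(String p) { inputPath = p; }
  }

  // One context per thread, mirroring IOContext.get().
  private static final ThreadLocal<Context> LOCAL =
      ThreadLocal.withInitial(Context::new);
  // Shared copy saved under a well-known key, mirroring IOContext.getMap().
  private static final Map<String, Context> SAVED = new ConcurrentHashMap<>();

  /** Reader side: publish a copy when a file is opened (cf. initIOContext above). */
  static void save(String inputPath) {
    LOCAL.get().setInputPath(inputPath);
    Context copy = new Context();
    copy.setInputPath(inputPath);
    SAVED.put("SPARK", copy);
  }

  /** Mapper side: restore from the saved copy if this thread never ran the record
   *  reader because the input RDD was cached (cf. SparkMapRecordHandler.init). */
  static void restoreIfNeeded() {
    Context mine = LOCAL.get();
    if (mine.getInputPath() == null) {
      Context copy = SAVED.get("SPARK");
      if (copy != null) {
        mine.setInputPath(copy.getInputPath());
      }
    }
  }

  /** Mapper close: clear the thread-local, since a thread may be reused for a
   *  different query (cf. SparkMapRecordHandler.close). */
  static void clear() {
    LOCAL.get().setInputPath(null);
  }
}

Because the copy is stored under the single constant key "SPARK", this scheme assumes all concurrent mapper threads in the executor are consuming the same cached input, which matches the multi-insert case the patch targets: one cached HadoopRDD shared by several downstream map works.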