diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
index 2895d80..2fe33f5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
@@ -67,7 +67,13 @@
   public void load(
       MapJoinTableContainer[] mapJoinTables,
       MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
-    String currentInputPath = context.getCurrentInputPath().toString();
+    // Note: it's possible that a MJ operator is in a ReduceWork, in which case the
+    // currentInputPath will be null. But, since currentInputPath is only interesting
+    // for bucket join case, and for bucket join the MJ operator will always be in
+    // a MapWork, this should be OK.
+    String currentInputPath =
+        context.getCurrentInputPath() == null ? null : context.getCurrentInputPath().toString();
+
     LOG.info("******* Load from HashTable for input file: " + currentInputPath);
     MapredLocalWork localWork = context.getLocalWork();
     try {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 141ae6f..a9fbf6c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -34,11 +34,13 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -101,6 +103,7 @@
   private StructObjectInspector[] valueStructInspectors;
   /* this is only used in the error code path */
   private List<VectorExpressionWriter>[] valueStringWriters;
+  private MapredLocalWork localWork = null;

   public void init(JobConf job, OutputCollector output, Reporter reporter) {
     super.init(job, output, reporter);
@@ -197,8 +200,9 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) {
     }

     ExecMapperContext execContext = new ExecMapperContext(job);
+    localWork = gWork.getMapRedLocalWork();
     execContext.setJc(jc);
-    execContext.setLocalWork(gWork.getMapRedLocalWork());
+    execContext.setLocalWork(localWork);
     reducer.setExecContext(execContext);
     reducer.setReporter(rp);
@@ -209,6 +213,14 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) {
     try {
       LOG.info(reducer.dump(0));
       reducer.initialize(jc, rowObjectInspector);
+
+      if (localWork != null) {
+        for (Operator<? extends OperatorDesc> dummyOp : localWork.getDummyParentOp()) {
+          dummyOp.setExecContext(execContext);
+          dummyOp.initialize(jc, null);
+        }
+      }
+
     } catch (Throwable e) {
       abort = true;
       if (e instanceof OutOfMemoryError) {
@@ -218,6 +230,7 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) {
         throw new RuntimeException("Reduce operator initialization failed", e);
       }
     }
+  }

   @Override
@@ -416,6 +429,13 @@ public void close() {
     }
     reducer.close(abort);
+
+    if (localWork != null) {
+      for (Operator<? extends OperatorDesc> dummyOp : localWork.getDummyParentOp()) {
+        dummyOp.close(abort);
+      }
+    }
+
     ReportStats rps = new ReportStats(rp, jc);
     reducer.preorderMap(rps);
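
For readers unfamiliar with the dummy-parent-operator pattern used above, the following is a minimal, self-contained sketch of the lifecycle this patch wires into SparkReduceRecordHandler: when local work is present, the dummy parent operators are initialized right after the reducer and closed with the same abort flag. DummyOperator and LocalWork here are hypothetical stand-ins, not Hive's actual Operator or MapredLocalWork classes; the real code goes through reducer.initialize()/reducer.close() and the Hive operator tree.

import java.util.Arrays;
import java.util.List;

// Simplified sketch only; DummyOperator and LocalWork are hypothetical stand-ins.
public class DummyOpLifecycleSketch {

  static class DummyOperator {
    void initialize(Object jobConf) {
      System.out.println("dummy parent op initialized");
    }
    void close(boolean abort) {
      System.out.println("dummy parent op closed, abort=" + abort);
    }
  }

  static class LocalWork {
    List<DummyOperator> getDummyParentOp() {
      return Arrays.asList(new DummyOperator(), new DummyOperator());
    }
  }

  private final LocalWork localWork;  // may be null when there is no map-join local work
  private boolean abort = false;

  DummyOpLifecycleSketch(LocalWork localWork) {
    this.localWork = localWork;
  }

  // Mirrors init(): after the reducer is initialized, bring up the dummy parents
  // so small-table hash tables can be loaded on the reduce side as well.
  void init(Object jobConf) {
    if (localWork != null) {
      for (DummyOperator dummyOp : localWork.getDummyParentOp()) {
        dummyOp.initialize(jobConf);
      }
    }
  }

  // Mirrors close(): shut the dummy parents down with the same abort flag as the reducer.
  void close() {
    if (localWork != null) {
      for (DummyOperator dummyOp : localWork.getDummyParentOp()) {
        dummyOp.close(abort);
      }
    }
  }

  public static void main(String[] args) {
    DummyOpLifecycleSketch handler = new DummyOpLifecycleSketch(new LocalWork());
    handler.init(new Object());
    handler.close();
  }
}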