diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java index 96481f1..349a07e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java @@ -143,7 +143,11 @@ private void generateLocalWork(SparkTask originalTask) { for (BaseWork work : allBaseWorks) { if (containsOp(work, SparkHashTableSinkOperator.class) || containsOp(work, MapJoinOperator.class)) { - work.setMapRedLocalWork(new MapredLocalWork()); + MapredLocalWork localWork = new MapredLocalWork(); + localWork.setDummyParentOp(new ArrayList>()); + localWork.setAliasToFetchWork(new LinkedHashMap()); + localWork.setAliasToWork(new LinkedHashMap>()); + work.setMapRedLocalWork(localWork); } } @@ -160,16 +164,10 @@ private void generateLocalWork(SparkTask originalTask) { for (BaseWork parentWork : originalWork.getParents(work)) { if (containsOp(parentWork,SparkHashTableSinkOperator.class)) { parentWork.getMapRedLocalWork().setTmpHDFSPath(tmpPath); - parentWork.getMapRedLocalWork().setDummyParentOp( - new ArrayList>()); } } - bigTableLocalWork.setAliasToWork( - new LinkedHashMap>()); - bigTableLocalWork.setAliasToFetchWork(new LinkedHashMap()); bigTableLocalWork.setTmpPath(tmpPath); - // TODO: set inputFileChangeSensitive and BucketMapjoinContext, // TODO: enable non-staged mapjoin }