diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index b5d5454..1dd5a93 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -45,7 +45,6 @@ public HiveReduceFunction(byte[] buffer) {
     call(Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> it) throws Exception {
     if (jobConf == null) {
       jobConf = KryoSerializer.deserializeJobConf(this.buffer);
-      jobConf.set("mapred.reducer.class", ExecReducer.class.getName());
     }
 
     SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index a62a02a..d16f1be 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -80,8 +81,9 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception {
           "The roots in the SparkWork must be MapWork instances!");
     }
     MapWork mapWork = (MapWork) w;
-    SparkTran tran = generate(w);
-    JavaPairRDD<WritableComparable, Writable> input = generateRDD(mapWork);
+    JobConf newJobConf = cloneJobConf(mapWork);
+    SparkTran tran = generate(newJobConf, mapWork);
+    JavaPairRDD<WritableComparable, Writable> input = generateRDD(newJobConf, mapWork);
     trans.addTranWithInput(tran, input);
 
     while (sparkWork.getChildren(w).size() > 0) {
@@ -116,18 +118,11 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception {
     return plan;
   }
 
-  private JavaPairRDD<WritableComparable, Writable> generateRDD(MapWork mapWork)
+  private JavaPairRDD<WritableComparable, Writable> generateRDD(JobConf jobConf, MapWork mapWork)
       throws Exception {
-    JobConf newJobConf = new JobConf(jobConf);
-    List<Path> inputPaths = Utilities.getInputPaths(newJobConf, mapWork,
-        scratchDir, context, false);
-    Utilities.setInputPaths(newJobConf, inputPaths);
-    Utilities.setMapWork(newJobConf, mapWork, scratchDir, true);
     Class ifClass = getInputFormat(mapWork);
 
-    // The mapper class is expected by the HiveInputFormat.
-    newJobConf.set("mapred.mapper.class", ExecMapper.class.getName());
-    return sc.hadoopRDD(newJobConf, ifClass, WritableComparable.class,
+    return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class,
         Writable.class);
   }
 
@@ -159,43 +154,10 @@ private Class getInputFormat(MapWork mWork) throws HiveException {
     return inputFormatClass;
   }
 
-  private SparkTran generate(BaseWork bw) throws Exception {
-    // initialize stats publisher if necessary
-    if (bw.isGatheringStats()) {
-      StatsPublisher statsPublisher;
-      StatsFactory factory = StatsFactory.newFactory(jobConf);
-      if (factory != null) {
-        statsPublisher = factory.getStatsPublisher();
-        if (!statsPublisher.init(jobConf)) { // creating stats table if not exists
-          if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
-            throw new HiveException(
-                ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
-          }
-        }
-      }
-    }
-    if (bw instanceof MapWork) {
-      return generate((MapWork) bw);
-    } else if (bw instanceof ReduceWork) {
-      return generate((ReduceWork) bw);
-    } else {
-      throw new IllegalArgumentException(
-          "Only MapWork and ReduceWork are expected");
-    }
-  }
-
-  private MapTran generate(MapWork mw) throws Exception {
-    JobConf newJobConf = new JobConf(jobConf);
+  private MapTran generate(JobConf jobConf, MapWork mw) throws Exception {
+    initStatsPublisher(mw);
     MapTran result = new MapTran();
-
-    List<Path> inputPaths = Utilities.getInputPaths(newJobConf, mw,
-        scratchDir, context, false);
-    Utilities.setInputPaths(newJobConf, inputPaths);
-
-    Utilities.setMapWork(newJobConf, mw, scratchDir, true);
-    Utilities.createTmpDirs(newJobConf, mw);
-    newJobConf.set("mapred.mapper.class", ExecMapper.class.getName());
-    byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
+    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
     HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
     result.setMapFunction(mapFunc);
     return result;
@@ -210,14 +172,9 @@ private SparkShuffler generate(SparkEdgeProperty edge) {
     return new GroupByShuffler();
   }
 
-  private ReduceTran generate(ReduceWork rw) throws IOException {
+  private ReduceTran generate(ReduceWork rw) throws Exception {
     ReduceTran result = new ReduceTran();
-    // Clone jobConf for each ReduceWork so we can have multiple of them
-    JobConf newJobConf = new JobConf(jobConf);
-    // Make sure we'll use a different plan path from the original one
-    HiveConf.setVar(newJobConf, HiveConf.ConfVars.PLAN, "");
-    Utilities.setReduceWork(newJobConf, rw, scratchDir, false);
-    Utilities.createTmpDirs(newJobConf, rw);
+    JobConf newJobConf = cloneJobConf(rw);
     byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
     HiveReduceFunction redFunc = new HiveReduceFunction(confBytes);
     result.setReduceFunction(redFunc);
@@ -229,4 +186,39 @@ private UnionTran generate(UnionWork uw) {
     return result;
   }
 
+  private JobConf cloneJobConf(BaseWork work) throws Exception {
+    JobConf cloned = new JobConf(jobConf);
+    // Make sure we'll use a different plan path from the original one
+    HiveConf.setVar(cloned, HiveConf.ConfVars.PLAN, "");
+    if (work instanceof MapWork) {
+      List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work, scratchDir, context, false);
+      Utilities.setInputPaths(cloned, inputPaths);
+      Utilities.setMapWork(cloned, (MapWork) work, scratchDir, false);
+      Utilities.createTmpDirs(cloned, (MapWork) work);
+      cloned.set("mapred.mapper.class", ExecMapper.class.getName());
+    } else if (work instanceof ReduceWork) {
+      Utilities.setReduceWork(cloned, (ReduceWork) work, scratchDir, false);
+      Utilities.createTmpDirs(cloned, (ReduceWork) work);
+      cloned.set("mapred.reducer.class", ExecReducer.class.getName());
+    }
+    return cloned;
+  }
+
+  private void initStatsPublisher(BaseWork work) throws HiveException {
+    // initialize stats publisher if necessary
+    if (work.isGatheringStats()) {
+      StatsPublisher statsPublisher;
+      StatsFactory factory = StatsFactory.newFactory(jobConf);
+      if (factory != null) {
+        statsPublisher = factory.getStatsPublisher();
+        if (!statsPublisher.init(jobConf)) { // creating stats table if not exists
+          if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
+            throw new HiveException(
+                ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+          }
+        }
+      }
+    }
+  }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
index 8cbf32f..331798e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.io;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.session.SessionState;
 
 
 /**
@@ -41,10 +40,10 @@
   private static IOContext ioContext = new IOContext();
 
   public static IOContext get() {
-    if (SessionState.get() == null) {
-      // this happens on the backend. only one io context needed.
-      return ioContext;
-    }
+//    if (SessionState.get() == null) {
+//      // this happens on the backend. only one io context needed.
+//      return ioContext;
+//    }
     return IOContext.threadLocal.get();
   }
 
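Reviewer's aside, not part of the patch: a self-contained Java sketch of the pattern this refactoring standardizes. All per-work JobConf mutation now happens once, driver-side, in cloneJobConf, so the serialized conf already carries mapred.mapper.class / mapred.reducer.class and the executor-side functions only deserialize it. The class and method names below (ConfCloneSketch, cloneFor) are illustrative stand-ins, not Hive's:

// Sketch only -- NOT part of the patch. Mirrors the driver-side consolidation:
// clone the base conf once per work, stamp the exec class there, and let the
// executor side read the setting back instead of mutating the conf after
// deserialization (as HiveReduceFunction.call() did before the first hunk).
import org.apache.hadoop.mapred.JobConf;

public class ConfCloneSketch {
  private final JobConf baseConf = new JobConf();

  // Stand-in for cloneJobConf(BaseWork): one place stamps everything.
  JobConf cloneFor(boolean mapSide) {
    JobConf cloned = new JobConf(baseConf);
    if (mapSide) {
      cloned.set("mapred.mapper.class", "org.apache.hadoop.hive.ql.exec.mr.ExecMapper");
    } else {
      cloned.set("mapred.reducer.class", "org.apache.hadoop.hive.ql.exec.mr.ExecReducer");
    }
    return cloned;
  }

  public static void main(String[] args) {
    JobConf reduceConf = new ConfCloneSketch().cloneFor(false);
    // The executor side can rely on the cloned conf already carrying this:
    System.out.println(reduceConf.get("mapred.reducer.class"));
  }
}

The HiveConf.ConfVars.PLAN reset in cloneJobConf serves the same consolidation goal: per the patch's own comments, each cloned conf gets its own plan path, so the multiple works in one SparkWork do not overwrite each other's serialized plans.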