diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index e34cc09..1606397 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -83,12 +83,12 @@
  */
 public class DagUtils {
 
-  private static Subject currentUser;
+  private static final String TEZ_DIR = "_tez_scratch_dir";
 
   /*
    * Creates the configuration object necessary to run a specific vertex from
    * map work. This includes input formats, input processor, etc.
-= */
+   */
   private static JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
     JobConf conf = new JobConf(baseConf);
 
@@ -127,8 +127,8 @@ private static JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
       inpFormat = BucketizedHiveInputFormat.class.getName();
     }
 
-    conf.set(MRJobConfig.MAP_CLASS_ATTR, ExecMapper.class.getName());
-    conf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, inpFormat);
+    conf.set("mapred.mapper.class", ExecMapper.class.getName());
+    conf.set("mapred.input.format.class", inpFormat);
 
     return conf;
   }
@@ -144,11 +144,16 @@ private static JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
    * @param w The second vertex (sink)
    * @return
    */
-  public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w) {
+  public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
+      throws IOException {
 
     // Tez needs to setup output subsequent input pairs correctly
     MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
 
+    // update payloads (configuration for the vertices might have changed)
+    v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
+    w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+
     // all edges are of the same type right now
     EdgeProperty edgeProperty = new EdgeProperty(ConnectionPattern.BIPARTITE,
         SourceType.STABLE,
@@ -164,6 +169,8 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork, int seqNo,
       LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
       Path mrScratchDir, Context ctx) throws Exception {
 
+    Path tezDir = getTezDir(mrScratchDir);
+
     // map work can contain localwork, i.e: hashtables for map-side joins
     Path hashTableArchive = createHashTables(mapWork, conf);
     LocalResource localWorkLr = null;
@@ -174,17 +181,24 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork, int seqNo,
     }
 
     // write out the operator plan
-    Path planPath = Utilities.setMapWork(conf, mapWork, mrScratchDir.toUri().toString(), false);
+    Path planPath = Utilities.setMapWork(conf, mapWork,
+        mrScratchDir.toUri().toString(), false);
     LocalResource planLr = createLocalResource(fs,
         planPath, LocalResourceType.FILE,
         LocalResourceVisibility.APPLICATION);
 
     // setup input paths and split info
-    List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork, mrScratchDir.toUri().toString(), ctx);
+    List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork,
+        mrScratchDir.toUri().toString(), ctx);
     Utilities.setInputPaths(conf, inputPaths);
-    InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf, mrScratchDir);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, conf);
+    InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf, tezDir);
+
+    // create the directories FileSinkOperators need
+    Utilities.createTmpDirs(conf, mapWork);
+
+    // Tez asks us to call this even if there's no preceding vertex
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
 
     // finally create the vertex
     Vertex map = null;
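The payload update in createEdge is the subtle part of this hunk: a Tez Vertex captures its JobConf as an opaque user payload when it is constructed, so the conf rewrite done by translateVertexConfToTez(wConf, vConf) would never reach the running tasks unless the payloads are re-serialized. A minimal sketch of that pattern, using the same Tez calls the patch itself relies on (the helper class and method names here are hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.tez.dag.api.Vertex;
    import org.apache.tez.mapreduce.hadoop.MRHelpers;

    class PayloadRefresh {
      // Re-serialize a vertex's JobConf into its processor payload; without
      // this, the vertex ships the conf as it looked at creation time.
      static void refresh(Vertex v, JobConf conf) throws IOException {
        byte[] payload = MRHelpers.createUserPayloadFromConf(conf);
        v.getProcessorDescriptor().setUserPayload(payload);
      }
    }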
@@ -232,17 +246,13 @@ private static Path createHashTables(MapWork mapWork, Configuration conf) {
   private static JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) {
     JobConf conf = new JobConf(baseConf);
 
-    conf.set(MRJobConfig.REDUCE_CLASS_ATTR, ExecReducer.class.getName());
+    conf.set("mapred.reducer.class", ExecReducer.class.getName());
 
     boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
         HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS);
     HiveConf.setBoolVar(conf, HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
         useSpeculativeExecReducers);
 
-    // reducers should have been set at planning stage
-    // job.setNumberOfReducers(rWork.getNumberOfReducers())
-    conf.set(MRJobConfig.NUM_REDUCES, reduceWork.getNumReduceTasks().toString());
-
     return conf;
   }
 
@@ -255,10 +265,13 @@ private static Vertex createVertex(JobConf conf, ReduceWork reduceWork, int seqN
 
     // write out the operator plan
     Path planPath = Utilities.setReduceWork(conf, reduceWork,
-        mrScratchDir.getName(), false);
+        mrScratchDir.toUri().toString(), false);
     LocalResource planLr = createLocalResource(fs, planPath,
         LocalResourceType.FILE,
         LocalResourceVisibility.APPLICATION);
 
+    // create the directories FileSinkOperators need
+    Utilities.createTmpDirs(conf, reduceWork);
+
     // create the vertex
     Vertex reducer = new Vertex("Reducer "+seqNo,
         new ProcessorDescriptor(ReduceProcessor.class.getName(),
@@ -407,7 +420,8 @@ private static String getClientVersion(String pathStr) {
    * @throws LoginException when we are unable to determine the user.
    */
   public static LocalResource createHiveExecLocalResource(HiveConf conf)
-    throws IOException, LoginException {
+      throws IOException, LoginException {
+
     String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
     String currentVersionPathStr = getExecJarPathLocal();
     String currentJar = getClientVersion(currentVersionPathStr);
@@ -499,20 +513,17 @@ public static JobConf createConfiguration(HiveConf hiveConf) throws IOException
       }
     }
 
-    conf.set("mapreduce.framework.name","yarn-tez");
-    conf.set("mapreduce.job.output.committer.class", NullOutputCommitter.class.getName());
-
-    conf.setBoolean(MRJobConfig.SETUP_CLEANUP_NEEDED, false);
-    conf.setBoolean(MRJobConfig.TASK_CLEANUP_NEEDED, false);
+    conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName());
 
-    conf.setClass(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, HiveOutputFormatImpl.class, OutputFormat.class);
+    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+    conf.setBoolean("mapred.committer.job.task.cleanup.needed", false);
 
-    conf.set(MRJobConfig.MAP_CLASS_ATTR, ExecMapper.class.getName());
+    conf.setClass("mapred.output.format.class", HiveOutputFormatImpl.class, OutputFormat.class);
 
     conf.set(MRJobConfig.OUTPUT_KEY_CLASS, HiveKey.class.getName());
     conf.set(MRJobConfig.OUTPUT_VALUE_CLASS, BytesWritable.class.getName());
 
-    conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR, HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPARTITIONER));
+    conf.set("mapred.partitioner.class", HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPARTITIONER));
 
     return conf;
   }
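createConfiguration above also disables the MapReduce commit path (the two mapred.committer.* flags) and installs NullOutputCommitter, since Hive's FileSinkOperators move their output files themselves during job close. For illustration, a do-nothing committer along the lines of what such a class amounts to (this sketch is not Hive's actual NullOutputCommitter):

    import org.apache.hadoop.mapred.JobContext;
    import org.apache.hadoop.mapred.OutputCommitter;
    import org.apache.hadoop.mapred.TaskAttemptContext;

    class NoOpCommitter extends OutputCommitter {
      @Override public void setupJob(JobContext jobContext) { }
      @Override public void setupTask(TaskAttemptContext taskContext) { }
      @Override public boolean needsTaskCommit(TaskAttemptContext taskContext) {
        return false;  // nothing to commit: Hive manages its own outputs
      }
      @Override public void commitTask(TaskAttemptContext taskContext) { }
      @Override public void abortTask(TaskAttemptContext taskContext) { }
    }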
@@ -570,6 +581,25 @@ public static Vertex createVertex(JobConf conf, BaseWork work,
     }
   }
 
+  /**
+   * createTezDir creates a temporary directory in the scratchDir folder to
+   * be used with Tez. Assumes scratchDir exists.
+   */
+  public static Path createTezDir(Path scratchDir, Configuration conf)
+      throws IOException {
+    Path tezDir = getTezDir(scratchDir);
+    FileSystem fs = tezDir.getFileSystem(conf);
+    fs.mkdirs(tezDir);
+    return tezDir;
+  }
+
+  /**
+   * Gets the tez dir that belongs to the hive scratch dir
+   */
+  public static Path getTezDir(Path scratchDir) {
+    return new Path(scratchDir, TEZ_DIR);
+  }
+
   private DagUtils() {
     // don't instantiate
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 1a36b4b..f5d136f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -24,16 +24,20 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
@@ -60,15 +64,26 @@ public TezTask() {
 
   @Override
   public int execute(DriverContext driverContext) {
     int rc = 1;
-
-    Context ctx = driverContext.getCtx();
+    boolean cleanContext = false;
+    Context ctx = null;
+    DAGClient client = null;
 
     try {
+      // Get or create Context object. If we create it we have to clean
+      // it later as well.
+      ctx = driverContext.getCtx();
+      if (ctx == null) {
+        ctx = new Context(conf);
+        cleanContext = true;
+      }
 
       // we will localize all the files (jars, plans, hashtables) to the
       // scratch dir. let's create this first.
       Path scratchDir = new Path(ctx.getMRScratchDir());
 
+      // create the tez tmp dir
+      DagUtils.createTezDir(scratchDir, conf);
+
       // jobConf will hold all the configuration for hadoop, tez, and hive
       JobConf jobConf = DagUtils.createConfiguration(conf);
 
@@ -80,7 +95,7 @@ public int execute(DriverContext driverContext) {
       DAG dag = build(jobConf, work, scratchDir, appJarLr, ctx);
 
       // submit will send the job to the cluster and start executing
-      DAGClient client = submit(jobConf, dag, scratchDir, appJarLr);
+      client = submit(jobConf, dag, scratchDir, appJarLr);
 
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor();
@@ -88,13 +103,21 @@ public int execute(DriverContext driverContext) {
 
     } catch (Exception e) {
       LOG.error("Failed to execute tez graph.", e);
+      // rc will be 1 at this point indicating failure.
     } finally {
       Utilities.clearWork(conf);
-      try {
-        ctx.clear();
-      } catch (Exception e) {
-        /*best effort*/
-        LOG.warn("Failed to clean up after tez job");
+      if (cleanContext) {
+        try {
+          ctx.clear();
+        } catch (Exception e) {
+          /*best effort*/
+          LOG.warn("Failed to clean up after tez job");
+        }
+      }
+      // need to either move tmp files or remove them
+      if (client != null) {
+        // rc will only be overwritten if close errors out
+        rc = close(work, rc);
       }
     }
     return rc;
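Taken together, createTezDir/getTezDir and the call in execute give every query a private staging directory under the MR scratch dir, which the patch then uses for input splits and DAG submission. A self-contained sketch of the resulting layout (the scratch path is a stand-in; the directory name comes from the TEZ_DIR constant above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class TezDirLayout {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path scratchDir = new Path("/tmp/hive/example-scratch"); // stand-in
        // equivalent of DagUtils.getTezDir(scratchDir)
        Path tezDir = new Path(scratchDir, "_tez_scratch_dir");
        // equivalent of DagUtils.createTezDir(scratchDir, conf)
        FileSystem fs = tezDir.getFileSystem(conf);
        fs.mkdirs(tezDir);
        System.out.println("tez staging dir: " + fs.makeQualified(tezDir));
      }
    }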
@@ -115,6 +138,9 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir,
     List<BaseWork> ws = work.getAllWork();
     Collections.reverse(ws);
 
+    Path tezDir = DagUtils.getTezDir(scratchDir);
+    FileSystem fs = tezDir.getFileSystem(conf);
+
     // the name of the dag is what is displayed in the AM/Job UI
     DAG dag = new DAG(
         Utilities.abbreviate(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYSTRING),
@@ -125,8 +151,8 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir,
 
       // translate work to vertex
       JobConf wxConf = DagUtils.initializeVertexConf(conf, w);
-      Vertex wx = DagUtils.createVertex(wxConf, w, scratchDir, i--,
-          appJarLr, additionalLr, scratchDir.getFileSystem(conf), ctx);
+      Vertex wx = DagUtils.createVertex(wxConf, w, tezDir,
+          i--, appJarLr, additionalLr, fs, ctx);
       dag.addVertex(wx);
       workToVertex.put(w, wx);
       workToConf.put(w, wxConf);
@@ -155,14 +181,42 @@ private DAGClient submit(JobConf conf, DAG dag, Path scratchDir, LocalResource a
     Map<String, LocalResource> amLrs = new HashMap<String, LocalResource>();
     amLrs.put(DagUtils.getBaseName(appJarLr), appJarLr);
 
+    Path tezDir = DagUtils.getTezDir(scratchDir);
+
     // ready to start execution on the cluster
-    DAGClient dagClient = tezClient.submitDAGApplication(dag, scratchDir,
+    DAGClient dagClient = tezClient.submitDAGApplication(dag, tezDir,
         null, "default", Collections.singletonList(""),
         amEnv, amLrs, new TezConfiguration(conf));
 
     return dagClient;
   }
 
+  /*
+   * close will move the temp files into the right place for the fetch
+   * task. If the job has failed it will clean up the files.
+   */
+  private int close(TezWork work, int rc) {
+    try {
+      JobCloseFeedBack feedBack = new JobCloseFeedBack();
+      List<BaseWork> ws = work.getAllWork();
+      for (BaseWork w: ws) {
+        List<Operator<?>> ops = w.getAllOperators();
+        for (Operator<?> op: ops) {
+          op.jobClose(conf, rc == 0, feedBack);
+        }
+      }
+    } catch (Exception e) {
+      // jobClose needs to execute successfully otherwise fail task
+      if (rc == 0) {
+        rc = 3;
+        String mesg = "Job Commit failed with exception '"
+            + Utilities.getNameMessage(e) + "'";
+        console.printError(mesg, "\n" + StringUtils.stringifyException(e));
+      }
+    }
+    return rc;
+  }
+
   @Override
   public boolean isMapRedTask() {
     return true;
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 46c8b6d..7433ddc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -773,6 +773,21 @@ public static void setTaskPlan(String path, String alias,
   }
 
   /**
+   * Set key and value descriptor
+   * @param work ReduceWork
+   * @param rs ReduceSinkOperator
+   */
+  public static void setKeyAndValueDesc(ReduceWork work, ReduceSinkOperator rs) {
+    work.setKeyDesc(rs.getConf().getKeySerializeInfo());
+    int tag = Math.max(0, rs.getConf().getTag());
+    List<TableDesc> tagToSchema = work.getTagToValueDesc();
+    while (tag + 1 > tagToSchema.size()) {
+      tagToSchema.add(null);
+    }
+    tagToSchema.set(tag, rs.getConf().getValueSerializeInfo());
+  }
+
+  /**
    * set key and value descriptor.
    *
    * @param plan
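The pad-and-set loop in the new setKeyAndValueDesc exists because each ReduceSinkOperator carries a join tag, and its value descriptor must land at exactly that index in tagToSchema even when lower-numbered tags have not been visited yet. A stand-alone illustration of the logic with plain Java types (not Hive classes):

    import java.util.ArrayList;
    import java.util.List;

    class TagPadding {
      // Grow the list with nulls until index 'tag' exists, then set it.
      static <T> void setAtTag(List<T> tagToSchema, int tag, T desc) {
        while (tag + 1 > tagToSchema.size()) {
          tagToSchema.add(null);
        }
        tagToSchema.set(tag, desc);
      }

      public static void main(String[] args) {
        List<String> schemas = new ArrayList<String>();
        setAtTag(schemas, 2, "value-desc-for-tag-2");
        System.out.println(schemas); // [null, null, value-desc-for-tag-2]
      }
    }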
@@ -788,13 +803,7 @@ public static void setKeyAndValueDesc(ReduceWork plan,
 
     if (topOp instanceof ReduceSinkOperator) {
       ReduceSinkOperator rs = (ReduceSinkOperator) topOp;
-      plan.setKeyDesc(rs.getConf().getKeySerializeInfo());
-      int tag = Math.max(0, rs.getConf().getTag());
-      List<TableDesc> tagToSchema = plan.getTagToValueDesc();
-      while (tag + 1 > tagToSchema.size()) {
-        tagToSchema.add(null);
-      }
-      tagToSchema.set(tag, rs.getConf().getValueSerializeInfo());
+      setKeyAndValueDesc(plan, rs);
     } else {
       List<Operator<? extends OperatorDesc>> children = topOp.getChildOperators();
       if (children != null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 7ef4850..48145ad 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -109,6 +109,14 @@ public Object process(Node nd, Stack<Node> stack,
         = (ReduceSinkOperator)root.getParentOperators().get(0);
       reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
 
+      // need to fill in information about the key and value in the reducer
+      GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
+
+      // needs to be fixed in HIVE-5052. This should be driven off of stats
+      if (reduceWork.getNumReduceTasks() <= 0) {
+        reduceWork.setNumReduceTasks(1);
+      }
+
       tezWork.add(reduceWork);
       tezWork.connect(
           context.preceedingWork,
@@ -129,6 +137,17 @@ public Object process(Node nd, Stack<Node> stack,
     // Also note: the concept of leaf and root is reversed in hive for historical
     // reasons. Roots are data sources, leaves are data sinks. I know.
     if (context.leafOperatorToFollowingWork.containsKey(operator)) {
+
+      BaseWork followingWork = context.leafOperatorToFollowingWork.get(operator);
+
+      // need to add this branch to the key + value info
+      assert operator instanceof ReduceSinkOperator
+        && followingWork instanceof ReduceWork;
+      ReduceSinkOperator rs = (ReduceSinkOperator) operator;
+      ReduceWork rWork = (ReduceWork) followingWork;
+      GenMapRedUtils.setKeyAndValueDesc(rWork, rs);
+
+      // add dependency between the two work items
       tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator));
     }
 
@@ -136,6 +155,7 @@ public Object process(Node nd, Stack<Node> stack,
     // we might have to connect parent work with this work later.
     for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
       assert !context.leafOperatorToFollowingWork.containsKey(parent);
+      assert !(work instanceof MapWork);
       context.leafOperatorToFollowingWork.put(parent, work);
       LOG.debug("Removing " + parent + " as parent from " + root);
       root.removeParent(parent);
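The leafOperatorToFollowingWork map is what makes the late wiring above work: when the operator tree is cut at a ReduceSink, each severed parent is remembered together with the work item that will consume its output, and the edge plus the key/value descriptors are filled in once the walk reaches that operator again as a leaf. A toy model of the bookkeeping (all names and types hypothetical, not Hive's actual context classes):

    import java.util.HashMap;
    import java.util.Map;

    class DeferredConnect {
      static class Work {
        final String name;
        Work(String name) { this.name = name; }
      }

      public static void main(String[] args) {
        Map<String, Work> leafToFollowingWork = new HashMap<String, Work>();

        // at the cut point: remember which work consumes RS_3's output
        leafToFollowingWork.put("RS_3", new Work("Reducer 1"));

        // later, when the walk reaches RS_3 as a leaf of its own work item
        Work current = new Work("Map 1");
        Work following = leafToFollowingWork.get("RS_3");
        if (following != null) {
          System.out.println("connect " + current.name + " -> " + following.name);
        }
      }
    }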