diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 9b2dae36a5..6f5830dfc0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -153,7 +153,6 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
-import org.apache.hadoop.hive.ql.exec.tez.NullMROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
@@ -667,15 +666,14 @@ private static String getContainerJavaOpts(Configuration conf) {
     }
   }
 
-  private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, FileSystem fs,
-      Path mrScratchDir, Context ctx, VertexType vertexType,
-      Map<String, LocalResource> localResources) throws Exception {
+  private Vertex createVertexFromMergeWork(JobConf conf, MergeJoinWork mergeJoinWork,
+      Path mrScratchDir, VertexType vertexType) throws Exception {
     Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false);
     if (mergeJoinWork.getMainWork() instanceof MapWork) {
       List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList();
       MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork());
-      Vertex mergeVx = createVertex(
-          conf, mapWork, fs, mrScratchDir, ctx, vertexType, localResources);
+      Vertex mergeVx = createVertexFromMapWork(
+          conf, mapWork, mrScratchDir, vertexType);
 
       conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
       // mapreduce.tez.input.initializer.serialize.event.payload should be set
@@ -718,17 +716,16 @@ private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, FileSyste
       mergeVx.setVertexManagerPlugin(desc);
       return mergeVx;
     } else {
-      return createVertex(conf,
-          (ReduceWork) mergeJoinWork.getMainWork(), fs, mrScratchDir, ctx, localResources);
+      return createVertexFromReduceWork(conf,
+          (ReduceWork) mergeJoinWork.getMainWork(), mrScratchDir);
     }
   }
 
   /*
    * Helper function to create Vertex from MapWork.
    */
-  private Vertex createVertex(JobConf conf, MapWork mapWork,
-      FileSystem fs, Path mrScratchDir, Context ctx, VertexType vertexType,
-      Map<String, LocalResource> localResources) throws Exception {
+  private Vertex createVertexFromMapWork(JobConf conf, MapWork mapWork, Path mrScratchDir,
+      VertexType vertexType) throws Exception {
 
     // set up the operator plan
     Utilities.cacheMapWork(conf, mapWork, mrScratchDir);
@@ -846,21 +843,16 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
       procClassName = MergeFileTezProcessor.class.getName();
     }
 
-    VertexExecutionContext executionContext = createVertexExecutionContext(mapWork);
-
     map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName)
         .setUserPayload(serializedConf), numTasks, getContainerResource(conf));
 
     map.setTaskEnvironment(getContainerEnvironment(conf, true));
-    map.setExecutionContext(executionContext);
-    map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
     assert mapWork.getAliasToWork().keySet().size() == 1;
 
     // Add the actual source input
     String alias = mapWork.getAliasToWork().keySet().iterator().next();
     map.addDataSource(alias, dataSource);
-    map.addTaskLocalFiles(localResources);
     return map;
   }
 
@@ -899,8 +891,7 @@ private VertexExecutionContext createVertexExecutionContext(BaseWork work) {
   /*
    * Helper function to create Vertex for given ReduceWork.
   */
-  private Vertex createVertex(JobConf conf, ReduceWork reduceWork, FileSystem fs,
-      Path mrScratchDir, Context ctx, Map<String, LocalResource> localResources)
+  private Vertex createVertexFromReduceWork(JobConf conf, ReduceWork reduceWork, Path mrScratchDir)
       throws Exception {
 
     // set up operator plan
@@ -910,8 +901,6 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork, FileSystem fs,
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, reduceWork);
 
-    VertexExecutionContext vertexExecutionContext = createVertexExecutionContext(reduceWork);
-
     // create the vertex
     Vertex reducer = Vertex.create(reduceWork.getName(),
         ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
@@ -921,9 +910,6 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork, FileSystem fs,
         reduceWork.getNumReduceTasks(), getContainerResource(conf));
 
     reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
-    reducer.setExecutionContext(vertexExecutionContext);
-    reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
-    reducer.addTaskLocalFiles(localResources);
 
     return reducer;
   }
 
@@ -1444,40 +1430,38 @@ private JobConf initializeVertexConf(JobConf conf, Context context, MergeJoinWor
    * Create a vertex from a given work object.
    *
    * @param conf JobConf to be used for this execution unit
-   * @param work The instance of BaseWork representing the actual work to be performed
+   * @param workUnit The instance of BaseWork representing the actual work to be performed
    *        by this vertex.
    * @param scratchDir HDFS scratch dir for this execution unit.
-   * @param fileSystem FS corresponding to scratchDir and LocalResources
-   * @param ctx This query's context
    * @return Vertex
    */
   @SuppressWarnings("deprecation")
-  public Vertex createVertex(JobConf conf, BaseWork work,
-      Path scratchDir, FileSystem fileSystem, Context ctx, boolean hasChildren,
-      TezWork tezWork, VertexType vertexType, Map<String, LocalResource> localResources) throws Exception {
+  public Vertex createVertex(JobConf conf, BaseWork workUnit, Path scratchDir,
+      TezWork tezWork, Map<String, LocalResource> localResources) throws Exception {
 
-    Vertex v = null;
+    Vertex vertex;
 
     // simply dispatch the call to the right method for the actual (sub-) type of
    // BaseWork.
-    if (work instanceof MapWork) {
-      v = createVertex(
-          conf, (MapWork) work, fileSystem, scratchDir, ctx, vertexType, localResources);
-    } else if (work instanceof ReduceWork) {
-      v = createVertex(conf, (ReduceWork) work, fileSystem, scratchDir, ctx, localResources);
-    } else if (work instanceof MergeJoinWork) {
-      v = createVertex(
-          conf, (MergeJoinWork) work, fileSystem, scratchDir, ctx, vertexType, localResources);
+    VertexType vertexType = tezWork.getVertexType(workUnit);
+    if (workUnit instanceof MapWork) {
+      vertex = createVertexFromMapWork(
+          conf, (MapWork) workUnit, scratchDir, vertexType);
+    } else if (workUnit instanceof ReduceWork) {
+      vertex = createVertexFromReduceWork(conf, (ReduceWork) workUnit, scratchDir);
+    } else if (workUnit instanceof MergeJoinWork) {
+      vertex = createVertexFromMergeWork(
+          conf, (MergeJoinWork) workUnit, scratchDir, vertexType);
       // set VertexManagerPlugin if it's a cross product destination vertex
       List<String> crossProductSources = new ArrayList<>();
-      for (BaseWork parentWork : tezWork.getParents(work)) {
-        if (tezWork.getEdgeType(parentWork, work) == EdgeType.XPROD_EDGE) {
+      for (BaseWork parentWork : tezWork.getParents(workUnit)) {
+        if (tezWork.getEdgeType(parentWork, workUnit) == EdgeType.XPROD_EDGE) {
           crossProductSources.add(parentWork.getName());
         }
       }
 
       if (!crossProductSources.isEmpty()) {
         CartesianProductConfig cpConfig = new CartesianProductConfig(crossProductSources);
-        v.setVertexManagerPlugin(
+        vertex.setVertexManagerPlugin(
             VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName())
                 .setUserPayload(cpConfig.toUserPayload(new TezConfiguration(conf))));
         // parallelism shouldn't be set for cartesian product vertex
@@ -1486,14 +1470,18 @@ public Vertex createVertex(JobConf conf, BaseWork work,
       // something is seriously wrong if this is happening
       throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());
     }
+    VertexExecutionContext vertexExecutionContext = createVertexExecutionContext(workUnit);
+    vertex.addTaskLocalFiles(localResources);
+    vertex.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
+    vertex.setExecutionContext(vertexExecutionContext);
 
     // initialize stats publisher if necessary
-    if (work.isGatheringStats()) {
+    if (workUnit.isGatheringStats()) {
       StatsPublisher statsPublisher;
       StatsFactory factory = StatsFactory.newFactory(conf);
       if (factory != null) {
         StatsCollectionContext sCntxt = new StatsCollectionContext(conf);
-        sCntxt.setStatsTmpDirs(Utilities.getStatsTmpDirs(work, conf));
+        sCntxt.setStatsTmpDirs(Utilities.getStatsTmpDirs(workUnit, conf));
         statsPublisher = factory.getStatsPublisher();
         if (!statsPublisher.init(sCntxt)) { // creating stats table if not exists
           if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
@@ -1512,13 +1500,14 @@ public Vertex createVertex(JobConf conf, BaseWork work,
       outputKlass = MROutput.class;
     }
     // final vertices need to have at least one output
-    if (!hasChildren) {
-      v.addDataSink("out_"+work.getName(), new DataSinkDescriptor(
+    boolean endVertex = tezWork.getLeaves().contains(workUnit);
+    if (endVertex) {
+      vertex.addDataSink("out_"+workUnit.getName(), new DataSinkDescriptor(
           OutputDescriptor.create(outputKlass.getName())
               .setUserPayload(TezUtils.createUserPayloadFromConf(conf)), null, null));
     }
 
-    return v;
+    return vertex;
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 3599d19141..854bc89e9c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -34,7 +34,6 @@ import javax.annotation.Nullable;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
@@ -404,17 +403,15 @@ void checkOutputSpec(BaseWork work, JobConf jc) throws IOException {
     }
   }
 
-  DAG build(JobConf conf, TezWork work, Path scratchDir, Context ctx,
+  DAG build(JobConf conf, TezWork tezWork, Path scratchDir, Context ctx,
       Map<String, LocalResource> vertexResources) throws Exception {
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG);
 
     // getAllWork returns a topologically sorted list, which we use to make
     // sure that vertices are created before they are used in edges.
-    List<BaseWork> ws = work.getAllWork();
-    Collections.reverse(ws);
-
-    FileSystem fs = scratchDir.getFileSystem(conf);
+    List<BaseWork> topologicalWorkList = tezWork.getAllWork();
+    Collections.reverse(topologicalWorkList);
 
     // the name of the dag is what is displayed in the AM/Job UI
     String dagName = utils.createDagName(conf, queryPlan);
@@ -435,13 +432,12 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, Context ctx,
     dag.setCredentials(conf.getCredentials());
     setAccessControlsForCurrentUser(dag, queryPlan.getQueryId(), conf);
 
-    for (BaseWork w: ws) {
-      boolean isFinal = work.getLeaves().contains(w);
+    for (BaseWork workUnit: topologicalWorkList) {
 
       // translate work to vertex
-      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + workUnit.getName());
 
-      if (w instanceof UnionWork) {
+      if (workUnit instanceof UnionWork) {
         // Special case for unions. These items translate to VertexGroups
 
         List<BaseWork> unionWorkItems = new LinkedList<BaseWork>();
@@ -449,8 +445,8 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, Context ctx,
 
         // split the children into vertices that make up the union and vertices that are
         // proper children of the union
-        for (BaseWork v: work.getChildren(w)) {
-          EdgeType type = work.getEdgeProperty(w, v).getEdgeType();
+        for (BaseWork v: tezWork.getChildren(workUnit)) {
+          EdgeType type = tezWork.getEdgeProperty(workUnit, v).getEdgeType();
           if (type == EdgeType.CONTAINS) {
             unionWorkItems.add(v);
           } else {
@@ -458,7 +454,7 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, Context ctx,
           }
         }
         JobConf parentConf = workToConf.get(unionWorkItems.get(0));
-        checkOutputSpec(w, parentConf);
+        checkOutputSpec(workUnit, parentConf);
 
         // create VertexGroup
         Vertex[] vertexArray = new Vertex[unionWorkItems.size()];
@@ -467,7 +463,7 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, Context ctx,
         for (BaseWork v: unionWorkItems) {
           vertexArray[i++] = workToVertex.get(v);
         }
-        VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
+        VertexGroup group = dag.createVertexGroup(workUnit.getName(), vertexArray);
 
         // For a vertex group, all Outputs use the same Key-class, Val-class and partitioner.
         // Pick any one source vertex to figure out the Edge configuration.
@@ -476,47 +472,47 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, Context ctx,
         for (BaseWork v: children) {
           // finally we can create the grouped edge
           GroupInputEdge e = utils.createEdge(group, parentConf,
-              workToVertex.get(v), work.getEdgeProperty(w, v), v, work);
+              workToVertex.get(v), tezWork.getEdgeProperty(workUnit, v), v, tezWork);
 
           dag.addEdge(e);
         }
       } else {
         // Regular vertices
-        JobConf wxConf = utils.initializeVertexConf(conf, ctx, w);
-        checkOutputSpec(w, wxConf);
-        Vertex wx = utils.createVertex(wxConf, w, scratchDir, fs, ctx, !isFinal,
-            work, work.getVertexType(w), vertexResources);
-        if (work.getChildren(w).size() > 1) {
-          String value = wxConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB);
+        JobConf wxConf = utils.initializeVertexConf(conf, ctx, workUnit);
+        checkOutputSpec(workUnit, wxConf);
+        Vertex wx = utils.createVertex(wxConf, workUnit, scratchDir,
+            tezWork, vertexResources);
+        if (tezWork.getChildren(workUnit).size() > 1) {
+          String tezRuntimeSortMb = wxConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB);
           int originalValue = 0;
-          if(value == null) {
+          if(tezRuntimeSortMb == null) {
             originalValue = TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB_DEFAULT;
           } else {
-            originalValue = Integer.valueOf(value);
+            originalValue = Integer.valueOf(tezRuntimeSortMb);
           }
-          int newValue = (int) (originalValue / work.getChildren(w).size());
+          int newValue = originalValue / tezWork.getChildren(workUnit).size();
           wxConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, Integer.toString(newValue));
           LOG.info("Modified " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " to " + newValue);
         }
-        if (w.getReservedMemoryMB() > 0) {
+        if (workUnit.getReservedMemoryMB() > 0) {
           // If reservedMemoryMB is set, make memory allocation fraction adjustment as needed
-          double frac = DagUtils.adjustMemoryReserveFraction(w.getReservedMemoryMB(), super.conf);
+          double frac = DagUtils.adjustMemoryReserveFraction(workUnit.getReservedMemoryMB(), super.conf);
           LOG.info("Setting " + TEZ_MEMORY_RESERVE_FRACTION + " to " + frac);
           wx.setConf(TEZ_MEMORY_RESERVE_FRACTION, Double.toString(frac));
         } // Otherwise just leave it up to Tez to decide how much memory to allocate
 
         dag.addVertex(wx);
-        utils.addCredentials(w, dag);
-        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
-        workToVertex.put(w, wx);
-        workToConf.put(w, wxConf);
+        utils.addCredentials(workUnit, dag);
+        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + workUnit.getName());
+        workToVertex.put(workUnit, wx);
+        workToConf.put(workUnit, wxConf);
 
         // add all dependencies (i.e.: edges) to the graph
-        for (BaseWork v: work.getChildren(w)) {
+        for (BaseWork v: tezWork.getChildren(workUnit)) {
           assert workToVertex.containsKey(v);
           Edge e = null;
-          TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
-          e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, v, work);
+          TezEdgeProperty edgeProp = tezWork.getEdgeProperty(workUnit, v);
+          e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, v, tezWork);
           dag.addEdge(e);
         }
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 389f5cc86b..00a6c89b1e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -440,7 +440,6 @@ private SplitResult getSplits(JobConf job, TezWork work, Schema schema, Applicat
     JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork);
     // TODO: should we also whitelist input formats here? from mapred.input.format.class
     Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job);
-    FileSystem fs = scratchDir.getFileSystem(job);
     try {
       LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(ctx.getConf()),
           utils, job);
@@ -453,8 +452,8 @@ private SplitResult getSplits(JobConf job, TezWork work, Schema schema, Applicat
       // Update the queryId to use the generated applicationId. See comment below about
       // why this is done.
       HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, applicationId.toString());
-      Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, fs, ctx, false, work,
-          work.getVertexType(mapWork), DagUtils.createTezLrMap(appJarLr, null));
+      Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, work,
+          DagUtils.createTezLrMap(appJarLr, null));
       String vertexName = wx.getName();
       dag.addVertex(wx);
       utils.addCredentials(mapWork, dag);
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index c14dc62b21..6f52d65b13 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.mapred.JobConf;
@@ -95,8 +94,7 @@ public void setUp() throws Exception {
     when(utils.getTezDir(any(Path.class))).thenReturn(path);
     when(
         utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class),
-            any(FileSystem.class), any(Context.class),
-            anyBoolean(), any(TezWork.class), any(VertexType.class), any(Map.class))).thenAnswer(
+            any(TezWork.class), any(Map.class))).thenAnswer(
         new Answer<Vertex>() {
 
           @Override
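
For reference, a minimal sketch of how a caller drives the consolidated createVertex() entry point after this patch, modeled on the per-work loop in TezTask.build(). VertexBuilderSketch and buildDagVertices are hypothetical names invented for this illustration, not part of the patch; DagUtils, TezWork, BaseWork, and Vertex are the real Hive/Tez classes.

// Hedged sketch, not part of the patch: mirrors the per-work loop in
// TezTask.build() against the slimmed-down createVertex() signature.
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.dag.api.Vertex;

final class VertexBuilderSketch {

  // Hypothetical helper; a real caller obtains utils via DagUtils.getInstance().
  static void buildDagVertices(DagUtils utils, JobConf conf, Context ctx, TezWork tezWork,
      Path scratchDir, Map<String, LocalResource> localResources) throws Exception {
    // getAllWork() is topologically sorted; reversing it visits parents first,
    // exactly as TezTask.build() does.
    List<BaseWork> workUnits = tezWork.getAllWork();
    Collections.reverse(workUnits);
    for (BaseWork workUnit : workUnits) {
      JobConf wxConf = utils.initializeVertexConf(conf, ctx, workUnit);
      // No FileSystem, Context, hasChildren, or VertexType arguments anymore:
      // createVertex() now derives the vertex type via tezWork.getVertexType(workUnit)
      // and adds a data sink when tezWork.getLeaves().contains(workUnit).
      Vertex vertex = utils.createVertex(wxConf, workUnit, scratchDir, tezWork, localResources);
      // ... the vertex and its edges would be added to the DAG here, as in TezTask.build() ...
    }
  }
}

The design point is that everything the old callers had to compute and pass in (leaf status, vertex type, the common local files, launch opts, and execution context) is now derived or applied once inside createVertex(), so the three per-work helpers and both call sites shrink.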
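
The TestTezTask change above simply drops the four removed matchers. A self-contained sketch of the same stubbing, assuming the Mockito 1.x matchers the test already imports; CreateVertexMockSketch and mockedUtils are illustrative names only.

// Hedged sketch, not part of the patch: stubbing the new five-argument
// createVertex() the same way the updated TestTezTask.setUp() does.
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.dag.api.Vertex;

class CreateVertexMockSketch {

  @SuppressWarnings("unchecked")
  static DagUtils mockedUtils() throws Exception {
    DagUtils utils = mock(DagUtils.class);
    Vertex vertex = mock(Vertex.class);
    // Five matchers, one per parameter of the new signature; the old
    // FileSystem/Context/boolean/VertexType matchers are gone.
    when(utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class),
        any(TezWork.class), any(Map.class))).thenReturn(vertex);
    return utils;
  }
}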