diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 5aa7f06..4393596 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -513,10 +513,16 @@ public void shutdown() {
   Throwable getException() {
     return exception;
   }
 
+  void setException(Throwable ex) { exception = ex; }
+
+  public void setConsole(LogHelper console) {
+    this.console = console;
+  }
+
   @Override
   public String toString() {
     return getId() + ":" + getType();
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index e456fc9..7bc947c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -97,12 +97,13 @@ public class DagUtils {
 
   private static final String TEZ_DIR = "_tez_scratch_dir";
 
+  private static DagUtils instance;
 
   /*
    * Creates the configuration object necessary to run a specific vertex from
    * map work. This includes input formats, input processor, etc.
    */
-  private static JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
+  private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
     JobConf conf = new JobConf(baseConf);
 
     if (mapWork.getNumMapTasks() != null) {
@@ -157,7 +158,7 @@ private static JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
    * @param w The second vertex (sink)
    * @return
    */
-  public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
+  public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
       EdgeType edgeType)
       throws IOException {
 
@@ -199,7 +200,7 @@ public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
   /*
    * Helper function to create Vertex from MapWork.
    */
-  private static Vertex createVertex(JobConf conf, MapWork mapWork,
+  private Vertex createVertex(JobConf conf, MapWork mapWork,
       LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
       Path mrScratchDir, Context ctx) throws Exception {
 
@@ -295,7 +296,7 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork,
   /*
    * Helper function to create JobConf for specific ReduceWork.
    */
-  private static JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) {
+  private JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) {
     JobConf conf = new JobConf(baseConf);
 
     conf.set("mapred.reducer.class", ExecReducer.class.getName());
@@ -311,7 +312,7 @@ private static JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceW
   /*
    * Helper function to create Vertex for given ReduceWork.
    */
-  private static Vertex createVertex(JobConf conf, ReduceWork reduceWork,
+  private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
       LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
       Path mrScratchDir, Context ctx) throws Exception {
 
@@ -351,7 +352,7 @@ private static Vertex createVertex(JobConf conf, ReduceWork reduceWork,
   /*
    * Helper method to create a yarn local resource.
    */
-  private static LocalResource createLocalResource(FileSystem remoteFs, Path file,
+  private LocalResource createLocalResource(FileSystem remoteFs, Path file,
       LocalResourceType type, LocalResourceVisibility visibility) {
 
     FileStatus fstat = null;
@@ -381,7 +382,7 @@ private static LocalResource createLocalResource(FileSystem remoteFs, Path file,
    * @throws LoginException if we are unable to figure user information
    * @throws IOException when any dfs operation fails.
    */
-  public static Path getDefaultDestDir(Configuration conf) throws LoginException, IOException {
+  public Path getDefaultDestDir(Configuration conf) throws LoginException, IOException {
     UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
     String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
     String userPathStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_USER_INSTALL_DIR);
@@ -415,7 +416,7 @@ public static Path getDefaultDestDir(Configuration conf) throws LoginException,
    * @throws IOException when hdfs operation fails
    * @throws LoginException when getDefaultDestDir fails with the same exception
    */
-  public static List<LocalResource> localizeTempFiles(Configuration conf) throws IOException, LoginException {
+  public List<LocalResource> localizeTempFiles(Configuration conf) throws IOException, LoginException {
     List<LocalResource> tmpResources = new ArrayList<LocalResource>();
 
     String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
@@ -471,7 +472,7 @@ public static Path getDefaultDestDir(Configuration conf) throws LoginException,
   }
 
   // the api that finds the jar being used by this class on disk
-  public static String getExecJarPathLocal () throws URISyntaxException {
+  public String getExecJarPathLocal () throws URISyntaxException {
     // returns the location on disc of the jar of this class.
     return DagUtils.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString();
   }
@@ -479,7 +480,7 @@ public static String getExecJarPathLocal () throws URISyntaxException {
   /*
    * Helper function to retrieve the basename of a local resource
    */
-  public static String getBaseName(LocalResource lr) {
+  public String getBaseName(LocalResource lr) {
     return FilenameUtils.getName(lr.getResource().getFile());
   }
 
@@ -487,7 +488,7 @@ public static String getBaseName(LocalResource lr) {
    * @param pathStr - the string from which we try to determine the resource base name
    * @return the name of the resource from a given path string.
    */
-  public static String getResourceBaseName(String pathStr) {
+  public String getResourceBaseName(String pathStr) {
     String[] splits = pathStr.split("/");
     return splits[splits.length - 1];
   }
@@ -499,7 +500,7 @@ public static String getResourceBaseName(String pathStr) {
    * @return true if the file names match else returns false.
    * @throws IOException when any file system related call fails
    */
-  private static boolean checkPreExisting(Path src, Path dest, Configuration conf)
+  private boolean checkPreExisting(Path src, Path dest, Configuration conf)
       throws IOException {
     FileSystem destFS = dest.getFileSystem(conf);
@@ -528,7 +529,7 @@ private static boolean checkPreExisting(Path src, Path dest, Configuration conf)
    * @return localresource from tez localization.
    * @throws IOException when any file system related calls fails.
    */
-  public static LocalResource localizeResource(Path src, Path dest, Configuration conf)
+  public LocalResource localizeResource(Path src, Path dest, Configuration conf)
       throws IOException {
     FileSystem destFS = dest.getFileSystem(conf);
 
     if (!(destFS instanceof DistributedFileSystem)) {
@@ -557,7 +558,7 @@ public static LocalResource localizeResource(Path src, Path dest, Configuration
    * @return JobConf base configuration for job execution
    * @throws IOException
    */
-  public static JobConf createConfiguration(HiveConf hiveConf) throws IOException {
+  public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
     hiveConf.setBoolean("mapred.mapper.new-api", false);
 
     JobConf conf = (JobConf) MRHelpers.getBaseMRConfiguration(hiveConf);
@@ -585,7 +586,7 @@ public static JobConf createConfiguration(HiveConf hiveConf) throws IOException
    * @param work BaseWork will be used to populate the configuration object.
    * @return JobConf new configuration object
    */
-  public static JobConf initializeVertexConf(JobConf conf, BaseWork work) {
+  public JobConf initializeVertexConf(JobConf conf, BaseWork work) {
 
     // simply dispatch the call to the right method for the actual (sub-) type of
     // BaseWork.
@@ -612,7 +613,7 @@ public static JobConf initializeVertexConf(JobConf conf, BaseWork work) {
    * @param ctx This query's context
    * @return Vertex
    */
-  public static Vertex createVertex(JobConf conf, BaseWork work,
+  public Vertex createVertex(JobConf conf, BaseWork work,
       Path scratchDir, LocalResource appJarLr, List<LocalResource> additionalLr,
       FileSystem fileSystem, Context ctx, boolean hasChildren) throws Exception {
 
@@ -626,8 +627,8 @@ public static Vertex createVertex(JobConf conf, BaseWork work,
       v = createVertex(conf, (ReduceWork) work, appJarLr, additionalLr,
           fileSystem, scratchDir, ctx);
     } else {
-      assert false;
-      return null;
+      // something is seriously wrong if this is happening
+      throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());
     }
 
     // initialize stats publisher if necessary
@@ -660,7 +661,7 @@ public static Vertex createVertex(JobConf conf, BaseWork work,
    * createTezDir creates a temporary directory in the scratchDir folder to
    * be used with Tez. Assumes scratchDir exists.
    */
-  public static Path createTezDir(Path scratchDir, Configuration conf)
+  public Path createTezDir(Path scratchDir, Configuration conf)
       throws IOException {
     Path tezDir = getTezDir(scratchDir);
     FileSystem fs = tezDir.getFileSystem(conf);
@@ -671,10 +672,21 @@ public static Path createTezDir(Path scratchDir, Configuration conf)
   /**
    * Gets the tez dir that belongs to the hive scratch dir
    */
-  public static Path getTezDir(Path scratchDir) {
+  public Path getTezDir(Path scratchDir) {
     return new Path(scratchDir, TEZ_DIR);
   }
 
+  /**
+   * Singleton
+   * @return instance of this class
+   */
+  public static DagUtils getInstance() {
+    if (instance == null) {
+      instance = new DagUtils();
+    }
+    return instance;
+  }
+
   private DagUtils() {
     // don't instantiate
   }
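
A note on the DagUtils changes above: getInstance() is lazily initialized and unsynchronized, so it relies on single-threaded first use, while the switch from static to instance methods creates an injection seam. Tests sidestep the singleton entirely by handing a stand-in to the constructor seams added below in TezSessionState and TezTask. The following is a hypothetical sketch (the wrapper class and method names are invented for illustration) that only exercises APIs visible in this patch:

    package org.apache.hadoop.hive.ql.exec.tez;

    import static org.mockito.Mockito.mock;

    import org.apache.hadoop.fs.Path;

    // Hypothetical sketch, not part of the patch: the two intended wiring paths.
    public class DagUtilsWiringSketch {

      // Production code keeps going through the lazy singleton. Note that
      // getInstance() is unsynchronized; first use is assumed to be single-threaded.
      Path productionWiring() {
        DagUtils utils = DagUtils.getInstance();
        return utils.getTezDir(new Path("/tmp/hive-scratch")); // scratch path invented
      }

      // Tests bypass the singleton: Mockito can instantiate a mock of DagUtils
      // despite the private constructor, and the new TezTask(DagUtils)
      // constructor accepts the stand-in.
      TezTask testWiring() {
        DagUtils mocked = mock(DagUtils.class);
        return new TezTask(mocked);
      }
    }
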
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 165aa49..15f1011 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -58,6 +58,7 @@
   private LocalResource appJarLr;
   private TezSession session;
   private String sessionId;
+  private DagUtils utils;
 
   private static List<TezSessionState> openSessions
     = Collections.synchronizedList(new LinkedList<TezSessionState>());
@@ -66,7 +67,16 @@
    * Constructor. We do not automatically connect, because we only want to
    * load tez classes when the user has tez installed.
    */
-  public void TezSessionContext() {
+  public TezSessionState(DagUtils utils) {
+    this.utils = utils;
+  }
+
+  /**
+   * Constructor. We do not automatically connect, because we only want to
+   * load tez classes when the user has tez installed.
+   */
+  public TezSessionState() {
+    this(DagUtils.getInstance());
   }
 
   /**
@@ -112,7 +122,7 @@ public void open(String sessionId, HiveConf conf)
 
     // configuration for the application master
     Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
-    commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr);
+    commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr);
 
     AMConfiguration amConfig = new AMConfiguration(null, commonLocalResources, tezConfig, null);
 
@@ -211,8 +221,8 @@ private Path createTezDir(String sessionId)
   private LocalResource createHiveExecLocalResource()
       throws IOException, LoginException, URISyntaxException {
     String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
-    String currentVersionPathStr = DagUtils.getExecJarPathLocal();
-    String currentJarName = DagUtils.getResourceBaseName(currentVersionPathStr);
+    String currentVersionPathStr = utils.getExecJarPathLocal();
+    String currentJarName = utils.getResourceBaseName(currentVersionPathStr);
     FileSystem fs = null;
     Path jarPath = null;
     FileStatus dirStatus = null;
@@ -234,18 +244,18 @@ private LocalResource createHiveExecLocalResource()
     if ((dirStatus != null) && (dirStatus.isDir())) {
       FileStatus[] listFileStatus = fs.listStatus(hiveJarDirPath);
       for (FileStatus fstatus : listFileStatus) {
-        String jarName = DagUtils.getResourceBaseName(fstatus.getPath().toString());
+        String jarName = utils.getResourceBaseName(fstatus.getPath().toString());
         if (jarName.equals(currentJarName)) {
           // we have found the jar we need.
          jarPath = fstatus.getPath();
-          return DagUtils.localizeResource(null, jarPath, conf);
+          return utils.localizeResource(null, jarPath, conf);
         }
       }
 
       // jar wasn't in the directory, copy the one in current use
       if (jarPath == null) {
         Path dest = new Path(hiveJarDir + "/" + currentJarName);
-        return DagUtils.localizeResource(new Path(currentVersionPathStr), dest, conf);
+        return utils.localizeResource(new Path(currentVersionPathStr), dest, conf);
       }
     }
   }
@@ -258,12 +268,12 @@ private LocalResource createHiveExecLocalResource()
      */
     if ((hiveJarDir == null) || (dirStatus == null)
         || ((dirStatus != null) && (!dirStatus.isDir()))) {
-      Path dest = DagUtils.getDefaultDestDir(conf);
+      Path dest = utils.getDefaultDestDir(conf);
       String destPathStr = dest.toString();
       String jarPathStr = destPathStr + "/" + currentJarName;
       dirStatus = fs.getFileStatus(dest);
       if (dirStatus.isDir()) {
-        return DagUtils.localizeResource(new Path(currentVersionPathStr), new Path(jarPathStr), conf);
+        return utils.localizeResource(new Path(currentVersionPathStr), new Path(jarPathStr), conf);
       } else {
         throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString()));
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 2231145..d16baac 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -72,8 +72,15 @@
 
   private TezCounters counters;
 
+  private DagUtils utils;
+
   public TezTask() {
+    this(DagUtils.getInstance());
+  }
+
+  public TezTask(DagUtils utils) {
     super();
+    this.utils = utils;
   }
 
   public TezCounters getTezCounters() {
@@ -116,10 +123,10 @@ public int execute(DriverContext driverContext) {
       Path scratchDir = new Path(ctx.getMRScratchDir());
 
       // create the tez tmp dir
-      DagUtils.createTezDir(scratchDir, conf);
+      utils.createTezDir(scratchDir, conf);
 
       // jobConf will hold all the configuration for hadoop, tez, and hive
-      JobConf jobConf = DagUtils.createConfiguration(conf);
+      JobConf jobConf = utils.createConfiguration(conf);
 
       // unless already installed on all the cluster nodes, we'll have to
       // localize hive-exec.jar as well.
@@ -129,7 +136,7 @@ public int execute(DriverContext driverContext) {
       DAG dag = build(jobConf, work, scratchDir, appJarLr, ctx);
 
       // submit will send the job to the cluster and start executing
-      client = submit(jobConf, dag, scratchDir, appJarLr, session.getSession());
+      client = submit(jobConf, dag, scratchDir, appJarLr, session);
 
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor();
@@ -161,7 +168,7 @@ public int execute(DriverContext driverContext) {
     return rc;
   }
 
-  private DAG build(JobConf conf, TezWork work, Path scratchDir,
+  DAG build(JobConf conf, TezWork work, Path scratchDir,
       LocalResource appJarLr, Context ctx) throws Exception {
 
@@ -170,14 +177,14 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir,
     Map<BaseWork, JobConf> workToConf = new HashMap<BaseWork, JobConf>();
 
     // we need to get the user specified local resources for this dag
-    List<LocalResource> additionalLr = DagUtils.localizeTempFiles(conf);
+    List<LocalResource> additionalLr = utils.localizeTempFiles(conf);
 
     // getAllWork returns a topologically sorted list, which we use to make
     // sure that vertices are created before they are used in edges.
     List<BaseWork> ws = work.getAllWork();
     Collections.reverse(ws);
 
-    Path tezDir = DagUtils.getTezDir(scratchDir);
+    Path tezDir = utils.getTezDir(scratchDir);
 
     FileSystem fs = tezDir.getFileSystem(conf);
 
     // the name of the dag is what is displayed in the AM/Job UI
@@ -191,8 +198,8 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir,
 
       // translate work to vertex
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
-      JobConf wxConf = DagUtils.initializeVertexConf(conf, w);
-      Vertex wx = DagUtils.createVertex(wxConf, w, tezDir,
+      JobConf wxConf = utils.initializeVertexConf(conf, w);
+      Vertex wx = utils.createVertex(wxConf, w, tezDir,
           appJarLr, additionalLr, fs, ctx, !isFinal);
       dag.addVertex(wx);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
@@ -206,7 +213,7 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir,
 
           EdgeType edgeType = work.getEdgeProperty(w, v);
 
-          e = DagUtils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
+          e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
           dag.addEdge(e);
         }
       }
@@ -214,8 +221,8 @@ private DAG build(JobConf conf, TezWork work, Path scratchDir,
     return dag;
   }
 
-  private DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
-      LocalResource appJarLr, TezSession session)
+  DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
+      LocalResource appJarLr, TezSessionState sessionState)
       throws IOException, TezException, InterruptedException,
       LoginException, URISyntaxException, HiveException {
 
@@ -224,25 +231,19 @@ private DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
 
     try {
       // ready to start execution on the cluster
-      dagClient = session.submitDAG(dag);
+      dagClient = sessionState.getSession().submitDAG(dag);
     } catch (SessionNotRunning nr) {
       console.printInfo("Tez session was closed. Reopening...");
 
-      // Need to remove this static hack. But this is the way currently to
-      // get a session.
-      SessionState ss = SessionState.get();
-      TezSessionState tezSession = ss.getTezSession();
-      // close the old one, but keep the tmp files around
-      tezSession.close(true);
+      sessionState.close(true);
 
       // (re)open the session
-      tezSession.open(ss.getSessionId(), this.conf);
-      session = tezSession.getSession();
+      sessionState.open(sessionState.getSessionId(), this.conf);
       console.printInfo("Session re-established.");
 
-      dagClient = session.submitDAG(dag);
+      dagClient = sessionState.getSession().submitDAG(dag);
     }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
@@ -253,7 +254,7 @@ private DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
    * close will move the temp files into the right place for the fetch
    * task. If the job has failed it will clean up the files.
    */
-  private int close(TezWork work, int rc) {
+  int close(TezWork work, int rc) {
     try {
       List<BaseWork> ws = work.getAllWork();
       for (BaseWork w: ws) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index f4e973a..8363bbf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
@@ -58,6 +59,9 @@ public Object process(Node nd, Stack<Node> stack,
 
     GenTezProcContext context = (GenTezProcContext) procContext;
 
+    assert context != null && context.currentTask != null
+        && context.currentRootOperator != null;
+
     // Operator is a file sink or reduce sink. Something that forces
     // a new vertex.
     Operator<?> operator = (Operator<?>) nd;
@@ -183,7 +187,7 @@ public Object process(Node nd, Stack<Node> stack,
     return null;
   }
 
-  private ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root,
+  protected ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root,
       TezWork tezWork) {
     assert !root.getParentOperators().isEmpty();
     ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
@@ -210,7 +214,7 @@ private ReduceWork createReduceWork(GenTezProcContext context, Operator<?> root,
     return reduceWork;
   }
 
-  private void setupReduceSink(GenTezProcContext context, ReduceWork reduceWork,
+  protected void setupReduceSink(GenTezProcContext context, ReduceWork reduceWork,
       ReduceSinkOperator reduceSink) {
 
     LOG.debug("Setting up reduce sink: " + reduceSink
@@ -227,7 +231,7 @@ private void setupReduceSink(GenTezProcContext context, ReduceWork reduceWork,
     reduceSink.getConf().setOutputName(reduceWork.getName());
   }
 
-  private MapWork createMapWork(GenTezProcContext context, Operator<?> root,
+  protected MapWork createMapWork(GenTezProcContext context, Operator<?> root,
       TezWork tezWork) throws SemanticException {
     assert root.getParentOperators().isEmpty();
     MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
@@ -237,11 +241,19 @@ private MapWork createMapWork(GenTezProcContext context, Operator<?> root,
     assert root instanceof TableScanOperator;
     String alias = ((TableScanOperator)root).getConf().getAlias();
 
-    GenMapRedUtils.setMapWork(mapWork, context.parseContext,
-        context.inputs, null, root, alias, context.conf, false);
+    setupMapWork(mapWork, context, root, alias);
+
+    // add new item to the tez work
     tezWork.add(mapWork);
 
     return mapWork;
   }
+
+  // this method's main use is to help unit testing this class
+  protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
+      Operator<? extends OperatorDesc> root, String alias) throws SemanticException {
+    // All the setup is done in GenMapRedUtils
+    GenMapRedUtils.setMapWork(mapWork, context.parseContext,
+        context.inputs, null, root, alias, context.conf, false);
+  }
 }
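
The setupMapWork extraction just above is a classic extract-and-override seam: production behavior stays delegated to GenMapRedUtils.setMapWork, while a test can subclass GenTezWork and swap in a lightweight stub. TestGenTezWork at the end of this patch does exactly this; a distilled sketch of the pattern, using only types that appear in this patch (the variable name aliasToWork is invented):

    // Sketch of the extract-and-override seam: the anonymous subclass replaces
    // the heavyweight GenMapRedUtils call with a stub that just records the
    // root operator under its alias.
    GenTezWork proc = new GenTezWork() {
      @Override
      protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
          Operator<? extends OperatorDesc> root, String alias) throws SemanticException {
        LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork =
            new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
        aliasToWork.put(alias, root);
        mapWork.setAliasToWork(aliasToWork);
      }
    };
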
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 08ce7d9..dff743f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -91,6 +91,7 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
     opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"),
         ReduceSinkOperator.getOperatorName() + "%"),
         new SetReducerParallelism());
+
     opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
         JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
index 5e0ff0f..dad5497 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -90,6 +91,24 @@ private void visit(BaseWork child, Set<BaseWork> seen, List<BaseWork> result) {
   }
 
   /**
+   * add all nodes in the collection without any connections
+   */
+  public void addAll(Collection<BaseWork> c) {
+    for (BaseWork w: c) {
+      this.add(w);
+    }
+  }
+
+  /**
+   * add all nodes in the collection without any connections
+   */
+  public void addAll(BaseWork[] bws) {
+    for (BaseWork w: bws) {
+      this.add(w);
+    }
+  }
+
+  /**
    * add creates a new node in the graph without any connections
    */
  public void add(BaseWork w) {
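
The two addAll overloads above exist mainly so that test fixtures can populate a TezWork graph in bulk and wire the edges afterwards; the setUp of TestTezTask below leans on them. A condensed usage sketch (variable names invented, all APIs taken from this patch):

    // Assemble a small two-stage graph: two map works feeding one reducer.
    TezWork work = new TezWork();
    MapWork m0 = new MapWork();
    MapWork m1 = new MapWork();
    ReduceWork r0 = new ReduceWork();

    work.addAll(new BaseWork[] { m0, m1 });   // nodes only, no connections yet
    work.add(r0);

    work.connect(m0, r0, EdgeType.SIMPLE_EDGE);
    work.connect(m1, r0, EdgeType.SIMPLE_EDGE);
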
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
new file mode 100644
index 0000000..58fe62a
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestTezTask {
+
+  DagUtils utils;
+  MapWork[] mws;
+  ReduceWork[] rws;
+  TezWork work;
+  TezTask task;
+  TezSession session;
+  TezSessionState sessionState;
+  JobConf conf;
+  LocalResource appLr;
+  Operator<? extends OperatorDesc> op;
+
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setUp() throws Exception {
+    utils = mock(DagUtils.class);
+    when(utils.getTezDir(any(Path.class))).thenReturn(new Path("hdfs://localhost:9000/tez/"));
+    when(utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class),
+        any(LocalResource.class), any(List.class), any(FileSystem.class), any(Context.class),
+        anyBoolean())).thenAnswer(new Answer<Vertex>() {
+
+          @Override
+          public Vertex answer(InvocationOnMock invocation) throws Throwable {
+            Object[] args = invocation.getArguments();
+            return new Vertex(((BaseWork)args[1]).getName(),
+                mock(ProcessorDescriptor.class), 0, mock(Resource.class));
+          }
+        });
+
+    when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(JobConf.class),
+        any(Vertex.class), any(EdgeType.class))).thenAnswer(new Answer<Edge>() {
+
+          @Override
+          public Edge answer(InvocationOnMock invocation) throws Throwable {
+            Object[] args = invocation.getArguments();
+            return new Edge((Vertex)args[1], (Vertex)args[3], mock(EdgeProperty.class));
+          }
+        });
+
+    work = new TezWork();
+
+    mws = new MapWork[] { new MapWork(), new MapWork() };
+    rws = new ReduceWork[] { new ReduceWork(), new ReduceWork() };
+
+    work.addAll(mws);
+    work.addAll(rws);
+
+    int i = 0;
+    for (BaseWork w: work.getAllWork()) {
+      w.setName("Work "+(++i));
+    }
+
+    op = mock(Operator.class);
+
+    LinkedHashMap<String, Operator<? extends OperatorDesc>> map
+      = new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
+    map.put("foo", op);
+    mws[0].setAliasToWork(map);
+    mws[1].setAliasToWork(map);
+
+    LinkedHashMap<String, ArrayList<String>> pathMap
+      = new LinkedHashMap<String, ArrayList<String>>();
+    ArrayList<String> aliasList = new ArrayList<String>();
+    aliasList.add("foo");
+    pathMap.put("foo", aliasList);
+
+    mws[0].setPathToAliases(pathMap);
+    mws[1].setPathToAliases(pathMap);
+
+    rws[0].setReducer(op);
+    rws[1].setReducer(op);
+
+    work.connect(mws[0], rws[0], EdgeType.SIMPLE_EDGE);
+    work.connect(mws[1], rws[0], EdgeType.SIMPLE_EDGE);
+    work.connect(rws[0], rws[1], EdgeType.SIMPLE_EDGE);
+
+    task = new TezTask(utils);
+    task.setWork(work);
+    task.setConsole(mock(LogHelper.class));
+
+    conf = new JobConf();
+    appLr = mock(LocalResource.class);
+
+    session = mock(TezSession.class);
+    sessionState = mock(TezSessionState.class);
+    when(sessionState.getSession()).thenReturn(session);
+    when(session.submitDAG(any(DAG.class)))
+      .thenThrow(new SessionNotRunning(""))
+      .thenReturn(mock(DAGClient.class));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    utils = null;
+    work = null;
+    task = null;
+  }
+
+  @Test
+  public void testBuildDag() throws IllegalArgumentException, IOException, Exception {
+    DAG dag = task.build(conf, work, new Path("hdfs:///"), appLr, new Context(conf));
+    for (BaseWork w: work.getAllWork()) {
+      Vertex v = dag.getVertex(w.getName());
+      assertNotNull(v);
+      List<Vertex> outs = v.getOutputVertices();
+      for (BaseWork x: work.getChildren(w)) {
+        boolean found = false;
+        for (Vertex u: outs) {
+          if (u.getVertexName().equals(x.getName())) {
+            found = true;
+            break;
+          }
+        }
+        assertTrue(found);
+      }
+    }
+  }
+
+  @Test
+  public void testEmptyWork() throws IllegalArgumentException, IOException, Exception {
+    DAG dag = task.build(conf, new TezWork(), new Path("hdfs:///"), appLr, new Context(conf));
+    assertEquals(dag.getVertices().size(), 0);
+  }
+
+  @Test
+  public void testSubmit() throws LoginException, IllegalArgumentException,
+      IOException, TezException, InterruptedException, URISyntaxException, HiveException {
+    DAG dag = new DAG("test");
+    task.submit(conf, dag, new Path("hdfs:///"), appLr, sessionState);
+    // validate close/reopen
+    verify(sessionState, times(1)).open(any(String.class), any(HiveConf.class));
+    verify(sessionState, times(1)).close(eq(true));
+    verify(session, times(2)).submitDAG(any(DAG.class));
+  }
+
+  @Test
+  public void testClose() throws HiveException {
+    task.close(work, 0);
+    verify(op, times(4)).jobClose(any(Configuration.class), eq(true));
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
new file mode 100644
index 0000000..01583c7
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for GenTezWork.
+ *
+ */
+public class TestGenTezWork {
+
+  GenTezProcContext ctx;
+  GenTezWork proc;
+  ReduceSinkOperator rs;
+  FileSinkOperator fs;
+  TableScanOperator ts;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setUp() throws Exception {
+    ctx = new GenTezProcContext(
+        new HiveConf(),
+        new ParseContext(),
+        (List<Task<MoveWork>>) Collections.EMPTY_LIST,
+        (List<Task<? extends Serializable>>) new ArrayList<Task<? extends Serializable>>(),
+        (Set<ReadEntity>) Collections.EMPTY_SET,
+        (Set<WriteEntity>) Collections.EMPTY_SET);
+
+    proc = new GenTezWork() {
+      @Override
+      protected void setupMapWork(MapWork mapWork, GenTezProcContext context,
+          Operator<? extends OperatorDesc> root, String alias) throws SemanticException {
+        LinkedHashMap<String, Operator<? extends OperatorDesc>> map
+          = new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
+        map.put("foo", root);
+        mapWork.setAliasToWork(map);
+        return;
+      }
+    };
+
+    fs = new FileSinkOperator();
+    fs.setConf(new FileSinkDesc());
+    rs = new ReduceSinkOperator();
+    rs.setConf(new ReduceSinkDesc());
+    ts = new TableScanOperator();
+    ts.setConf(new TableScanDesc());
+    ts.getChildOperators().add(rs);
+    rs.getParentOperators().add(ts);
+    rs.getChildOperators().add(fs);
+    fs.getParentOperators().add(rs);
+    ctx.preceedingWork = null;
+    ctx.currentRootOperator = ts;
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    ctx = null;
+    proc = null;
+    ts = null;
+    rs = null;
+    fs = null;
+  }
+
+  @Test
+  public void testCreateMap() throws SemanticException {
+    proc.process(rs, null, ctx, (Object[])null);
+
+    assertNotNull(ctx.currentTask);
+    assertTrue(ctx.rootTasks.contains(ctx.currentTask));
+
+    TezWork work = ctx.currentTask.getWork();
+    assertEquals(work.getAllWork().size(), 1);
+
+    BaseWork w = work.getAllWork().get(0);
+    assertTrue(w instanceof MapWork);
+
+    MapWork mw = (MapWork) w;
+
+    // need to make sure names are set for tez to connect things right
+    assertNotNull(w.getName());
+
+    // map work should start with our ts op
+    assertSame(mw.getAliasToWork().entrySet().iterator().next().getValue(), ts);
+
+    // preceeding work must be set to the newly generated map
+    assertSame(ctx.preceedingWork, mw);
+
+    // should have a new root now
+    assertSame(ctx.currentRootOperator, fs);
+  }
+
+  @Test
+  public void testCreateReduce() throws SemanticException {
+    // create map
+    proc.process(rs, null, ctx, (Object[])null);
+
+    // create reduce
+    proc.process(fs, null, ctx, (Object[])null);
+
+    TezWork work = ctx.currentTask.getWork();
+    assertEquals(work.getAllWork().size(), 2);
+
+    BaseWork w = work.getAllWork().get(1);
+    assertTrue(w instanceof ReduceWork);
+    assertTrue(work.getParents(w).contains(work.getAllWork().get(0)));
+
+    ReduceWork rw = (ReduceWork) w;
+
+    // need to make sure names are set for tez to connect things right
+    assertNotNull(w.getName());
+
+    // reduce work should have our fs op as the reducer
+    assertSame(rw.getReducer(), fs);
+
+    // should have severed the ties
+    assertEquals(fs.getParentOperators().size(), 0);
+  }
+}