commit 6b58fd0f3304e697b068379a1ae2a4e40aad7814
Author: Daniel Dai
Date:   Tue Sep 10 11:22:39 2013 -0700

    HIVE-4531: Collecting task logs to hdfs

diff --git a/hcatalog/src/docs/src/documentation/content/xdocs/hive.xml b/hcatalog/src/docs/src/documentation/content/xdocs/hive.xml
index a738ceb..8761c44 100644
--- a/hcatalog/src/docs/src/documentation/content/xdocs/hive.xml
+++ b/hcatalog/src/docs/src/documentation/content/xdocs/hive.xml
@@ -66,6 +66,13 @@
 None
+ enablelog
+ Collect Hadoop job configuration and logs into the $statusdir/logs folder.
+ statusdir must also be set to use this feature.
+ Optional
+ None
+
 callback
 Define a URL to be called upon job completion. You may embed a specific job ID
 into this URL using $jobId. This tag
diff --git a/hcatalog/src/docs/src/documentation/content/xdocs/mapreducejar.xml b/hcatalog/src/docs/src/documentation/content/xdocs/mapreducejar.xml
index 8eff9a3..af60451 100644
--- a/hcatalog/src/docs/src/documentation/content/xdocs/mapreducejar.xml
+++ b/hcatalog/src/docs/src/documentation/content/xdocs/mapreducejar.xml
@@ -86,6 +86,13 @@
 None
+ enablelog
+ Collect Hadoop job configuration and logs into the $statusdir/logs folder.
+ statusdir must also be set to use this feature.
+ Optional
+ None
+
 callback
 Define a URL to be called upon job completion. You may embed a specific job ID
 into this URL using $jobId. This tag
diff --git a/hcatalog/src/docs/src/documentation/content/xdocs/mapreducestreaming.xml b/hcatalog/src/docs/src/documentation/content/xdocs/mapreducestreaming.xml
index 51c62bd..f69a5e7 100644
--- a/hcatalog/src/docs/src/documentation/content/xdocs/mapreducestreaming.xml
+++ b/hcatalog/src/docs/src/documentation/content/xdocs/mapreducestreaming.xml
@@ -101,6 +101,13 @@
 None
+ enablelog
+ Collect Hadoop job configuration and logs into the $statusdir/logs folder.
+ statusdir must also be set to use this feature.
+ Optional
+ None
+
 callback
 Define a URL to be called upon job completion. You may embed a specific job ID
 into this URL using $jobId. This tag
diff --git a/hcatalog/src/docs/src/documentation/content/xdocs/pig.xml b/hcatalog/src/docs/src/documentation/content/xdocs/pig.xml
index 22c8dcc..b9099f3 100644
--- a/hcatalog/src/docs/src/documentation/content/xdocs/pig.xml
+++ b/hcatalog/src/docs/src/documentation/content/xdocs/pig.xml
@@ -70,6 +70,13 @@
 None
+ enablelog
+ Collect Hadoop job configuration and logs into the $statusdir/logs folder.
+ statusdir must also be set to use this feature.
+ Optional
+ None
+
 callback
 Define a URL to be called upon job completion. You may embed a specific job ID
 into this URL using $jobId. This tag
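For context, the new enablelog parameter is just another form parameter on the existing job-submission endpoints, used together with statusdir. A minimal client-side sketch of submitting a MapReduce jar job with log collection turned on might look like the following; the host, port, user name, jar, and class values are placeholders for illustration and are not part of this patch:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

public class SubmitJarWithLogCollection {
  public static void main(String[] args) throws Exception {
    // Placeholder WebHCat endpoint and job parameters; adjust for a real cluster.
    String endpoint = "http://localhost:50111/templeton/v1/mapreduce/jar?user.name=guest";
    String form = "jar=" + URLEncoder.encode("wordcount.jar", "UTF-8")
        + "&class=" + URLEncoder.encode("org.example.WordCount", "UTF-8")
        + "&statusdir=" + URLEncoder.encode("wcout", "UTF-8") // required when enablelog is set
        + "&enablelog=true";                                  // parameter added by this patch

    HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
    conn.setRequestMethod("POST");
    conn.setDoOutput(true);
    conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
    OutputStream out = conn.getOutputStream();
    out.write(form.getBytes("UTF-8"));
    out.close();

    // The response body is JSON containing the enqueued controller job id.
    BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
    String line;
    while ((line = in.readLine()) != null) {
      System.out.println(line);
    }
    in.close();
  }
}
```

Once the launched job finishes, the collected job configuration and task logs land under $statusdir/logs, alongside a list.txt index written by the LogRetriever introduced below.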
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
index 90d834f..1e27e28 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
@@ -43,25 +43,25 @@ public HiveDelegator(AppConfig appConf) {
   public EnqueueBean run(String user, String execute, String srcFile,
                          List<String> defines, List<String> hiveArgs, String otherFiles,
-                        String statusdir, String callback, String completedUrl)
+                        String statusdir, String callback, String completedUrl, boolean enablelog)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
     ExecuteException, IOException, InterruptedException
   {
     runAs = user;
     List<String> args = makeArgs(execute, srcFile, defines, hiveArgs, otherFiles, statusdir,
-                                 completedUrl);
+                                 completedUrl, enablelog);
     return enqueueController(user, callback, args);
   }

   private List<String> makeArgs(String execute, String srcFile, List<String> defines,
                                 List<String> hiveArgs, String otherFiles,
-                                String statusdir, String completedUrl)
+                                String statusdir, String completedUrl, boolean enablelog)
     throws BadParam, IOException, InterruptedException
   {
     ArrayList<String> args = new ArrayList<String>();
     try {
-      args.addAll(makeBasicArgs(execute, srcFile, otherFiles, statusdir, completedUrl));
+      args.addAll(makeBasicArgs(execute, srcFile, otherFiles, statusdir, completedUrl, enablelog));
       args.add("--");
       args.add(appConf.hivePath());
@@ -99,7 +99,8 @@ public EnqueueBean run(String user,
   }

   private List<String> makeBasicArgs(String execute, String srcFile, String otherFiles,
-                                     String statusdir, String completedUrl)
+                                     String statusdir, String completedUrl,
+                                     boolean enablelog)
     throws URISyntaxException, FileNotFoundException, IOException,
     InterruptedException
   {
@@ -115,7 +116,8 @@ public EnqueueBean run(String user,
       allFiles.addAll(Arrays.asList(ofs));
     }

-    args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles));
+    args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles,
+                enablelog, JobType.HIVE));
     args.add("-archives");
     args.add(appConf.hiveArchive());
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveJobIDParser.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveJobIDParser.java
new file mode 100644
index 0000000..2ebdcb3
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveJobIDParser.java
@@ -0,0 +1,22 @@
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
+
+public class HiveJobIDParser extends JobIDParser {
+  public final static String jobidPattern = "Starting Job = (job_\\d+_\\d+),";
+
+  public HiveJobIDParser(String statusdir, Configuration conf) {
+    this.statusdir = statusdir;
+    this.conf = conf;
+  }
+
+  @Override
+  public List<String> parseJobID() throws IOException {
+    return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern);
+  }
+
+}
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
index 663da94..13f8f6f 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
@@ -41,13 +41,14 @@ public JarDelegator(AppConfig appConf) {
   public EnqueueBean run(String user, String jar, String mainClass,
                          String libjars, String files,
                          List<String> jarArgs, List<String> defines,
-                         String statusdir, String callback, String completedUrl)
+                         String statusdir, String callback, String completedUrl,
+                         boolean enablelog, JobType jobType)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
     ExecuteException, IOException, InterruptedException
   {
     runAs = user;
     List<String> args = makeArgs(jar, mainClass, libjars, files, jarArgs, defines,
-                                 statusdir, completedUrl);
+                                 statusdir, completedUrl, enablelog, jobType);
     return enqueueController(user, callback, args);
   }
@@ -55,7 +56,8 @@ public EnqueueBean run(String user, String jar, String mainClass,
   private List<String> makeArgs(String jar, String mainClass,
                                 String libjars, String files,
                                 List<String> jarArgs, List<String> defines,
-                                String statusdir, String completedUrl)
+                                String statusdir, String completedUrl,
+                                boolean enablelog, JobType jobType)
     throws BadParam, IOException, InterruptedException
   {
     ArrayList<String> args = new ArrayList<String>();
     try {
@@ -63,7 +65,7 @@ public EnqueueBean run(String user, String jar, String mainClass,
       allFiles.add(TempletonUtils.hadoopFsFilename(jar, appConf, runAs));

       args.addAll(makeLauncherArgs(appConf, statusdir,
-                                   completedUrl, allFiles));
+                                   completedUrl, allFiles, enablelog, jobType));
       args.add("--");
       args.add(appConf.clusterHadoop());
       args.add("jar");
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarJobIDParser.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarJobIDParser.java
new file mode 100644
index 0000000..989134f
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarJobIDParser.java
@@ -0,0 +1,22 @@
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
+
+public class JarJobIDParser extends JobIDParser {
+  public final static String jobidPattern = "Running job: (job_\\d+_\\d+)";
+
+  public JarJobIDParser(String statusdir, Configuration conf) {
+    this.statusdir = statusdir;
+    this.conf = conf;
+  }
+
+  @Override
+  public List<String> parseJobID() throws IOException {
+    return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern);
+  }
+
+}
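Each of these parsers applies one regular expression to the stderr capture that the controller job writes into statusdir; only the pattern differs per job type. As a quick, self-contained illustration (not part of the patch), the three patterns can be checked against the kinds of lines that appear in the sample stderr files added under src/test/data/status later in this patch:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobIdPatternDemo {
  public static void main(String[] args) {
    // Pattern strings as defined by JarJobIDParser, HiveJobIDParser, and PigJobIDParser,
    // paired with sample stderr lines copied from the test data in this patch.
    String[][] cases = {
      {"Running job: (job_\\d+_\\d+)",
       "13/05/09 09:56:05 INFO mapred.JobClient: Running job: job_201305090950_0004"},
      {"Starting Job = (job_\\d+_\\d+),",
       "Starting Job = job_201305091437_0012, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201305091437_0012"},
      {"HadoopJobId: (job_\\d+_\\d+)",
       "2013-04-18 14:54:50,594 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201304181449_0004"},
    };
    for (String[] c : cases) {
      Matcher m = Pattern.compile(c[0]).matcher(c[1]);
      // group(1) is the Hadoop job id that the parsers collect into a list.
      System.out.println(m.find() ? m.group(1) : "no match");
    }
  }
}
```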
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobIDParser.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobIDParser.java
new file mode 100644
index 0000000..886c641
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JobIDParser.java
@@ -0,0 +1,48 @@
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
+
+// Base class for scraping Hadoop job ids out of the output files that the
+// controller job writes into statusdir; subclasses supply the job-type-specific pattern.
+public abstract class JobIDParser {
+  String statusdir;
+  Configuration conf;
+
+  public BufferedReader openStatusFile(String fname) throws IOException {
+    Path p = new Path(statusdir, fname);
+    FileSystem fs = p.getFileSystem(conf);
+    BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(p)));
+    return in;
+  }
+
+  public List<String> findJobID(BufferedReader in, String patternAsString) throws IOException {
+    Pattern pattern = Pattern.compile(patternAsString);
+    Matcher matcher;
+    String line;
+    List<String> jobs = new ArrayList<String>();
+    while ((line=in.readLine())!=null) {
+      matcher = pattern.matcher(line);
+      if (matcher.find()) {
+        String jobid = matcher.group(1);
+        jobs.add(jobid);
+      }
+    }
+    return jobs;
+  }
+
+  public abstract List<String> parseJobID() throws IOException;
+
+  public List<String> parseJobID(String fname, String pattern) throws IOException {
+    // Open the file passed in by the caller (subclasses pass TempletonControllerJob.STDERR_FNAME).
+    BufferedReader in = openStatusFile(fname);
+    List<String> jobs = findJobID(in, pattern);
+    return jobs;
+  }
+}
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
index 75d57eb..900e19c 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
@@ -43,6 +43,7 @@ public class LauncherDelegator extends TempletonDelegator {
   private static final Log LOG = LogFactory.getLog(LauncherDelegator.class);
   protected String runAs = null;
+  static public enum JobType {JAR, STREAMING, PIG, HIVE};

   public LauncherDelegator(AppConfig appConf) {
     super(appConf);
@@ -105,7 +106,9 @@ public String run() throws Exception {
   public List<String> makeLauncherArgs(AppConfig appConf, String statusdir,
                                        String completedUrl,
-                                       List<String> copyFiles) {
+                                       List<String> copyFiles,
+                                       boolean enablelog,
+                                       JobType jobType) {
     ArrayList<String> args = new ArrayList<String>();

     args.add("-libjars");
@@ -123,6 +126,10 @@ public String run() throws Exception {
            TempletonUtils.encodeArray(copyFiles));
     addDef(args, TempletonControllerJob.OVERRIDE_CLASSPATH,
            makeOverrideClasspath(appConf));
+    addDef(args, TempletonControllerJob.ENABLE_LOG,
+           Boolean.toString(enablelog));
+    addDef(args, TempletonControllerJob.JOB_TYPE,
+           jobType.toString());

     // Hadoop queue information
     addDef(args, "mapred.job.queue.name", appConf.hadoopQueueName());
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LogRetriever.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LogRetriever.java
new file mode 100644
index 0000000..bc38105
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LogRetriever.java
@@ -0,0 +1,317 @@
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hive.hcatalog.templeton.LauncherDelegator.JobType;
+
+public class LogRetriever {
+  String statusDir;
+  JobType jobType;
+  private static final String attemptDetailPatternInString = "";
+  private static Pattern attemptDetailPattern = null;
+  private 
static final String attemptLogPatternInString = "Last 8KB
All"; + private static Pattern attemptLogPattern = null; + private static final String attemptIDPatternInString = "attemptid=(.*)?&"; + private static Pattern attemptIDPattern = null; + private static final String attemptStartTimePatternInString = "(\\d{1,2}-[A-Za-z]{3}-\\d{4} \\d{2}:\\d{2}:\\d{2})(
)?"; + private static Pattern attemptStartTimePattern = null; + private static final String attemptEndTimePatternInString = "(\\d{1,2}-[A-Za-z]{3}-\\d{4} \\d{2}:\\d{2}:\\d{2}) \\(.*\\)(
)?"; + private static Pattern attemptEndTimePattern = null; + FileSystem fs; + JobClient jobClient = null; + Configuration conf = null; + + // Class to store necessary information for an attempt to log + class AttemptInfo { + public String id; + public URL baseUrl; + public String status; + public String startTime; + public String endTime; + public String type = "unknown"; + + @Override + public String toString() { + return id + "\t" + baseUrl.toString() + "\t" + status + "\t" + type + "\t" + + startTime + "\t" + endTime + "\n"; + } + } + + public LogRetriever(String statusDir, JobType jobType, Configuration conf) throws IOException { + this.statusDir = statusDir; + this.jobType = jobType; + attemptDetailPattern = Pattern.compile(attemptDetailPatternInString); + attemptLogPattern = Pattern.compile(attemptLogPatternInString); + attemptIDPattern = Pattern.compile(attemptIDPatternInString); + attemptStartTimePattern = Pattern.compile(attemptStartTimePatternInString); + attemptEndTimePattern = Pattern.compile(attemptEndTimePatternInString); + Path statusPath = new Path(statusDir); + fs = statusPath.getFileSystem(conf); + jobClient = new JobClient(new JobConf(conf)); + this.conf = conf; + } + + public void run() throws IOException { + String logDir = statusDir + "/logs"; + + fs.mkdirs(new Path(logDir)); + + // Get jobids from job status dir + JobIDParser jobIDParser = null; + switch (jobType) { + case PIG: + jobIDParser = new PigJobIDParser(statusDir, conf); + break; + case HIVE: + jobIDParser = new HiveJobIDParser(statusDir, conf); + break; + case JAR: + case STREAMING: + jobIDParser = new JarJobIDParser(statusDir, conf); + break; + default: + System.err.println("Unknown job type, only pig/hive/jar/streaming are supported, skip logs"); + return; + } + List jobs = new ArrayList(); + try { + jobs = jobIDParser.parseJobID(); + } catch (IOException e) { + System.err.println("Cannot retrieve jobid from log file"); + e.printStackTrace(); + } + + // Log jobs + PrintWriter listWriter = new PrintWriter(new OutputStreamWriter(fs.create(new Path(logDir, "list.txt")))); + for (String job:jobs) { + try { + logJob(logDir, job, listWriter); + } catch (IOException e) { + listWriter.close(); + System.err.println("Cannot retrieve log for " + job); + e.printStackTrace(); + } + } + listWriter.close(); + } + + private void logJob(String logDir, String job, PrintWriter listWriter) throws IOException { + RunningJob rj = jobClient.getJob(JobID.forName(job)); + String jobURLString = rj.getTrackingURL(); + + Path jobDir = new Path(logDir, job); + fs.mkdirs(jobDir); + + // Log jobconf + try { + logJobConf(job, jobURLString, jobDir.toString()); + } catch (IOException e) { + System.err.println("Cannot retrieve job.xml.html for " + job); + e.printStackTrace(); + } + + listWriter.println("job: " + job + "(" + + "name=" + rj.getJobName() + "," + + "status=" + JobStatus.getJobRunState(rj.getJobState()) + ")"); + + // Get completed attempts + List attempts = new ArrayList(); + for (String type : new String[] {"map", "reduce", "setup", "cleanup"}) { + try { + List successAttempts = getCompletedAttempts(job, jobURLString, type); + attempts.addAll(successAttempts); + } catch (IOException e) { + System.err.println("Cannot retrieve " + type + " tasks for " + job); + e.printStackTrace(); + } + } + + // Get failed attempts + try { + List failedAttempts = getFailedAttempts(job, jobURLString); + attempts.addAll(failedAttempts); + } catch (IOException e) { + System.err.println("Cannot retrieve failed attempts for " + job); + 
e.printStackTrace(); + } + + // Log attempts + for (AttemptInfo attempt : attempts) { + try { + logAttempt(job, attempt, jobDir.toString()); + listWriter.println(" attempt:" + attempt.id + "(" + + "type=" + attempt.type + "," + + "status=" + attempt.status + "," + + "starttime=" + attempt.startTime + "," + + "endtime=" + attempt.endTime + ")"); + } catch (IOException e) { + System.err.println("Cannot log attempt " + attempt.id); + e.printStackTrace(); + } + } + + listWriter.println(); + } + + // Utility to get patterns from a url, every array element is match for one pattern + private List[] getMatches(URL url, Pattern[] pattern) throws IOException { + List[] results = new ArrayList[pattern.length]; + for (int i=0;i(); + } + + URLConnection urlConnection = url.openConnection(); + BufferedReader reader = new BufferedReader(new InputStreamReader + (urlConnection.getInputStream())); + String line; + while ((line=reader.readLine())!=null) { + for (int i=0;i getCompletedAttempts(String job, String jobURLInString, String type) throws IOException { + // Get task detail link from the jobtask page + String fileInURL = "/jobtasks.jsp?jobid=" + job + "&type=" + type + + "&pagenum=1&state=completed"; + URL jobURL = new URL(jobURLInString); + URL jobTasksURL = new URL(jobURL.getProtocol(), jobURL.getHost(), jobURL.getPort(), fileInURL); + List[] taskAttemptURLAndTimestamp = getMatches(jobTasksURL, new Pattern[]{attemptDetailPattern, + attemptStartTimePattern, attemptEndTimePattern}); + List results = new ArrayList(); + + // Go to task details, fetch task tracker url + for (int i=0;i[] attemptLogStrings = getMatches(taskDetailsURL, new Pattern[]{attemptLogPattern}); + for (String attemptLogString : attemptLogStrings[0]) { + AttemptInfo attempt = new AttemptInfo(); + attempt.baseUrl = new URL(attemptLogString); + attempt.startTime = taskAttemptURLAndTimestamp[1].get(i); + attempt.endTime = taskAttemptURLAndTimestamp[2].get(i); + attempt.type = type; + Matcher matcher = attemptIDPattern.matcher(attemptLogString); + if (matcher.find()) { + attempt.id = matcher.group(1); + } + attempt.status = "completed"; + results.add(attempt); + } + } + + return results; + } + + // Get failed attempts from jobfailures.jsp + private List getFailedAttempts(String job, String jobURLInString) throws IOException { + String fileInURL = "/jobfailures.jsp?jobid="+ job + "&kind=all&cause=failed"; + URL jobURL = new URL(jobURLInString); + URL url = new URL(jobURL.getProtocol(), jobURL.getHost(), jobURL.getPort(), fileInURL); + List[] attemptLogStrings = getMatches(url, new Pattern[]{attemptDetailPattern}); + List failedTaskStrings = new ArrayList(); + for (String attempt : attemptLogStrings[0]) { + if (!failedTaskStrings.contains(attempt)) { + failedTaskStrings.add(attempt); + } + } + List results = new ArrayList(); + for (String taskString : failedTaskStrings) { + URL taskDetailsURL = new URL(jobURL.getProtocol(), jobURL.getHost(), jobURL.getPort(), "/" + taskString); + List[] taskAttemptURLAndTimestamp = getMatches(taskDetailsURL, new Pattern[]{attemptLogPattern, + attemptStartTimePattern, attemptEndTimePattern}); + for (int i=0;i pigArgs, String otherFiles, - String statusdir, String callback, String completedUrl) + String statusdir, String callback, String completedUrl, boolean enablelog) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { runAs = user; List args = makeArgs(execute, srcFile, pigArgs, - otherFiles, statusdir, completedUrl); + otherFiles, 
statusdir, completedUrl, enablelog); return enqueueController(user, callback, args); } private List makeArgs(String execute, String srcFile, List pigArgs, String otherFiles, - String statusdir, String completedUrl) + String statusdir, String completedUrl, boolean enablelog) throws BadParam, IOException, InterruptedException { ArrayList args = new ArrayList(); try { @@ -68,7 +68,7 @@ public EnqueueBean run(String user, allFiles.addAll(Arrays.asList(ofs)); } - args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles)); + args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, enablelog, JobType.PIG)); args.add("-archives"); args.add(appConf.pigArchive()); diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigJobIDParser.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigJobIDParser.java new file mode 100644 index 0000000..1e360f9 --- /dev/null +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigJobIDParser.java @@ -0,0 +1,22 @@ +package org.apache.hive.hcatalog.templeton; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob; + +public class PigJobIDParser extends JobIDParser { + public final static String jobidPattern = "HadoopJobId: (job_\\d+_\\d+)"; + + public PigJobIDParser(String statusdir, Configuration conf) { + this.statusdir = statusdir; + this.conf = conf; + } + + @Override + public List parseJobID() throws IOException { + return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern); + } + +} diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index 01ff4db..ba0f7ce 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -50,6 +50,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.client.PseudoAuthenticator; +import org.apache.hive.hcatalog.templeton.LauncherDelegator.JobType; import org.apache.hive.hcatalog.templeton.tool.TempletonUtils; /** @@ -591,18 +592,21 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, @FormParam("cmdenv") List cmdenvs, @FormParam("arg") List args, @FormParam("statusdir") String statusdir, - @FormParam("callback") String callback) + @FormParam("callback") String callback, + @FormParam("enablelog") boolean enablelog) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { verifyUser(); verifyParam(inputs, "input"); verifyParam(mapper, "mapper"); verifyParam(reducer, "reducer"); + if (enablelog == true && !TempletonUtils.isset(statusdir)) + throw new BadParam("enablelog is only applicable when statusdir is set"); StreamingDelegator d = new StreamingDelegator(appConf); return d.run(getDoAsUser(), inputs, output, mapper, reducer, files, defines, cmdenvs, args, - statusdir, callback, getCompletedUrl()); + statusdir, callback, getCompletedUrl(), enablelog, JobType.STREAMING); } /** @@ -618,18 +622,21 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, @FormParam("arg") List args, @FormParam("define") List defines, @FormParam("statusdir") String statusdir, - 
@FormParam("callback") String callback) + @FormParam("callback") String callback, + @FormParam("enablelog") boolean enablelog) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { verifyUser(); verifyParam(jar, "jar"); verifyParam(mainClass, "class"); + if (enablelog == true && !TempletonUtils.isset(statusdir)) + throw new BadParam("enablelog is only applicable when statusdir is set"); JarDelegator d = new JarDelegator(appConf); return d.run(getDoAsUser(), jar, mainClass, libjars, files, args, defines, - statusdir, callback, getCompletedUrl()); + statusdir, callback, getCompletedUrl(), enablelog, JobType.JAR); } /** @@ -643,18 +650,21 @@ public EnqueueBean pig(@FormParam("execute") String execute, @FormParam("arg") List pigArgs, @FormParam("files") String otherFiles, @FormParam("statusdir") String statusdir, - @FormParam("callback") String callback) + @FormParam("callback") String callback, + @FormParam("enablelog") boolean enablelog) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { verifyUser(); if (execute == null && srcFile == null) throw new BadParam("Either execute or file parameter required"); + if (enablelog == true && !TempletonUtils.isset(statusdir)) + throw new BadParam("enablelog is only applicable when statusdir is set"); PigDelegator d = new PigDelegator(appConf); return d.run(getDoAsUser(), execute, srcFile, pigArgs, otherFiles, - statusdir, callback, getCompletedUrl()); + statusdir, callback, getCompletedUrl(), enablelog); } /** @@ -669,16 +679,19 @@ public EnqueueBean hive(@FormParam("execute") String execute, @FormParam("files") String otherFiles, @FormParam("define") List defines, @FormParam("statusdir") String statusdir, - @FormParam("callback") String callback) + @FormParam("callback") String callback, + @FormParam("enablelog") boolean enablelog) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { verifyUser(); if (execute == null && srcFile == null) throw new BadParam("Either execute or file parameter required"); + if (enablelog == true && !TempletonUtils.isset(statusdir)) + throw new BadParam("enablelog is only applicable when statusdir is set"); HiveDelegator d = new HiveDelegator(appConf); return d.run(getDoAsUser(), execute, srcFile, defines, hiveArgs, otherFiles, - statusdir, callback, getCompletedUrl()); + statusdir, callback, getCompletedUrl(), enablelog); } /** @@ -891,6 +904,17 @@ public String getCompletedUrl() { return theUriInfo.getBaseUri() + VERSION + "/internal/complete/$jobId"; } + + /** + * Describe a database + */ + @GET + @Path("*/*") + @Produces({MediaType.APPLICATION_JSON}) + public String misc() { + return "OK"; + } + /** * Returns canonical host name from which the request is made; used for doAs validation */ diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java index f9f6c94..a4c9803 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java @@ -43,7 +43,9 @@ public EnqueueBean run(String user, List jarArgs, String statusdir, String callback, - String completedUrl) + String completedUrl, + boolean enableLog, + JobType jobType) throws 
NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { List args = makeArgs(inputs, output, mapper, reducer, @@ -53,7 +55,7 @@ public EnqueueBean run(String user, return d.run(user, appConf.streamingJar(), null, null, null, args, defines, - statusdir, callback, completedUrl); + statusdir, callback, completedUrl, enableLog, jobType); } private List makeArgs(List inputs, diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java index 4deafbb..7e91951 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java @@ -24,6 +24,7 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintWriter; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -53,6 +54,9 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.hive.hcatalog.templeton.LauncherDelegator; +import org.apache.hive.hcatalog.templeton.LogRetriever; +import org.apache.hive.hcatalog.templeton.Main; /** * A Map Reduce job that will start another job. @@ -69,6 +73,8 @@ public class TempletonControllerJob extends Configured implements Tool { public static final String COPY_NAME = "templeton.copy"; public static final String STATUSDIR_NAME = "templeton.statusdir"; + public static final String ENABLE_LOG = "templeton.enablelog"; + public static final String JOB_TYPE = "templeton.jobtype"; public static final String JAR_ARGS_NAME = "templeton.args"; public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath"; @@ -155,9 +161,17 @@ public void run(Context context) String statusdir = conf.get(STATUSDIR_NAME); if (statusdir != null) { - statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable(statusdir, conf.get("user.name"), conf); + try { + statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable + (statusdir, conf.get("user.name")); + } catch (URISyntaxException e) { + throw new IOException("Invalid status dir URI", e); + } } + Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG)); + LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE)); + ExecutorService pool = Executors.newCachedThreadPool(); executeWatcher(pool, conf, context.getJobID(), proc.getInputStream(), statusdir, STDOUT_FNAME); @@ -177,6 +191,12 @@ public void run(Context context) state.setCompleteStatus("done"); state.close(); + if (enablelog && TempletonUtils.isset(statusdir)) { + System.err.println("templeton: collecting logs to " + statusdir + "/logs"); + LogRetriever logRetriever = new LogRetriever(statusdir, jobType, conf); + logRetriever.run(); + } + if (proc.exitValue() != 0) System.err.println("templeton: job failed with exit code " + proc.exitValue()); @@ -272,6 +292,7 @@ public void run() { } } writer.flush(); + writer.close(); } catch (IOException e) { System.err.println("templeton: execute error: " + e); } diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java index 1c91890..be43277 
100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java @@ -33,6 +33,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.ws.rs.core.UriBuilder; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; @@ -213,22 +215,19 @@ public static boolean hadoopFsIsMissing(FileSystem fs, Path p) { } } - public static String addUserHomeDirectoryIfApplicable(String origPathStr, String user, Configuration conf) throws IOException { - Path path = new Path(origPathStr); - String result = origPathStr; - - // shortcut for s3/asv - // If path contains scheme, user should mean an absolute path, - // However, path.isAbsolute tell us otherwise. - // So we skip conversion for non-hdfs. - if (!(path.getFileSystem(conf) instanceof DistributedFileSystem)&& - !(path.getFileSystem(conf) instanceof LocalFileSystem)) { - return result; - } - if (!path.isAbsolute()) { - result = "/user/" + user + "/" + origPathStr; - } - return result; + public static String addUserHomeDirectoryIfApplicable(String origPathStr, String user) + throws IOException, URISyntaxException { + URI uri = new URI(origPathStr); + + if (uri.getPath().isEmpty()) { + String newPath = "/user/" + user; + uri = UriBuilder.fromUri(uri).replacePath(newPath).build(); + } else if (!new Path(uri.getPath()).isAbsolute()) { + String newPath = "/user/" + user + "/" + uri.getPath(); + uri = UriBuilder.fromUri(uri).replacePath(newPath).build(); + } // no work needed for absolute paths + + return uri.toString(); } public static Path hadoopFsPath(String fname, final Configuration conf, String user) @@ -254,7 +253,7 @@ public FileSystem run() } }); - fname = addUserHomeDirectoryIfApplicable(fname, user, conf); + fname = addUserHomeDirectoryIfApplicable(fname, user); URI u = new URI(fname); Path p = new Path(u).makeQualified(defaultFs); diff --git a/hcatalog/webhcat/svr/src/test/data/status/hive/stderr b/hcatalog/webhcat/svr/src/test/data/status/hive/stderr new file mode 100644 index 0000000..76a1a58 --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/data/status/hive/stderr @@ -0,0 +1,17 @@ +WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files. 
+Logging initialized using configuration in jar:file:/Users/daijy/hadoop-1.0.3/tmp/mapred/local/taskTracker/distcache/7168149899505899073_637041239_1133292873/localhost/apps/templeton/hive-0.10.0.tar.gz/hive-0.10.0/lib/hive-common-0.10.0.jar!/hive-log4j.properties +Hive history file=/tmp/daijy/hive_job_log_daijy_201305091500_862342848.txt +Total MapReduce jobs = 1 +Launching Job 1 out of 1 +Number of reduce tasks is set to 0 since there's no reduce operator +Starting Job = job_201305091437_0012, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201305091437_0012 +Kill Command = /Users/daijy/hadoop-1.0.3/libexec/../bin/hadoop job -kill job_201305091437_0012 +Hadoop job information for Stage-1: number of mappers: 0; number of reducers: 0 +2013-05-09 15:01:13,625 Stage-1 map = 0%, reduce = 0% +2013-05-09 15:01:19,660 Stage-1 map = 100%, reduce = 100% +Ended Job = job_201305091437_0012 +MapReduce Jobs Launched: +Job 0: HDFS Read: 0 HDFS Write: 0 SUCCESS +Total MapReduce CPU Time Spent: 0 msec +OK +Time taken: 26.187 seconds diff --git a/hcatalog/webhcat/svr/src/test/data/status/jar/stderr b/hcatalog/webhcat/svr/src/test/data/status/jar/stderr new file mode 100644 index 0000000..fac13ef --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/data/status/jar/stderr @@ -0,0 +1,40 @@ +13/05/09 09:56:05 INFO input.FileInputFormat: Total input paths to process : 1 +13/05/09 09:56:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable +13/05/09 09:56:05 WARN snappy.LoadSnappy: Snappy native library not loaded +13/05/09 09:56:05 INFO mapred.JobClient: Running job: job_201305090950_0004 +13/05/09 09:56:06 INFO mapred.JobClient: map 0% reduce 0% +13/05/09 09:56:19 INFO mapred.JobClient: map 100% reduce 0% +13/05/09 09:56:31 INFO mapred.JobClient: map 100% reduce 100% +13/05/09 09:56:36 INFO mapred.JobClient: Job complete: job_201305090950_0004 +13/05/09 09:56:36 INFO mapred.JobClient: Counters: 26 +13/05/09 09:56:36 INFO mapred.JobClient: Job Counters +13/05/09 09:56:36 INFO mapred.JobClient: Launched reduce tasks=1 +13/05/09 09:56:36 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=12660 +13/05/09 09:56:36 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 +13/05/09 09:56:36 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 +13/05/09 09:56:36 INFO mapred.JobClient: Launched map tasks=1 +13/05/09 09:56:36 INFO mapred.JobClient: Data-local map tasks=1 +13/05/09 09:56:36 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=10247 +13/05/09 09:56:36 INFO mapred.JobClient: File Output Format Counters +13/05/09 09:56:36 INFO mapred.JobClient: Bytes Written=16 +13/05/09 09:56:36 INFO mapred.JobClient: FileSystemCounters +13/05/09 09:56:36 INFO mapred.JobClient: FILE_BYTES_READ=38 +13/05/09 09:56:36 INFO mapred.JobClient: HDFS_BYTES_READ=127 +13/05/09 09:56:36 INFO mapred.JobClient: FILE_BYTES_WRITTEN=45519 +13/05/09 09:56:36 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=16 +13/05/09 09:56:36 INFO mapred.JobClient: File Input Format Counters +13/05/09 09:56:36 INFO mapred.JobClient: Bytes Read=8 +13/05/09 09:56:36 INFO mapred.JobClient: Map-Reduce Framework +13/05/09 09:56:36 INFO mapred.JobClient: Map output materialized bytes=38 +13/05/09 09:56:36 INFO mapred.JobClient: Map input records=2 +13/05/09 09:56:36 INFO mapred.JobClient: Reduce shuffle bytes=0 +13/05/09 09:56:36 INFO mapred.JobClient: Spilled Records=8 +13/05/09 09:56:36 INFO mapred.JobClient: Map output 
bytes=24 +13/05/09 09:56:36 INFO mapred.JobClient: Total committed heap usage (bytes)=269619200 +13/05/09 09:56:36 INFO mapred.JobClient: Combine input records=4 +13/05/09 09:56:36 INFO mapred.JobClient: SPLIT_RAW_BYTES=119 +13/05/09 09:56:36 INFO mapred.JobClient: Reduce input records=4 +13/05/09 09:56:36 INFO mapred.JobClient: Reduce input groups=4 +13/05/09 09:56:36 INFO mapred.JobClient: Combine output records=4 +13/05/09 09:56:36 INFO mapred.JobClient: Reduce output records=4 +13/05/09 09:56:36 INFO mapred.JobClient: Map output records=4 diff --git a/hcatalog/webhcat/svr/src/test/data/status/pig/stderr b/hcatalog/webhcat/svr/src/test/data/status/pig/stderr new file mode 100644 index 0000000..62ccc91 --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/data/status/pig/stderr @@ -0,0 +1,55 @@ +2013-04-18 14:54:45,945 [main] INFO org.apache.pig.Main - Apache Pig version 0.10.1 (r1426677) compiled Dec 28 2012, 16:46:13 +2013-04-18 14:54:45,946 [main] INFO org.apache.pig.Main - Logging error messages to: /Users/daijy/hadoop-1.0.3/tmp/mapred/local/taskTracker/daijy/jobcache/job_201304181449_0003/attempt_201304181449_0003_m_000000_0/work/pig_1366322085940.log +2013-04-18 14:54:46,381 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:8020 +2013-04-18 14:54:46,512 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:9001 +2013-04-18 14:54:46,899 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: UNKNOWN +2013-04-18 14:54:47,059 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false +2013-04-18 14:54:47,082 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1 +2013-04-18 14:54:47,083 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1 +2013-04-18 14:54:47,144 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job +2013-04-18 14:54:47,159 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3 +2013-04-18 14:54:47,162 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - creating jar file Job4814368788682413488.jar +2013-04-18 14:54:50,051 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - jar file Job4814368788682413488.jar created +2013-04-18 14:54:50,065 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job +2013-04-18 14:54:50,093 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission. +2013-04-18 14:54:50,386 [Thread-7] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1 +2013-04-18 14:54:50,386 [Thread-7] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1 +2013-04-18 14:54:50,395 [Thread-7] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable +2013-04-18 14:54:50,395 [Thread-7] WARN org.apache.hadoop.io.compress.snappy.LoadSnappy - Snappy native library not loaded +2013-04-18 14:54:50,397 [Thread-7] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1 +2013-04-18 14:54:50,594 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201304181449_0004 +2013-04-18 14:54:50,595 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://localhost:50030/jobdetails.jsp?jobid=job_201304181449_0004 +2013-04-18 14:54:50,597 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete +2013-04-18 14:55:12,184 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 50% complete +2013-04-18 14:55:20,743 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete +2013-04-18 14:55:20,744 [main] INFO org.apache.pig.tools.pigstats.SimplePigStats - Script Statistics: + +HadoopVersion PigVersion UserId StartedAt FinishedAt Features +1.0.3 0.10.1 daijy 2013-04-18 14:54:47 2013-04-18 14:55:20 UNKNOWN + +Success! + +Job Stats (time in seconds): +JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MaxReduceTime MinReduceTime AvgReduceTime Alias Feature Outputs +job_201304181449_0004 1 0 6 6 6 0 0 0 a MAP_ONLY hdfs://localhost:8020/tmp/temp416260498/tmp1616274076, + +Input(s): +Successfully read 3 records (369 bytes) from: "hdfs://localhost:8020/user/daijy/2.txt" + +Output(s): +Successfully stored 3 records (33 bytes) in: "hdfs://localhost:8020/tmp/temp416260498/tmp1616274076" + +Counters: +Total records written : 3 +Total bytes written : 33 +Spillable Memory Manager spill count : 0 +Total bags proactively spilled: 0 +Total records proactively spilled: 0 + +Job DAG: +job_201304181449_0004 + + +2013-04-18 14:55:20,752 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success! +2013-04-18 14:55:20,759 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1 +2013-04-18 14:55:20,759 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1 diff --git a/hcatalog/webhcat/svr/src/test/data/status/streaming/stderr b/hcatalog/webhcat/svr/src/test/data/status/streaming/stderr new file mode 100644 index 0000000..f51bead --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/data/status/streaming/stderr @@ -0,0 +1,15 @@ +13/05/09 09:58:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable +13/05/09 09:58:26 WARN snappy.LoadSnappy: Snappy native library not loaded +13/05/09 09:58:26 INFO mapred.FileInputFormat: Total input paths to process : 1 +13/05/09 09:58:26 INFO streaming.StreamJob: getLocalDirs(): [/Users/daijy/hadoop-1.0.3/tmp/mapred/local] +13/05/09 09:58:26 INFO streaming.StreamJob: Running job: job_201305090950_0006 +13/05/09 09:58:26 INFO streaming.StreamJob: To kill this job, run: +13/05/09 09:58:26 INFO streaming.StreamJob: /Users/daijy/hadoop-1.0.3/libexec/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201305090950_0006 +13/05/09 09:58:26 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201305090950_0006 +13/05/09 09:58:27 INFO streaming.StreamJob: map 0% reduce 0% +13/05/09 09:58:39 INFO streaming.StreamJob: map 50% reduce 0% +13/05/09 09:58:45 INFO streaming.StreamJob: map 100% reduce 0% +13/05/09 09:58:48 INFO streaming.StreamJob: map 100% reduce 17% +13/05/09 09:58:57 INFO streaming.StreamJob: map 100% reduce 100% +13/05/09 09:59:03 INFO streaming.StreamJob: Job complete: job_201305090950_0006 +13/05/09 09:59:03 INFO streaming.StreamJob: Output: ooo4 diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestJobIDParser.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestJobIDParser.java new file mode 100644 index 0000000..6d3457a --- /dev/null +++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestJobIDParser.java @@ -0,0 +1,51 @@ +package org.apache.hive.hcatalog.templeton; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob; +import org.junit.Test; + +import junit.framework.Assert; +import junit.framework.TestCase; + +public class TestJobIDParser extends TestCase { + @Test + public void testParsePig() throws IOException { + String errFileName = "../../src/test/data/status/pig/" + TempletonControllerJob.STDERR_FNAME; + PigJobIDParser pigJobIDParser = new PigJobIDParser(null, new Configuration()); + BufferedReader in = new BufferedReader(new FileReader(errFileName)); + List jobs = pigJobIDParser.findJobID(in, PigJobIDParser.jobidPattern); + Assert.assertEquals(jobs.size(), 1); + } + + @Test + public void testParseHive() throws IOException { + String errFileName = "../../src/test/data/status/hive/" + TempletonControllerJob.STDERR_FNAME; + HiveJobIDParser hiveJobIDParser = new HiveJobIDParser(null, new Configuration()); + BufferedReader in = new BufferedReader(new FileReader(errFileName)); + List jobs = hiveJobIDParser.findJobID(in, HiveJobIDParser.jobidPattern); + Assert.assertEquals(jobs.size(), 1); + } + + @Test + public void testParseJar() throws IOException { + String errFileName = "../../src/test/data/status/jar/" + TempletonControllerJob.STDERR_FNAME; + JarJobIDParser jarJobIDParser = new JarJobIDParser(null, new Configuration()); + BufferedReader in = new BufferedReader(new FileReader(errFileName)); + List jobs = jarJobIDParser.findJobID(in, JarJobIDParser.jobidPattern); + Assert.assertEquals(jobs.size(), 1); + } + + @Test + public void testParseStreaming() throws IOException { + String errFileName = "../../src/test/data/status/streaming/" + TempletonControllerJob.STDERR_FNAME; + JarJobIDParser jarJobIDParser = new JarJobIDParser(null, new Configuration()); + BufferedReader in = new 
BufferedReader(new FileReader(errFileName));
+    List<String> jobs = jarJobIDParser.findJobID(in, JarJobIDParser.jobidPattern);
+    Assert.assertEquals(jobs.size(), 1);
+  }
+}
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
index f48c548..5f536d6 100644
--- a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
@@ -20,6 +20,7 @@
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
 import java.io.IOException;

 import org.apache.hadoop.conf.Configuration;
@@ -245,4 +246,41 @@ public void testHadoopFsListAsString() {
     }
   }

+  @Test
+  public void testConstructingUserHomeDirectory() throws Exception {
+    String[] sources = new String[] {
+      "output+",
+      "/user/hadoop/output",
+      "hdfs://container",
+      "hdfs://container/",
+      "hdfs://container/path",
+      "output#link",
+      "hdfs://cointaner/output#link",
+      "hdfs://container@acc/test"};
+    String[] expectedResults = new String[] {
+      "/user/webhcat/output+",
+      "/user/hadoop/output",
+      "hdfs://container/user/webhcat",
+      "hdfs://container/",
+      "hdfs://container/path",
+      "/user/webhcat/output#link",
+      "hdfs://cointaner/output#link",
+      "hdfs://container@acc/test"};
+    for (int i = 0; i < sources.length; i++) {
+      String result = TempletonUtils.addUserHomeDirectoryIfApplicable(sources[i], "webhcat");
+      Assert.assertEquals(expectedResults[i], result);
+    }
+  }
 }
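For reference, the path-qualification rule exercised by the test above can also be driven directly. A minimal sketch, assuming the WebHCat server classes (and the JAX-RS dependency used by UriBuilder) are on the classpath, with input and expected values mirroring the test:

```java
import org.apache.hive.hcatalog.templeton.tool.TempletonUtils;

public class UserHomeDirDemo {
  public static void main(String[] args) throws Exception {
    // Relative paths are rooted under /user/<user>; absolute paths and fully
    // qualified URIs that already carry a path are returned unchanged.
    System.out.println(TempletonUtils.addUserHomeDirectoryIfApplicable("output+", "webhcat"));
    // expected: /user/webhcat/output+
    System.out.println(TempletonUtils.addUserHomeDirectoryIfApplicable("hdfs://container", "webhcat"));
    // expected: hdfs://container/user/webhcat
    System.out.println(TempletonUtils.addUserHomeDirectoryIfApplicable("/user/hadoop/output", "webhcat"));
    // expected: /user/hadoop/output
  }
}
```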