From 44d2d769ce0e1e713126cc8f258d3dfbab43ad48 Mon Sep 17 00:00:00 2001 From: Gautam Kowshik Date: Sun, 2 Jun 2013 16:17:41 -0700 Subject: [PATCH] HCATALOG-647 : Added ability to parse child job id and progress for Hive jobs. modified/added appropriate unit tests. changed to flush writing childjob stdout/stderr output more frequently. --- .../templeton/tool/TempletonControllerJob.java | 25 ++++++------------ .../hcatalog/templeton/tool/TempletonUtils.java | 18 ++++++++----- .../templeton/tool/TestTempletonUtils.java | 30 +++++++++++++++++++--- 3 files changed, 46 insertions(+), 27 deletions(-) diff --git a/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java b/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java index 344f492..9c4681d 100644 --- a/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java +++ b/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonControllerJob.java @@ -18,21 +18,6 @@ */ package org.apache.hcatalog.templeton.tool; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -48,11 +33,17 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; + +import java.io.*; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * A Map Reduce job that will start another job. @@ -226,6 +217,7 @@ public class TempletonControllerJob extends Configured implements Tool { String line; while ((line = reader.readLine()) != null) { writer.println(line); + writer.flush(); JobState state = null; try { String percent = TempletonUtils.extractPercentComplete(line); @@ -247,7 +239,6 @@ public class TempletonControllerJob extends Configured implements Tool { } } } - writer.flush(); } catch (IOException e) { System.err.println("templeton: execute error: " + e); } diff --git a/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java b/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java index ebf4f17..1a045a1 100644 --- a/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java +++ b/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/tool/TempletonUtils.java @@ -18,6 +18,11 @@ */ package org.apache.hcatalog.templeton.tool; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.StringUtils; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -32,11 +37,6 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.StringUtils; - /** * General utility methods. */ @@ -81,6 +81,7 @@ public class TempletonUtils { public static final Pattern JAR_COMPLETE = Pattern.compile(" map \\d+%\\s+reduce \\d+%$"); public static final Pattern PIG_COMPLETE = Pattern.compile(" \\d+% complete$"); + public static final Pattern HIVE_COMPLETE = Pattern.compile("map = \\d+%, reduce = \\d+%"); /** * Extract the percent complete line from Pig or Jar jobs. @@ -94,12 +95,17 @@ public class TempletonUtils { if (pig.find()) return pig.group().trim(); + Matcher hive = HIVE_COMPLETE.matcher(line); + if (hive.find()) + return hive.group().trim(); + return null; } public static final Pattern JAR_ID = Pattern.compile(" Running job: (\\S+)$"); public static final Pattern PIG_ID = Pattern.compile(" HadoopJobId: (\\S+)$"); - public static final Pattern[] ID_PATTERNS = {JAR_ID, PIG_ID}; + public static final Pattern HIVE_ID = Pattern.compile("jobid=(\\S+)$"); + public static final Pattern[] ID_PATTERNS = {JAR_ID, PIG_ID, HIVE_ID}; /** * Extract the job id from jar jobs. diff --git a/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/tool/TestTempletonUtils.java b/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/tool/TestTempletonUtils.java index 4f23e0a..900b048 100644 --- a/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/tool/TestTempletonUtils.java +++ b/webhcat/svr/src/test/java/org/apache/hcatalog/templeton/tool/TestTempletonUtils.java @@ -18,14 +18,13 @@ */ package org.apache.hcatalog.templeton.tool; -import org.junit.Assert; - -import java.io.FileNotFoundException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.StringUtils; +import org.junit.Assert; import org.junit.Test; +import java.io.FileNotFoundException; + public class TestTempletonUtils { public static final String[] CONTROLLER_LINES = { "2011-12-15 18:12:21,758 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://localhost:50030/jobdetails.jsp?jobid=job_201112140012_0047", @@ -62,8 +61,31 @@ public class TestTempletonUtils { String fifty = "2011-12-15 18:12:36,333 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 50% complete"; Assert.assertEquals("50% complete", TempletonUtils.extractPercentComplete(fifty)); + + // MapRed jar run + String sixty_six = "13/06/02 20:57:32 INFO mapred.JobClient: map 66% reduce 0%"; + Assert.assertEquals("map 66% reduce 0%", TempletonUtils.extractPercentComplete(sixty_six)); + + // Hive job run + String hundred = "2013-06-02 20:52:57,331 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 13.31 sec"; + Assert.assertEquals("map = 100%, reduce = 100%", TempletonUtils.extractPercentComplete(hundred)); } + + @Test + public void testExtractChildJobId() { + Assert.assertNull(TempletonUtils.extractChildJobId("fred")); + + // MapRed jar run + String jar_job = "12/01/02 20:57:11 INFO mapred.JobClient: Running job: job_201201221934_0798"; + Assert.assertEquals("job_201201221934_0798", TempletonUtils.extractChildJobId(jar_job)); + + String hive_job = "Starting Job = job_201305221934_0798, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201201221934_0798"; + Assert.assertEquals("job_201201221934_0798", TempletonUtils.extractChildJobId(hive_job)); + + } + + @Test public void testEncodeArray() { Assert.assertEquals(null, TempletonUtils.encodeArray((String []) null)); -- 1.8.1.2