diff --git a/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestStreamingStatus.java b/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestStreamingStatus.java
index b2af40adbfa..cf30cd41b03 100644
--- a/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestStreamingStatus.java
+++ b/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestStreamingStatus.java
@@ -22,6 +22,9 @@
 import java.io.IOException;
 import java.io.File;
 
+import org.apache.hadoop.mapred.MiniMRClientCluster;
+import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -32,7 +35,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapred.TaskLog;
@@ -90,7 +92,7 @@
     "print STDERR \"my error msg after consuming input\\n\";\n" +
     "print STDERR \"reporter:counter:myOwnCounterGroup,myOwnCounter,1\\n\";\n";
 
-  MiniMRCluster mr = null;
+  private MiniMRClientCluster mr;
   FileSystem fs = null;
   JobConf conf = null;
 
@@ -105,10 +107,10 @@ public void setUp() throws IOException {
     conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
     conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, false);
 
-    mr = new MiniMRCluster(1, "file:///", 3, null , null, conf);
+    mr = MiniMRClientClusterFactory.create(this.getClass(), 3, conf);
 
     Path inFile = new Path(INPUT_FILE);
-    fs = inFile.getFileSystem(mr.createJobConf());
+    fs = inFile.getFileSystem(mr.getConfig());
     clean(fs);
 
     buildExpectedJobOutput();
@@ -118,9 +120,13 @@ public void setUp() throws IOException {
    * Kill the cluster after the test is done.
    */
   @After
-  public void tearDown() {
-    if (fs != null) { clean(fs); }
-    if (mr != null) { mr.shutdown(); }
+  public void tearDown() throws IOException {
+    if (fs != null) {
+      clean(fs);
+    }
+    if (mr != null) {
+      mr.stop();
+    }
   }
 
   // Updates expectedOutput to have the expected job output as a string
@@ -146,21 +152,23 @@ protected void createInputAndScript(boolean isEmptyInput,
     file.close();
   }
 
-  protected String[] genArgs(String jobtracker, String mapper, String reducer)
+  protected String[] genArgs(String jobtracker, String rmAddress,
+      String mapper, String reducer)
   {
     return new String[] {
-      "-input", INPUT_FILE,
-      "-output", OUTPUT_DIR,
-      "-mapper", mapper,
-      "-reducer", reducer,
-      "-jobconf", MRJobConfig.NUM_MAPS + "=1",
-      "-jobconf", MRJobConfig.NUM_REDUCES + "=1",
-      "-jobconf", MRJobConfig.PRESERVE_FAILED_TASK_FILES + "=true",
-      "-jobconf", "stream.tmpdir=" + new Path(TEST_ROOT_DIR).toUri().getPath(),
-      "-jobconf", JTConfig.JT_IPC_ADDRESS + "="+jobtracker,
-      "-jobconf", "fs.default.name=file:///",
-      "-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR,
-      "-jobconf", "mapreduce.framework.name=yarn"
+        "-input", INPUT_FILE,
+        "-output", OUTPUT_DIR,
+        "-mapper", mapper,
+        "-reducer", reducer,
+        "-jobconf", MRJobConfig.NUM_MAPS + "=1",
+        "-jobconf", MRJobConfig.NUM_REDUCES + "=1",
+        "-jobconf", MRJobConfig.PRESERVE_FAILED_TASK_FILES + "=true",
+        "-jobconf", YarnConfiguration.RM_ADDRESS + "=" + rmAddress,
+        "-jobconf", "stream.tmpdir=" + new Path(TEST_ROOT_DIR).toUri().getPath(),
+        "-jobconf", JTConfig.JT_IPC_ADDRESS + "="+jobtracker,
+        "-jobconf", "fs.default.name=file:///",
+        "-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR,
+        "-jobconf", "mapreduce.framework.name=yarn"
     };
   }
 
@@ -218,10 +226,9 @@ public void testReporting() throws Exception {
    * Run another streaming job with the given script as reducer and validate.
    *
    * @param isEmptyInput Should the input to the script be empty ?
-   * @param script The content of the script that will run as the streaming task
    */
   private void testStreamJob(boolean isEmptyInput)
-      throws IOException {
+      throws Exception {
 
     createInputAndScript(isEmptyInput, script);
 
@@ -249,11 +256,12 @@ private void testStreamJob(boolean isEmptyInput)
   // all "reporter:status" and "reporter:counter" lines.
   // (4) Validate stderr of task of given task type.
   // (5) Validate job output
-  void runStreamJob(TaskType type, boolean isEmptyInput) throws IOException {
-    boolean mayExit = false;
-    StreamJob job = new StreamJob(genArgs(
-        mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS), map, reduce), mayExit);
-    int returnValue = job.go();
+  private void runStreamJob(TaskType type, boolean isEmptyInput)
+      throws Exception {
+    StreamJob job = new StreamJob();
+    int returnValue = job.run(genArgs(
+        mr.getConfig().get(JTConfig.JT_IPC_ADDRESS),
+        mr.getConfig().get(YarnConfiguration.RM_ADDRESS), map, reduce));
     assertEquals(0, returnValue);
 
     // If input to reducer is empty, dummy reporter(which ignores all
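
For context, a minimal sketch of the MiniMRClientCluster lifecycle this patch migrates the test to. The class and method names below (MiniClusterLifecycleSketch, start, clusterConf, stop) are illustrative only and not part of the patch; the Hadoop APIs used (MiniMRClientClusterFactory.create, MiniMRClientCluster.getConfig, MiniMRClientCluster.stop) are the ones the diff itself relies on.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRClientCluster;
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;

// Hypothetical helper, not part of the patch above.
public class MiniClusterLifecycleSketch {
  private MiniMRClientCluster mr;

  public void start() throws Exception {
    // The factory call replaces "new MiniMRCluster(...)": caller class,
    // number of NodeManagers, and a base configuration.
    mr = MiniMRClientClusterFactory.create(
        MiniClusterLifecycleSketch.class, 3, new JobConf());
  }

  public Configuration clusterConf() throws Exception {
    // getConfig() replaces MiniMRCluster.createJobConf(); it exposes the
    // cluster addresses (e.g. yarn.resourcemanager.address) that the test
    // passes to the streaming job via -jobconf.
    return mr.getConfig();
  }

  public void stop() throws Exception {
    // stop() replaces shutdown() and may throw IOException, which is why
    // the patched tearDown() declares "throws IOException".
    if (mr != null) {
      mr.stop();
    }
  }
}
```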