diff --git hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf index 791c00a..0e141ef 100644 --- hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf +++ hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf @@ -110,6 +110,26 @@ $cfg = 'check_job_exit_value' => 1, 'check_call_back' => 1, }, + { + #all streaming options exercised + 'num' => 5, + 'method' => 'POST', + 'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming', + 'post_options' => ['user.name=:UNAME:','input=:INPDIR_HDFS:/nums.txt','input=:INPDIR_HDFS:/nums.txt','output=:OUTDIR:/mycounts', + 'mapper=org.apache.hadoop.mapred.lib.InverseMapper', 'reducer=org.apache.hadoop.mapred.lib.IdentityReducer', + 'libjars=hdfs:///webhcate2e/hclient.jar','archives=hdfs:///webhcate2e/hexamples.jar', + 'inputformat=org.apache.hadoop.mapred.KeyValueTextInputFormat','outputformat=org.apache.hadoop.mapred.TextOutputFormat', + 'partitioner=org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner', + 'define=mapreduce.map.output.key.class=org.apache.hadoop.io.Text', + 'define=mapreduce.map.output.value.class=org.apache.hadoop.io.Text'], + 'json_field_substr_match' => { 'id' => '\d+'}, + #results + 'status_code' => 200, + 'check_job_created' => 1, + 'check_job_complete' => 'SUCCESS', + 'check_job_exit_value' => 0, + 'check_call_back' => 1, + }, ] }, ##============================================================================================================= diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java index ebe1179..6022372 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java @@ -40,7 +40,7 @@ public JarDelegator(AppConfig appConf) { } public EnqueueBean run(String user, Map userArgs, String jar, String mainClass, - String libjars, String files, + String libjars, String archives, String files, List jarArgs, List defines, String statusdir, String callback, boolean usesHcatalog, String completedUrl, @@ -49,14 +49,14 @@ public EnqueueBean run(String user, Map userArgs, String jar, St ExecuteException, IOException, InterruptedException { runAs = user; List args = makeArgs(jar, mainClass, - libjars, files, jarArgs, defines, + libjars, archives, files, jarArgs, defines, statusdir, usesHcatalog, completedUrl, enablelog, jobType); return enqueueController(user, userArgs, callback, args); } private List makeArgs(String jar, String mainClass, - String libjars, String files, + String libjars, String archives, String files, List jarArgs, List defines, String statusdir, boolean usesHcatalog, String completedUrl, boolean enablelog, JobType jobType) @@ -88,6 +88,12 @@ public EnqueueBean run(String user, Map userArgs, String jar, St args.add("-libjars"); args.add(TempletonUtils.quoteForWindows(libjarsListAsString)); } + if (TempletonUtils.isset(archives)) { + String archivesListAsString = + TempletonUtils.hadoopFsListAsString(archives, appConf, runAs); + args.add("-archives"); + args.add(TempletonUtils.quoteForWindows(archivesListAsString)); + } if (TempletonUtils.isset(files)) { String filesListAsString = TempletonUtils.hadoopFsListAsString(files, appConf, runAs); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index d2127e1..06f0cb4 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -639,6 +639,11 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, @FormParam("mapper") String mapper, @FormParam("reducer") String reducer, @FormParam("combiner") String combiner, + @FormParam("partitioner") String partitioner, + @FormParam("inputformat") String inputformat, + @FormParam("outputformat") String outputformat, + @FormParam("libjars") String libjars, + @FormParam("archives") String archives, @FormParam("file") List fileList, @FormParam("files") String files, @FormParam("define") List defines, @@ -661,6 +666,11 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, userArgs.put("mapper", mapper); userArgs.put("reducer", reducer); userArgs.put("combiner", combiner); + userArgs.put("partitioner", partitioner); + userArgs.put("inputformat", inputformat); + userArgs.put("outputformat", outputformat); + userArgs.put("libjars", libjars); + userArgs.put("archives", archives); userArgs.put("file", fileList); userArgs.put("files", files); userArgs.put("define", defines); @@ -673,6 +683,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, StreamingDelegator d = new StreamingDelegator(appConf); return d.run(getDoAsUser(), userArgs, inputs, output, mapper, reducer, combiner, + partitioner, inputformat, outputformat, libjars, archives, fileList, files, defines, cmdenvs, args, statusdir, callback, getCompletedUrl(), enablelog, JobType.STREAMING); } @@ -691,6 +702,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, public EnqueueBean mapReduceJar(@FormParam("jar") String jar, @FormParam("class") String mainClass, @FormParam("libjars") String libjars, + @FormParam("archives") String archives, @FormParam("files") String files, @FormParam("arg") List args, @FormParam("define") List defines, @@ -709,6 +721,7 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, userArgs.put("jar", jar); userArgs.put("class", mainClass); userArgs.put("libjars", libjars); + userArgs.put("archives", archives); userArgs.put("files", files); userArgs.put("arg", args); userArgs.put("define", defines); @@ -721,7 +734,7 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, JarDelegator d = new JarDelegator(appConf); return d.run(getDoAsUser(), userArgs, jar, mainClass, - libjars, files, args, defines, + libjars, archives, files, args, defines, statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog, JobType.JAR); } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java index 147a041..4eb1f5e 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java @@ -40,6 +40,11 @@ public StreamingDelegator(AppConfig appConf) { public EnqueueBean run(String user, Map userArgs, List inputs, String output, String mapper, String reducer, String combiner, + String partitioner, + String inputformat, + String outputformat, + String libjars, + String archives, List fileList, String files, List defines, List cmdenvs, @@ -52,12 +57,13 @@ public EnqueueBean run(String user, Map userArgs, throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { List args = makeArgs(inputs, output, mapper, reducer, combiner, + partitioner, inputformat, outputformat, fileList, cmdenvs, jarArgs); JarDelegator d = new JarDelegator(appConf); return d.run(user, userArgs, appConf.streamingJar(), null, - null, files, args, defines, + libjars, archives, files, args, defines, statusdir, callback, false, completedUrl, enableLog, jobType); } @@ -66,6 +72,9 @@ public EnqueueBean run(String user, Map userArgs, String mapper, String reducer, String combiner, + String partitioner, + String inputformat, + String outputformat, List fileList, List cmdenvs, List jarArgs) @@ -87,6 +96,18 @@ public EnqueueBean run(String user, Map userArgs, args.add("-combiner"); args.add(combiner); } + if (TempletonUtils.isset(partitioner)) { + args.add("-partitioner"); + args.add(partitioner); + } + if (TempletonUtils.isset(inputformat)) { + args.add("-inputformat"); + args.add(inputformat); + } + if (TempletonUtils.isset(outputformat)) { + args.add("-outputformat"); + args.add(outputformat); + } for (String f : fileList) { args.add("-file");