diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java index 41b1dc5..a28db2f 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java @@ -40,7 +40,7 @@ public JarDelegator(AppConfig appConf) { } public EnqueueBean run(String user, Map userArgs, String jar, String mainClass, - String libjars, String files, + String libjars, String archives, String files, List jarArgs, List defines, String statusdir, String callback, boolean usesHcatalog, String completedUrl, @@ -49,14 +49,14 @@ public EnqueueBean run(String user, Map userArgs, String jar, St ExecuteException, IOException, InterruptedException { runAs = user; List args = makeArgs(jar, mainClass, - libjars, files, jarArgs, defines, + libjars, archives, files, jarArgs, defines, statusdir, usesHcatalog, completedUrl, enablelog, jobType); return enqueueController(user, userArgs, callback, args); } private List makeArgs(String jar, String mainClass, - String libjars, String files, + String libjars, String archives, String files, List jarArgs, List defines, String statusdir, boolean usesHcatalog, String completedUrl, boolean enablelog, JobType jobType) @@ -88,6 +88,12 @@ public EnqueueBean run(String user, Map userArgs, String jar, St args.add("-libjars"); args.add(TempletonUtils.quoteForWindows(libjarsListAsString)); } + if (TempletonUtils.isset(archives)) { + String archivesListAsString = + TempletonUtils.hadoopFsListAsString(archives, appConf, runAs); + args.add("-archives"); + args.add(TempletonUtils.quoteForWindows(archivesListAsString)); + } if (TempletonUtils.isset(files)) { String filesListAsString = TempletonUtils.hadoopFsListAsString(files, appConf, runAs); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index f0c3635..9390893 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -639,6 +639,11 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, @FormParam("mapper") String mapper, @FormParam("reducer") String reducer, @FormParam("combiner") String combiner, + @FormParam("partitioner") String partitioner, + @FormParam("inputformat") String inputformat, + @FormParam("outputformat") String outputformat, + @FormParam("libjars") String libjars, + @FormParam("archives") String archives, @FormParam("file") List fileList, @FormParam("files") String files, @FormParam("define") List defines, @@ -661,6 +666,11 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, userArgs.put("mapper", mapper); userArgs.put("reducer", reducer); userArgs.put("combiner", combiner); + userArgs.put("partitioner", partitioner); + userArgs.put("inputformat", inputformat); + userArgs.put("outputformat", outputformat); + userArgs.put("libjars", libjars); + userArgs.put("archives", archives); userArgs.put("file", fileList); userArgs.put("files", files); userArgs.put("define", defines); @@ -673,6 +683,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, StreamingDelegator d = new StreamingDelegator(appConf); return d.run(getDoAsUser(), userArgs, inputs, output, mapper, reducer, combiner, + partitioner, inputformat, outputformat, libjars, archives, fileList, files, defines, cmdenvs, args, statusdir, callback, getCompletedUrl(), enablelog, JobType.STREAMING); } @@ -691,6 +702,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, public EnqueueBean mapReduceJar(@FormParam("jar") String jar, @FormParam("class") String mainClass, @FormParam("libjars") String libjars, + @FormParam("archives") String archives, @FormParam("files") String files, @FormParam("arg") List args, @FormParam("define") List defines, @@ -709,6 +721,7 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, userArgs.put("jar", jar); userArgs.put("class", mainClass); userArgs.put("libjars", libjars); + userArgs.put("archives", archives); userArgs.put("files", files); userArgs.put("arg", args); userArgs.put("define", defines); @@ -721,7 +734,7 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, JarDelegator d = new JarDelegator(appConf); return d.run(getDoAsUser(), userArgs, jar, mainClass, - libjars, files, args, defines, + libjars, archives, files, args, defines, statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog, JobType.JAR); } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java index 147a041..4eb1f5e 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java @@ -40,6 +40,11 @@ public StreamingDelegator(AppConfig appConf) { public EnqueueBean run(String user, Map userArgs, List inputs, String output, String mapper, String reducer, String combiner, + String partitioner, + String inputformat, + String outputformat, + String libjars, + String archives, List fileList, String files, List defines, List cmdenvs, @@ -52,12 +57,13 @@ public EnqueueBean run(String user, Map userArgs, throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { List args = makeArgs(inputs, output, mapper, reducer, combiner, + partitioner, inputformat, outputformat, fileList, cmdenvs, jarArgs); JarDelegator d = new JarDelegator(appConf); return d.run(user, userArgs, appConf.streamingJar(), null, - null, files, args, defines, + libjars, archives, files, args, defines, statusdir, callback, false, completedUrl, enableLog, jobType); } @@ -66,6 +72,9 @@ public EnqueueBean run(String user, Map userArgs, String mapper, String reducer, String combiner, + String partitioner, + String inputformat, + String outputformat, List fileList, List cmdenvs, List jarArgs) @@ -87,6 +96,18 @@ public EnqueueBean run(String user, Map userArgs, args.add("-combiner"); args.add(combiner); } + if (TempletonUtils.isset(partitioner)) { + args.add("-partitioner"); + args.add(partitioner); + } + if (TempletonUtils.isset(inputformat)) { + args.add("-inputformat"); + args.add(inputformat); + } + if (TempletonUtils.isset(outputformat)) { + args.add("-outputformat"); + args.add(outputformat); + } for (String f : fileList) { args.add("-file");