diff --git a/hcatalog/src/docs/src/documentation/content/xdocs/queue.xml b/hcatalog/src/docs/src/documentation/content/xdocs/queue.xml index bd7af76..c5771fd 100644 --- a/hcatalog/src/docs/src/documentation/content/xdocs/queue.xml +++ b/hcatalog/src/docs/src/documentation/content/xdocs/queue.xml @@ -143,7 +143,17 @@ "exitValue": 0, "user": "ctdean", "callback": null, - "completed": "done" + "completed": "done", + "userargs" => { + "callback" => null, + "define" => [], + "enablelog" => "false", + "execute" => "select a,rand(b) from mynums", + "file" => null, + "files" => [], + "statusdir" => null, + "user.name" => "hadoopqa", + }, } diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java index 200d35a..216e05e 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import org.apache.commons.exec.ExecuteException; import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob; @@ -40,7 +41,7 @@ public HiveDelegator(AppConfig appConf) { super(appConf); } - public EnqueueBean run(String user, + public EnqueueBean run(String user, Map userArgs, String execute, String srcFile, List defines, List hiveArgs, String otherFiles, String statusdir, String callback, String completedUrl, boolean enablelog) @@ -51,7 +52,7 @@ public EnqueueBean run(String user, List args = makeArgs(execute, srcFile, defines, hiveArgs, otherFiles, statusdir, completedUrl, enablelog); - return enqueueController(user, callback, args); + return enqueueController(user, userArgs, callback, args); } private List makeArgs(String execute, String srcFile, diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java index 13f8f6f..b451f5e 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java @@ -23,6 +23,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.commons.exec.ExecuteException; import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob; @@ -38,7 +39,7 @@ public JarDelegator(AppConfig appConf) { super(appConf); } - public EnqueueBean run(String user, String jar, String mainClass, + public EnqueueBean run(String user, Map userArgs, String jar, String mainClass, String libjars, String files, List jarArgs, List defines, String statusdir, String callback, String completedUrl, @@ -50,7 +51,7 @@ public EnqueueBean run(String user, String jar, String mainClass, libjars, files, jarArgs, defines, statusdir, completedUrl, enablelog, jobType); - return enqueueController(user, callback, args); + return enqueueController(user, userArgs, callback, args); } private List makeArgs(String jar, String mainClass, diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java index 900e19c..46ba5f5 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java @@ -22,6 +22,7 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.commons.exec.ExecuteException; import org.apache.commons.logging.Log; @@ -49,13 +50,15 @@ public LauncherDelegator(AppConfig appConf) { super(appConf); } - public void registerJob(String id, String user, String callback) + public void registerJob(String id, String user, String callback, + Map userArgs) throws IOException { JobState state = null; try { state = new JobState(id, Main.getAppConfigInstance()); state.setUser(user); state.setCallback(callback); + state.setUserArgs(userArgs); } finally { if (state != null) state.close(); @@ -65,7 +68,7 @@ public void registerJob(String id, String user, String callback) /** * Enqueue the TempletonControllerJob directly calling doAs. */ - public EnqueueBean enqueueController(String user, String callback, + public EnqueueBean enqueueController(String user, Map userArgs, String callback, List args) throws NotAuthorizedException, BusyException, ExecuteException, IOException, QueueException { @@ -82,7 +85,7 @@ public EnqueueBean enqueueController(String user, String callback, if (id == null) throw new QueueException("Unable to get job id"); - registerJob(id, user, callback); + registerJob(id, user, callback, userArgs); return new EnqueueBean(id); } catch (InterruptedException e) { diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java index 07f4d40..ff1e2c0 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import org.apache.commons.exec.ExecuteException; import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob; @@ -39,7 +40,7 @@ public PigDelegator(AppConfig appConf) { super(appConf); } - public EnqueueBean run(String user, + public EnqueueBean run(String user, Map userArgs, String execute, String srcFile, List pigArgs, String otherFiles, String statusdir, String callback, String completedUrl, boolean enablelog) @@ -50,7 +51,7 @@ public EnqueueBean run(String user, srcFile, pigArgs, otherFiles, statusdir, completedUrl, enablelog); - return enqueueController(user, callback, args); + return enqueueController(user, userArgs, callback, args); } private List makeArgs(String execute, String srcFile, diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/QueueStatusBean.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/QueueStatusBean.java index 509ac1f..9723714 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/QueueStatusBean.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/QueueStatusBean.java @@ -19,6 +19,7 @@ package org.apache.hive.hcatalog.templeton; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.mapred.JobStatus; import org.apache.hadoop.mapred.JobProfile; @@ -38,6 +39,7 @@ public String user; public String callback; public String completed; + public Map userargs; public QueueStatusBean() { } @@ -63,5 +65,6 @@ public QueueStatusBean(JobState state, JobStatus status, JobProfile profile) user = state.getUser(); callback = state.getCallback(); completed = state.getCompleteStatus(); + userargs = state.getUserArgs(); } } diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index 59221eb..b6b1fc2 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -601,10 +601,23 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, verifyParam(mapper, "mapper"); verifyParam(reducer, "reducer"); + Map userArgs = new HashMap(); + userArgs.put("user.name", getDoAsUser()); + userArgs.put("input", inputs); + userArgs.put("output", output); + userArgs.put("mapper", mapper); + userArgs.put("reducer", reducer); + userArgs.put("files", files); + userArgs.put("define", defines); + userArgs.put("cmdenv", cmdenvs); + userArgs.put("arg", args); + userArgs.put("statusdir", statusdir); + userArgs.put("callback", callback); + userArgs.put("enablelog", Boolean.toString(enablelog)); checkEnableLogPrerequisite(enablelog, statusdir); StreamingDelegator d = new StreamingDelegator(appConf); - return d.run(getDoAsUser(), inputs, output, mapper, reducer, + return d.run(getDoAsUser(), userArgs, inputs, output, mapper, reducer, files, defines, cmdenvs, args, statusdir, callback, getCompletedUrl(), enablelog, JobType.STREAMING); } @@ -630,10 +643,22 @@ public EnqueueBean mapReduceJar(@FormParam("jar") String jar, verifyParam(jar, "jar"); verifyParam(mainClass, "class"); + Map userArgs = new HashMap(); + userArgs.put("user.name", getDoAsUser()); + userArgs.put("jar", jar); + userArgs.put("class", mainClass); + userArgs.put("libjars", libjars); + userArgs.put("files", files); + userArgs.put("arg", args); + userArgs.put("define", defines); + userArgs.put("statusdir", statusdir); + userArgs.put("callback", callback); + userArgs.put("enablelog", Boolean.toString(enablelog)); + checkEnableLogPrerequisite(enablelog, statusdir); JarDelegator d = new JarDelegator(appConf); - return d.run(getDoAsUser(), + return d.run(getDoAsUser(), userArgs, jar, mainClass, libjars, files, args, defines, statusdir, callback, getCompletedUrl(), enablelog, JobType.JAR); @@ -658,10 +683,21 @@ public EnqueueBean pig(@FormParam("execute") String execute, if (execute == null && srcFile == null) throw new BadParam("Either execute or file parameter required"); + //add all function arguments to a map + Map userArgs = new HashMap(); + userArgs.put("user.name", getDoAsUser()); + userArgs.put("execute", execute); + userArgs.put("file", srcFile); + userArgs.put("arg", pigArgs); + userArgs.put("files", otherFiles); + userArgs.put("statusdir", statusdir); + userArgs.put("callback", callback); + userArgs.put("enablelog", Boolean.toString(enablelog)); + checkEnableLogPrerequisite(enablelog, statusdir); PigDelegator d = new PigDelegator(appConf); - return d.run(getDoAsUser(), + return d.run(getDoAsUser(), userArgs, execute, srcFile, pigArgs, otherFiles, statusdir, callback, getCompletedUrl(), enablelog); @@ -699,10 +735,21 @@ public EnqueueBean hive(@FormParam("execute") String execute, if (execute == null && srcFile == null) throw new BadParam("Either execute or file parameter required"); + //add all function arguments to a map + Map userArgs = new HashMap(); + userArgs.put("user.name", getDoAsUser()); + userArgs.put("execute", execute); + userArgs.put("file", srcFile); + userArgs.put("define", defines); + userArgs.put("files", otherFiles); + userArgs.put("statusdir", statusdir); + userArgs.put("callback", callback); + userArgs.put("enablelog", Boolean.toString(enablelog)); + checkEnableLogPrerequisite(enablelog, statusdir); HiveDelegator d = new HiveDelegator(appConf); - return d.run(getDoAsUser(), execute, srcFile, defines, hiveArgs, otherFiles, + return d.run(getDoAsUser(), userArgs, execute, srcFile, defines, hiveArgs, otherFiles, statusdir, callback, getCompletedUrl(), enablelog); } diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java index a4c9803..5cfbbc1 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.commons.exec.ExecuteException; @@ -35,7 +36,7 @@ public StreamingDelegator(AppConfig appConf) { super(appConf); } - public EnqueueBean run(String user, + public EnqueueBean run(String user, Map userArgs, List inputs, String output, String mapper, String reducer, List files, List defines, @@ -52,7 +53,7 @@ public EnqueueBean run(String user, files, defines, cmdenvs, jarArgs); JarDelegator d = new JarDelegator(appConf); - return d.run(user, + return d.run(user, userArgs, appConf.streamingJar(), null, null, null, args, defines, statusdir, callback, completedUrl, enableLog, jobType); diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java index d710320..01cab97 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobState.java @@ -21,10 +21,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hive.hcatalog.templeton.JsonBuilder; /** * The persistent state of a job. The state is stored in one of the @@ -232,6 +234,20 @@ public void setUser(String user) setField("user", user); } + @SuppressWarnings("unchecked") + public Map getUserArgs() + throws IOException + { + String jsonString = getField("userArgs"); + return (Map)JsonBuilder.jsonToMap(jsonString); + } + public void setUserArgs(Map userArgs) + throws IOException + { + String jsonString = JsonBuilder.mapToJson(userArgs); + setField("userArgs", jsonString); + } + /** * The url callback */