diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index a28e04d..80d9d2c 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -635,6 +635,7 @@ public Response addOneColumn(@PathParam("db") String db, @Path("mapreduce/streaming") @Produces({MediaType.APPLICATION_JSON}) public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, + @FormParam("inputreader") String inputreader, @FormParam("output") String output, @FormParam("mapper") String mapper, @FormParam("reducer") String reducer, @@ -657,6 +658,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, Map userArgs = new HashMap(); userArgs.put("user.name", getDoAsUser()); userArgs.put("input", inputs); + userArgs.put("inputreader", inputreader); userArgs.put("output", output); userArgs.put("mapper", mapper); userArgs.put("reducer", reducer); @@ -672,7 +674,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, checkEnableLogPrerequisite(enablelog, statusdir); StreamingDelegator d = new StreamingDelegator(appConf); - return d.run(getDoAsUser(), userArgs, inputs, output, mapper, reducer, combiner, + return d.run(getDoAsUser(), userArgs, inputs, inputreader, output, mapper, reducer, combiner, fileList, files, defines, cmdenvs, args, statusdir, callback, getCompletedUrl(), enablelog, JobType.STREAMING); } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java index 147a041..622440b 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java @@ -38,7 +38,7 @@ public StreamingDelegator(AppConfig appConf) { } public EnqueueBean run(String user, Map userArgs, - List inputs, String output, + List inputs, String inputreader, String output, String mapper, String reducer, String combiner, List fileList, String files, List defines, @@ -51,7 +51,7 @@ public EnqueueBean run(String user, Map userArgs, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { - List args = makeArgs(inputs, output, mapper, reducer, combiner, + List args = makeArgs(inputs, inputreader, output, mapper, reducer, combiner, fileList, cmdenvs, jarArgs); JarDelegator d = new JarDelegator(appConf); @@ -62,6 +62,7 @@ public EnqueueBean run(String user, Map userArgs, } private List makeArgs(List inputs, + String inputreader, String output, String mapper, String reducer, @@ -82,6 +83,11 @@ public EnqueueBean run(String user, Map userArgs, args.add(mapper); args.add("-reducer"); args.add(reducer); + + if (inputreader != null && !inputreader.isEmpty()) { + args.add("-inputreader"); + args.add(inputreader); + } if (TempletonUtils.isset(combiner)) { args.add("-combiner");