diff --git a/hcatalog/src/test/e2e/templeton/inpdir/xml/file1.xml b/hcatalog/src/test/e2e/templeton/inpdir/xml/file1.xml new file mode 100644 index 0000000..bbf98dd --- /dev/null +++ b/hcatalog/src/test/e2e/templeton/inpdir/xml/file1.xml @@ -0,0 +1,10 @@ + +Chocolate Mousse + +1.chocolate +2.eggs +3.sugar +4.salt +5.heavycream + + diff --git a/hcatalog/src/test/e2e/templeton/inpdir/xml/file2.xml b/hcatalog/src/test/e2e/templeton/inpdir/xml/file2.xml new file mode 100644 index 0000000..e7aea8d --- /dev/null +++ b/hcatalog/src/test/e2e/templeton/inpdir/xml/file2.xml @@ -0,0 +1,12 @@ + +Chocolate Cake + +1.chocolate +2.eggs +3.sugar +4.Baking powder +5.heavycream +6.vanilla +7.butter + + diff --git a/hcatalog/src/test/e2e/templeton/inpdir/xmlmapper.py b/hcatalog/src/test/e2e/templeton/inpdir/xmlmapper.py new file mode 100644 index 0000000..935fb9b --- /dev/null +++ b/hcatalog/src/test/e2e/templeton/inpdir/xmlmapper.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python +# dftmapper.py + +import sys + +list = [] +title = "Unknown" +inText = False +for line in sys.stdin: + line = line.strip() + if line.find( "" )!= -1: + title = line[ len( "<title>" ) : -len( "" ) ] + if line.find( "" ) != -1: + inText = True + continue + if line.find( "" ) != -1: + inText = False + continue + if inText: + list.append( line ) + +text = ' '.join( list ) +text = text[0:10] + "..." + text[-10:] +print '[[%s]]\t[[%s]]' % (title, text) diff --git a/hcatalog/src/test/e2e/templeton/inpdir/xmlreducer.py b/hcatalog/src/test/e2e/templeton/inpdir/xmlreducer.py new file mode 100644 index 0000000..484f02c --- /dev/null +++ b/hcatalog/src/test/e2e/templeton/inpdir/xmlreducer.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python +import sys + +for line in sys.stdin: + + line = line.strip() + title, page = line.split('\t', 1) + print '%s\t%s' % ( title, page ) diff --git a/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf b/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf index 791c00a..41392e6 100644 --- a/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf +++ b/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf @@ -110,6 +110,23 @@ $cfg = 'check_job_exit_value' => 1, 'check_call_back' => 1, }, + { + #test streaming xml files + 'num' => 5, + 'method' => 'POST', + 'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming', + 'post_options' => ['user.name=:UNAME:', + 'input=:INPDIR_HDFS:/xml','inputreader=StreamXmlRecordReader,begin=xml,end=/xml', + 'output=:OUTDIR:/xml-out', 'file=:INPDIR_HDFS:/xmlmapper.py', + 'mapper=./:INPDIR_HDFS:/xmlmapper.py', 'file=:INPDIR_HDFS:/xmlreducer.py', + 'reducer=./:INPDIR_HDFS:/xmlreducer.py'], + #results + 'status_code' => 200, + 'check_job_created' => 1, + 'check_job_complete' => 'SUCCESS', + 'check_job_exit_value' => 0, + 'check_call_back' => 1, + }, ] }, ##============================================================================================================= diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java index a28e04d..80d9d2c 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java @@ -635,6 +635,7 @@ public Response addOneColumn(@PathParam("db") String db, @Path("mapreduce/streaming") @Produces({MediaType.APPLICATION_JSON}) public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, + @FormParam("inputreader") String inputreader, @FormParam("output") String output, @FormParam("mapper") String mapper, @FormParam("reducer") String reducer, @@ -657,6 +658,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, Map userArgs = new HashMap(); userArgs.put("user.name", getDoAsUser()); userArgs.put("input", inputs); + userArgs.put("inputreader", inputreader); userArgs.put("output", output); userArgs.put("mapper", mapper); userArgs.put("reducer", reducer); @@ -672,7 +674,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs, checkEnableLogPrerequisite(enablelog, statusdir); StreamingDelegator d = new StreamingDelegator(appConf); - return d.run(getDoAsUser(), userArgs, inputs, output, mapper, reducer, combiner, + return d.run(getDoAsUser(), userArgs, inputs, inputreader, output, mapper, reducer, combiner, fileList, files, defines, cmdenvs, args, statusdir, callback, getCompletedUrl(), enablelog, JobType.STREAMING); } diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java index 147a041..622440b 100644 --- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java +++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java @@ -38,7 +38,7 @@ public StreamingDelegator(AppConfig appConf) { } public EnqueueBean run(String user, Map userArgs, - List inputs, String output, + List inputs, String inputreader, String output, String mapper, String reducer, String combiner, List fileList, String files, List defines, @@ -51,7 +51,7 @@ public EnqueueBean run(String user, Map userArgs, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { - List args = makeArgs(inputs, output, mapper, reducer, combiner, + List args = makeArgs(inputs, inputreader, output, mapper, reducer, combiner, fileList, cmdenvs, jarArgs); JarDelegator d = new JarDelegator(appConf); @@ -62,6 +62,7 @@ public EnqueueBean run(String user, Map userArgs, } private List makeArgs(List inputs, + String inputreader, String output, String mapper, String reducer, @@ -82,6 +83,11 @@ public EnqueueBean run(String user, Map userArgs, args.add(mapper); args.add("-reducer"); args.add(reducer); + + if (inputreader != null && !inputreader.isEmpty()) { + args.add("-inputreader"); + args.add(inputreader); + } if (TempletonUtils.isset(combiner)) { args.add("-combiner");