diff --git a/hcatalog/src/test/e2e/templeton/inpdir/xml/file1.xml b/hcatalog/src/test/e2e/templeton/inpdir/xml/file1.xml
new file mode 100644
index 0000000..bbf98dd
--- /dev/null
+++ b/hcatalog/src/test/e2e/templeton/inpdir/xml/file1.xml
@@ -0,0 +1,10 @@
+<xml>
+<title>Chocolate Mousse</title>
+<text>
+1.chocolate
+2.eggs
+3.sugar
+4.salt
+5.heavycream
+</text>
+</xml>
diff --git a/hcatalog/src/test/e2e/templeton/inpdir/xml/file2.xml b/hcatalog/src/test/e2e/templeton/inpdir/xml/file2.xml
new file mode 100644
index 0000000..e7aea8d
--- /dev/null
+++ b/hcatalog/src/test/e2e/templeton/inpdir/xml/file2.xml
@@ -0,0 +1,12 @@
+<xml>
+<title>Chocolate Cake</title>
+<text>
+1.chocolate
+2.eggs
+3.sugar
+4.Baking powder
+5.heavycream
+6.vanilla
+7.butter
+</text>
+</xml>
diff --git a/hcatalog/src/test/e2e/templeton/inpdir/xmlmapper.py b/hcatalog/src/test/e2e/templeton/inpdir/xmlmapper.py
new file mode 100644
index 0000000..935fb9b
--- /dev/null
+++ b/hcatalog/src/test/e2e/templeton/inpdir/xmlmapper.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+# xmlmapper.py
+#
+# Streaming mapper used by the WebHCat XML e2e test. Reads XML lines from
+# stdin and prints a single "[[title]]<TAB>[[text]]" line, where text is the
+# joined body abbreviated to its first 10 and last 10 characters.
+
+import sys
+
+text_lines = []
+title = "Unknown"
+inText = False
+for line in sys.stdin:
+    line = line.strip()
+    if line.find( "<title>" ) != -1:
+        # Assumes the whole element sits on one line: <title>...</title>
+        title = line[ len( "<title>" ) : -len( "</title>" ) ]
+    if line.find( "<text>" ) != -1:
+        inText = True
+        continue
+    if line.find( "</text>" ) != -1:
+        inText = False
+        continue
+    if inText:
+        text_lines.append( line )
+
+text = ' '.join( text_lines )
+text = text[0:10] + "..." + text[-10:]
+# Parenthesised single-argument print behaves the same on Python 2 and 3.
+print( '[[%s]]\t[[%s]]' % (title, text) )
diff --git a/hcatalog/src/test/e2e/templeton/inpdir/xmlreducer.py b/hcatalog/src/test/e2e/templeton/inpdir/xmlreducer.py
new file mode 100644
index 0000000..484f02c
--- /dev/null
+++ b/hcatalog/src/test/e2e/templeton/inpdir/xmlreducer.py
@@ -0,0 +1,12 @@
+#!/usr/bin/env python
+# xmlreducer.py
+#
+# Streaming reducer: echoes each "title<TAB>page" line read from stdin.
+import sys
+
+for line in sys.stdin:
+    line = line.strip()
+    # Split on the first tab only, so any further tabs stay in the page text.
+    title, page = line.split('\t', 1)
+    # Parenthesised single-argument print behaves the same on Python 2 and 3.
+    print('%s\t%s' % ( title, page ))
diff --git a/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf b/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf
index 791c00a..41392e6 100644
--- a/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf
+++ b/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf
@@ -110,6 +110,23 @@ $cfg =
'check_job_exit_value' => 1,
'check_call_back' => 1,
},
+ {
+ #test streaming over xml files: submits the 'inputreader' form option together with xmlmapper.py/xmlreducer.py
+ 'num' => 5,
+ 'method' => 'POST',
+ 'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming',
+ 'post_options' => ['user.name=:UNAME:',
+ 'input=:INPDIR_HDFS:/xml','inputreader=StreamXmlRecordReader,begin=xml,end=/xml',
+ 'output=:OUTDIR:/xml-out', 'file=:INPDIR_HDFS:/xmlmapper.py',
+ 'mapper=./:INPDIR_HDFS:/xmlmapper.py', 'file=:INPDIR_HDFS:/xmlreducer.py',
+ 'reducer=./:INPDIR_HDFS:/xmlreducer.py'],
+ #expected results
+ 'status_code' => 200,
+ 'check_job_created' => 1,
+ 'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
+ 'check_call_back' => 1,
+ },
]
},
##=============================================================================================================
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
index a28e04d..80d9d2c 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
@@ -635,6 +635,7 @@ public Response addOneColumn(@PathParam("db") String db,
@Path("mapreduce/streaming")
@Produces({MediaType.APPLICATION_JSON})
public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs,
+ @FormParam("inputreader") String inputreader,
@FormParam("output") String output,
@FormParam("mapper") String mapper,
@FormParam("reducer") String reducer,
@@ -657,6 +658,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs,
Map userArgs = new HashMap();
userArgs.put("user.name", getDoAsUser());
userArgs.put("input", inputs);
+ userArgs.put("inputreader", inputreader);
userArgs.put("output", output);
userArgs.put("mapper", mapper);
userArgs.put("reducer", reducer);
@@ -672,7 +674,7 @@ public EnqueueBean mapReduceStreaming(@FormParam("input") List inputs,
checkEnableLogPrerequisite(enablelog, statusdir);
StreamingDelegator d = new StreamingDelegator(appConf);
- return d.run(getDoAsUser(), userArgs, inputs, output, mapper, reducer, combiner,
+ return d.run(getDoAsUser(), userArgs, inputs, inputreader, output, mapper, reducer, combiner,
fileList, files, defines, cmdenvs, args,
statusdir, callback, getCompletedUrl(), enablelog, JobType.STREAMING);
}
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
index 147a041..622440b 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
@@ -38,7 +38,7 @@ public StreamingDelegator(AppConfig appConf) {
}
public EnqueueBean run(String user, Map userArgs,
- List inputs, String output,
+ List inputs, String inputreader, String output,
String mapper, String reducer, String combiner,
List fileList,
String files, List defines,
@@ -51,7 +51,7 @@ public EnqueueBean run(String user, Map userArgs,
JobType jobType)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
ExecuteException, IOException, InterruptedException {
- List args = makeArgs(inputs, output, mapper, reducer, combiner,
+ List args = makeArgs(inputs, inputreader, output, mapper, reducer, combiner,
fileList, cmdenvs, jarArgs);
JarDelegator d = new JarDelegator(appConf);
@@ -62,6 +62,7 @@ public EnqueueBean run(String user, Map userArgs,
}
private List makeArgs(List inputs,
+ String inputreader,
String output,
String mapper,
String reducer,
@@ -82,6 +83,11 @@ public EnqueueBean run(String user, Map userArgs,
args.add(mapper);
args.add("-reducer");
args.add(reducer);
+
+ if (inputreader != null && !inputreader.isEmpty()) {
+ args.add("-inputreader");
+ args.add(inputreader);
+ }
if (TempletonUtils.isset(combiner)) {
args.add("-combiner");