PIG-2417

Streaming UDFs - allow users to easily write UDFs in scripting languages with no JVM implementation.

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.12.0
    • Fix Version/s: 0.12.0
    • Component/s: None
    • Labels: None
    • Patch Info: Patch Available
    • Hadoop Flags: Reviewed

      Description

      The goal of Streaming UDFs is to allow users to easily write UDFs in scripting languages with no JVM implementation or a limited JVM implementation. The initial proposal is outlined here: https://cwiki.apache.org/confluence/display/PIG/StreamingUDFs.

      In order to implement this we need new syntax to distinguish a streaming UDF from an embedded JVM UDF. I'd propose something like the following (although I'm not sure 'language' is the best term to be using):

      define my_streaming_udfs language('python') ship('my_streaming_udfs.py')

      We'll also need a language-specific controller script that gets shipped to the cluster which is responsible for reading the input stream, deserializing the input data, passing it to the user written script, serializing that script output, and writing that to the output stream.
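      For illustration, here is a minimal sketch of what such a controller loop could look like. It is an assumption-laden simplification: it pretends each input record is a single tab-delimited text line and ignores schemas, nested types, and error handling, all of which the real controller script would have to deal with.

          #!/usr/bin/env python
          # Hypothetical controller loop (a sketch, not the actual controller.py).
          import sys

          def run(user_module_name, func_name):
              # Import the user-written script and look up the UDF by name.
              udf = getattr(__import__(user_module_name), func_name)
              for line in sys.stdin:
                  # Deserialize the input record (assumed tab-delimited here).
                  fields = line.rstrip('\n').split('\t')
                  # Pass the fields to the user function, then serialize its
                  # output and write it to the output stream.
                  sys.stdout.write(str(udf(*fields)) + '\n')
                  sys.stdout.flush()

          if __name__ == '__main__':
              run(sys.argv[1], sys.argv[2])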

      Finally, we'll need to add a StreamingUDF class that extends EvalFunc. This class will likely share some of the existing code in POStream and ExecutableManager (where it makes sense to pull out shared code) to stream data to/from the controller script.

      One alternative approach to creating the StreamingUDF EvalFunc is to use the POStream operator directly. This would involve inserting the POStream operator instead of the POUserFunc operator whenever we encountered a streaming UDF while building the physical plan. This approach seemed problematic because it would require a lot of changes in order to support POStream in all of the places we want to be able to use UDFs (for example, to operate on a single field inside a FOREACH statement).

      1. streaming3.patch
        99 kB
        Jeremy Karn
      2. streaming2.patch
        115 kB
        Jeremy Karn
      3. streaming.patch
        44 kB
        Jeremy Karn
      4. PIG-2417-unicode.patch
        1 kB
        Jeremy Karn
      5. PIG-2417-e2e.patch
        15 kB
        Jeremy Karn
      6. PIG-2417-9-2.patch
        32 kB
        Jeremy Karn
      7. PIG-2417-9-1.patch
        2 kB
        Daniel Dai
      8. PIG-2417-9.patch
        169 kB
        Jeremy Karn
      9. PIG-2417-8.patch
        153 kB
        Jeremy Karn
      10. PIG-2417-7.patch
        172 kB
        Jeremy Karn
      11. PIG-2417-6.patch
        152 kB
        Jeremy Karn
      12. PIG-2417-5.patch
        154 kB
        Jeremy Karn
      13. PIG-2417-4.patch
        109 kB
        Jeremy Karn

        Issue Links

          • This issue requires PIG-3255

          Activity

          Jeremy Karn created issue -
          Alan Gates added a comment -

          This looks interesting. Some thoughts regarding the open questions on your wiki.

          We'd want to either update DEFINE or add a new command to support streaming UDFs.

          We should definitely use DEFINE for this as well. It seems it should stay as close to the JVM based UDF defines as possible. I'm wondering if it could follow the same:

          DEFINE 'filename' using <language> as <namespace>

          that the JVM based UDFs use, and just add new language tokens that indicate the streaming nature, such as 'streaming_python', 'streaming_perl', etc.

          How can we return the output type information back to pig? Perhaps we could support something like the @outputSchema decorator in python at least, and have the controller script gather that information and provide it back to pig in a separate file?

           There are two sides to this: one, how you communicate the information through the channel you're creating, and two, how the UDF writer communicates it in their UDF. The design will need to propose a way for implementations of streaming UDFs for various languages to communicate schema information back to Pig. But how the UDF writer communicates it should be language specific. Wherever possible it should mimic the choices made in the JVM-based implementations. So a streaming Python implementation should use the same @outputSchema as Jython does.
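           (For illustration, a sketch of how a UDF writer would declare the output schema with such a decorator, mirroring the Jython convention. The pig_util import reflects the helper module that eventually ships alongside the controller script; the schema string is an example:)

               from pig_util import outputSchema

               @outputSchema('shortened:chararray')
               def shorten(word):
                   # Declared to return a chararray named 'shortened'.
                   return word[:6]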

          How can we return the output type information back to pig? Perhaps we could support something like the @outputSchema decorator in python at least, and have the controller script gather that information and provide it back to pig in a separate file?

          This should be a Java property for each language implementation. Something like pig.streaming_udf.executable.python.

          Have you done any prototyping? I'm curious how the performance of this will compare against the JVM based implementations. I realize you are doing this to extend functionality, not get performance.

          Jeremy Karn added a comment -

          Here's a rough first patch that only works with streaming python UDFs that return a String. So it doesn't handle communicating the type information yet and you have to manually copy the python controller script and user scripts into the pig path.

          I went with Alan's suggestion of modifying the REGISTER command (I think he meant register instead of define) to support streaming_python as a language.

          I haven't had a chance to do any meaningful performance tests - but as a very rough guide the following simple example running locally took 130 seconds when I used streaming_python and 44 seconds when I used jython.

          Pig:

          REGISTER 'test_streaming_udfs.py' using streaming_python as test;

          A = LOAD 'excite.log.bz2' AS (user_id:chararray, timestamp:chararray, query:chararray);
          
          B = FOREACH A GENERATE test.shorten(user_id) as new_user_id:chararray, timestamp, query;
          
          STORE B INTO 'short_ids';
          

          Python:

          def shorten(word):
              return word[:6]
          
          Jeremy Karn made changes -
          Status: Open → Patch Available
          Jeremy Karn made changes -
          Attachment python_streaming_string.patch [ 12507125 ]
          Jeremy Karn made changes -
          Attachment python_streaming_string.patch [ 12507125 ]
          Jeremy Karn made changes -
          Attachment python_streaming_string.patch [ 12507126 ]
          Alan Gates added a comment -

          Jeremy,

          For the patch to apply in an SVN environment, you need to generate it with 'git diff --no-prefix'.

          Alan Gates made changes -
          Status: Patch Available → Open
          Jeremy Karn made changes -
          Attachment python_streaming_string.patch [ 12507126 ]
          Jeremy Karn made changes -
          Attachment streaming.patch [ 12507623 ]
          Jeremy Karn added a comment -

          Ah, ok. I've generated this last patch with --no-prefix.

          It includes the previous code changes as well as:

          • code to pass the input schema from Pig to Python
          • Python code to deserialize the input from Standard Input into Python types
          • Python unit tests for the Python controller script.

           I'm not really sure of the best way to include the Python unit tests. Right now I have them in test/python/streaming/test_controller.py, but in order to run them you need to copy that file into the same directory as the controller file.

          Ashutosh Chauhan made changes -
          Assignee Jeremy Karn [ jeremykarn ]
          Jeremy Karn made changes -
          Attachment streaming2.patch [ 12508535 ]
          Jeremy Karn added a comment -

          This latest patch (streaming2.patch) contains all of the functionality necessary for writing streaming UDFs.

          Registering python files still works as outlined above.

           Declaring the output schema of your Python UDF uses an outputSchema decorator (the same syntax used for Jython UDFs). When the user registers the file and Pig scans for functions, it also looks for the outputSchema decorator and only registers functions that have it. The schema string from the decorator is passed to the StreamingUDF instance(s) so that it knows what output schema to expect from the streaming process.
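           (As an illustration of the mechanism described above, the decorator can simply attach the schema string to the function object so the registration-time scan can find it. A rough, hypothetical sketch:)

               import inspect

               def outputSchema(schema_str):
                   # Record the declared Pig schema on the function object.
                   def wrap(func):
                       func.output_schema = schema_str
                       return func
                   return wrap

               def find_udfs(module):
                   # Register only the functions that carry a schema declaration.
                   return {name: func.output_schema
                           for name, func in inspect.getmembers(module, inspect.isfunction)
                           if hasattr(func, 'output_schema')}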

          Performance:

          I haven't done exhaustive testing, profiling, or tuning but right now it looks like small data sets using standalone hadoop are about 2-3 times slower using python streaming udfs instead of jython udfs.

          Running similar scripts on a small data set but on a cluster improves a bit and the python streaming udfs are twice as slow.

          When you move up to much larger data sets and run on the cluster I'm seeing python streaming udfs being around 50% slower than equivalent jython udfs.

          The code still has a few bugs and I need to add unit tests for the Pig changes I've made but I'd definitely appreciate any feedback on what's already done.

          Jeremy Karn made changes -
          Attachment streaming3.patch [ 12509992 ]
          Jeremy Karn added a comment -

          This patch includes:

           • Better passing of null values to/from the UDF
           • 3-character delimiters when serializing input/output to/from the UDF, to avoid conflicts when the delimiters appear in the data (see the sketch after this list)
           • Improved performance
           • A fix to illustrate so that it works with streaming UDFs.
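           (A hypothetical sketch of the multi-character delimiter idea from the second bullet; the actual delimiter byte sequences in the patch may differ:)

               # Three-character delimiters are far less likely to collide with
               # real data than a single tab or comma would be.
               FIELD_DELIM = '\x1f_\x1f'   # separates fields within a tuple
               NULL_MARKER = '\x01-\x01'   # distinguishes null from empty string

               def serialize_tuple(fields):
                   return FIELD_DELIM.join(
                       NULL_MARKER if f is None else str(f) for f in fields)

               def deserialize_tuple(line):
                   return [None if f == NULL_MARKER else f
                           for f in line.split(FIELD_DELIM)]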

          In terms of performance I ran tests with 4 scripts using both streaming python and jython udfs. With 3 of the scripts the streaming python was faster (5%, 7%, and 33% faster) and in 1 case the streaming python was 80% slower.

          The script with the poor performance involved passing really big tuples to the udf (one tuple serialized to a 30 meg string).

          Jeremy Karn made changes -
          Status: Open → Patch Available
          Russell Jurney added a comment -

          I'm going to try this out.

          Alan Gates added a comment -

          Patch no longer applies cleanly to trunk.

          Alan Gates made changes -
          Status: Patch Available → Open
          Jeremy Karn made changes -
          Attachment PIG-2417-4.patch [ 12571279 ]
          Jeremy Karn added a comment -

           I've attached a new patch that applies onto trunk. It needs updating in a number of places before it's ready to be merged, but it looks like I'm not going to have a chance to do that in the near future. If someone else wants to take a stab at cleaning it up and getting it ready for trunk, that would be great; feel free to email me with any questions.

          Russell Jurney added a comment -

          I'll do clean-up, plz advise.

          Jeremy Karn added a comment -

           Jonathan Coveney had expressed interest in looking at the streaming Python work we've done, so this patch is just to get our code available for people to look at. I updated the code to work on trunk, but I had to do it in a quick and dirty way. Here's a list of some specific things I know need work (and there are probably a few more I haven't thought of that would come out in review):

          1. In the serialization/deserialization code I don't support any of the new data types added to pig since 0.9.

          2. When I first wrote this code I pulled some common logic out of ExecutableManager into a class called StreamingUtil. ExecutableManager has changed enough since 0.9 that it wasn't straightforward to figure out how it should work now so there's some duplicated logic in StreamingUtil and ExecutableManager.

          3. There's some Mortar specific wording in a couple of places and a couple of places in StreamingUDF where I'm handling the cases that come up with how we run Pig/Hadoop but that might need to be more generic/robust to work for everyone out of the box.

          4. There's some exception handling decisions and some code for capturing standard output from the UDF for illustrate that might not make much sense without the rest of our illustrate changes.

           5. It might make sense to use a more efficient serialization/deserialization method. I tried to use the existing code (just adding code to handle cases that didn't work before), but it's probably not the most efficient approach. I'm not sure if this is something that would need to be tackled now or if it could be a future enhancement.

          Jonathan Coveney added a comment -

          Russell,

          If you want to take first stab at cleanup that would be excellent (especially adding tests and some documentation so people know how to use it). You can take note of any deeper technical Pig stuff that needs to be worked on, and then I can take over that portion.

          Thoughts?

          Jeremy Karn added a comment -

          Here's an updated patch that I think should be ready for review (review board coming soon).

           Aside from the streaming Python UDFs, this patch also contains some logic for capturing output from the Python process that doesn't do much yet. However, I'm hoping to get a patch up soon with Mortar's illustrate changes, and that will take advantage of the captured output.

           One thing that's still outstanding is documentation changes. Should I just add a section similar to http://pig.apache.org/docs/r0.11.1/udf.html#python-udfs for streaming python?

          Jeremy Karn made changes -
          Attachment PIG-2417-5.patch [ 12599706 ]
          Jeremy Karn added a comment -

          Here's the review board: https://reviews.apache.org/r/13781/

          Jeremy Karn made changes -
          Status: Open → In Progress
          Jeremy Karn made changes -
          Status: In Progress → Open
          Jeremy Karn made changes -
          Patch Info: Patch Available
          Jeremy Karn made changes -
          Status: Open → Patch Available
          Affects Version/s: 0.12
          Affects Version/s: 0.11
          Jeremy Karn made changes -
          Assignee Jeremy Karn [ jeremykarn ]
          Jeremy Karn made changes -
          Fix Version/s 0.12 [ 12323380 ]
          Jeremy Karn made changes -
          Attachment PIG-2417-6.patch [ 12601437 ]
          Jeremy Karn added a comment -

          Latest patch just has a couple small changes so that it applies cleanly after PIG-3419.

          Jeremy Karn made changes -
          Attachment PIG-2417-7.patch [ 12601668 ]
          Jeremy Karn made changes -
          Attachment PIG-2417-8.patch [ 12601981 ]
          Jeremy Karn added a comment -

          Latest patch has changes from code review. I still need to add e2e tests.

          Jeremy Karn made changes -
          Attachment PIG-2417-e2e.patch [ 12602202 ]
          Jeremy Karn added a comment -

          PIG-2417-e2e.patch contains e2e tests for streaming python (mostly just copied from the jython tests).

          Rohini Palaniswamy made changes -
          Link This issue requires PIG-3255 [ PIG-3255 ]
          Daniel Dai added a comment -

          Jeremy Karn
           With PIG-3255 checked in, do you want to add this optimization? Also, can you respond to my comments on the review board?

          Jeremy Karn added a comment -

          I'll update the patch (probably tomorrow) to take advantage of PIG-3255.

          I think the only outstanding comment in the review board is how the logging works with Hadoop2. I'm hoping to get a chance to test that in the next couple of days.

          Daniel Dai added a comment -

           Thanks. I'd like to check this in before we branch.

          Jeremy Karn made changes -
          Attachment PIG-2417-9.patch [ 12603921 ]
          Jeremy Karn added a comment -

          PIG-2417-9.patch contains everything including the e2e tests. I've incorporated the changes from PIG-3255. I wasn't able to test against a Hadoop2 cluster but it looks like hadoop.log.dir is still a valid property there and I've made the logic around determining the log directory a little more robust.

          Daniel Dai added a comment -

           Running e2e tests on Hadoop 2, I hit the following stack trace on the map side:

          	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:338)
          	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:378)
          	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:298)
          	at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:282)
          	at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:277)
          	at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
          	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
          	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
          	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
          	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
          	at java.security.AccessController.doPrivileged(Native Method)
          	at javax.security.auth.Subject.doAs(Subject.java:396)
          	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1477)
          	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
          Caused by: java.lang.NullPointerException
          	at org.apache.pig.impl.builtin.StreamingUDF.getControllerPath(StreamingUDF.java:252)
          	at org.apache.pig.impl.builtin.StreamingUDF.constructCommand(StreamingUDF.java:183)
          	at org.apache.pig.impl.builtin.StreamingUDF.startUdfController(StreamingUDF.java:147)
          	at org.apache.pig.impl.builtin.StreamingUDF.initialize(StreamingUDF.java:140)
          	at org.apache.pig.impl.builtin.StreamingUDF.exec(StreamingUDF.java:130)
          	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:330)
          	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNextInteger(POUserFunc.java:379)
          	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:321)
          

          I will try to take a look tonight.

          Jeremy Karn added a comment -

           My guess is that it's caused by the code in StreamingUDF at line 157:

                  Configuration conf = UDFContext.getUDFContext().getJobConf();
                  String jarPath = conf.get("mapred.jar");
          

          I'm not sure how that should work with Hadoop 2.

          Daniel Dai added a comment -

          Yes, "mapred.jar" does not physically exist in Hadoop 2:
          jarPath: /tmp/hadoop-yarn/staging/daijy/.staging/job_1379571725164_0007/job.jar
          '/tmp/hadoop-yarn/staging/daijy/.staging/job_1379571725164_0007/job.jar' does not exist

           But I think job.jar should already be in the classpath. I will try it tomorrow.

          Jeremy Karn added a comment -

          The jar path is being used to figure out where the appropriate python files are on the cluster nodes. We need to know where controller.py is to start the python controller process and then we need to pass in the directory of controller.py and pig_util.py (the PATH_TO_FILE_CACHE option) so that the controller can add that directory to the python path.
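           (For illustration, assuming PATH_TO_FILE_CACHE arrives as a command-line argument, the controller-side handling could be as small as this hypothetical sketch:)

               import sys

               # Make the shipped user scripts and pig_util.py importable
               # by the controller process.
               file_cache_dir = sys.argv[1]  # PATH_TO_FILE_CACHE
               if file_cache_dir not in sys.path:
                   sys.path.insert(0, file_cache_dir)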

          Daniel Dai added a comment -

           PIG-2417-9-1.patch includes an ugly fix to make Hadoop 2 work. Still needs some refinement.

          Daniel Dai made changes -
          Attachment PIG-2417-9-1.patch [ 12604153 ]
          Daniel Dai made changes -
          Attachment PIG-2417-9-1.patch [ 12604153 ]
          Daniel Dai made changes -
          Attachment PIG-2417-9-1.patch [ 12604156 ]
          Jeremy Karn made changes -
          Attachment PIG-2417-9-2.patch [ 12604176 ]
          Jeremy Karn added a comment -

          Thanks Daniel.

          I also just attached PIG-2417-9-2.patch which updates the tests to use the serialize/deserialize methods created in PIG-3255 instead of the deprecated versions.

          Daniel Dai added a comment -

           There is one more issue on Hadoop 2: job.jar does not get unjarred before launching map/reduce, so controller.py cannot find the UDF script. It seems we need one more step to unjar the script files before invoking controller.py.
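           (A sketch of the extra unjar step being described, assuming only the shipped .py files are needed and relying on the fact that a jar is a zip archive; the names here are hypothetical:)

               import zipfile

               def extract_scripts(job_jar_path, dest_dir):
                   # Unpack just the Python files from job.jar so that
                   # controller.py can find the user's UDF script.
                   with zipfile.ZipFile(job_jar_path) as jar:
                       for name in jar.namelist():
                           if name.endswith('.py'):
                               jar.extract(name, dest_dir)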

           I'd like to commit this patch before we branch 0.12. There are still several holes in getting streaming UDFs to work under Hadoop 2; I would suggest committing the patch first, marking the e2e tests as not valid on Hadoop 2, then fixing them after the branch. Thoughts?

          Jeremy Karn added a comment -

          I agree.

          If we get this committed and open a new jira for the hadoop2 problems, that'll give me a bit of time to set up a hadoop2 cluster and work out any kinks.

          Daniel Dai added a comment -

           Committed PIG-2417-9.patch + PIG-2417-9-2.patch. Also disabled StreamingUDF e2e tests and unit tests for Hadoop 2. Opened PIG-3478 to fix Streaming UDF for Hadoop 2.

          Daniel Dai made changes -
          Status: Patch Available → Resolved
          Hadoop Flags: Reviewed
          Assignee: Jeremy Karn
          Resolution: Fixed
          Jeremy Karn added a comment -

          Thanks for all of your help Daniel!

          Rohini Palaniswamy added a comment -

          I see compilation fails with 4 similar errors in TestStreamingUDF.java when running it on Linux. Works fine on Mac.

          test/org/apache/pig/impl/builtin/TestStreamingUDF.java:287: error: unmappable character for encoding UTF8

          From javac documentation:
          -encoding encoding
          Set the source file encoding name, such as EUC-JP and UTF-8. If -encoding is not specified, the platform default converter is used.

           Not sure what the platform defaults exactly are on Mac, since inside a Java program file.encoding and Charset.defaultCharset() are UTF-8. Either we should specify -encoding in the ant javac invocation or fix the test to use \uXXXX escapes.

          Daniel Dai,
          Does it compile fine on Windows?

          Daniel Dai added a comment -

           I compiled on both my RHEL6 box and Windows; it seems fine for me. We can change the javadoc anyway to fix the issue.

          Jeremy Karn made changes -
          Attachment PIG-2417-unicode.patch [ 12605089 ]
          Jeremy Karn added a comment -

          I can't reproduce the problem but I think PIG-2417-unicode.patch should fix the encoding issue.

          Rohini Palaniswamy added a comment -

          +1 for PIG-2417-unicode.patch. That worked. Committed to 0.12 and trunk. Thanks Jeremy.

          Jeremy Karn made changes -
          Attachment PIG-3478.patch [ 12607596 ]
          Jeremy Karn made changes -
          Attachment PIG-3478.patch [ 12607596 ]
          Daniel Dai made changes -
          Status: Resolved → Closed
          Russell Jurney added a comment -

           What work remains to get this working on YARN? I need it to work.


             People

             • Assignee: Jeremy Karn
             • Reporter: Jeremy Karn
             • Votes: 5
             • Watchers: 9
