
PIG-94: Pig Streaming functional spec proposal

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.1.0
    • Component/s: None
    • Labels:
      None
    • Patch Info:
      Patch Available

      Description

      This issue is for discussion about Pig streaming functional spec.

      http://wiki.apache.org/pig/PigStreamingFunctionalSpec

      1. PigStreamingTestPlan.htm
        216 kB
        Xu Zhang
      2. PIG-94_3_2_20080401.patch
        63 kB
        Arun C Murthy
      3. PIG-94_3_1_20080331.patch
        47 kB
        Arun C Murthy
      4. PIG-94_3_0_20080328.patch
        39 kB
        Arun C Murthy
      5. PIG-94_2_2_20080324.patch
        127 kB
        Arun C Murthy
      6. PIG-94_2_1_20080323.patch
        127 kB
        Arun C Murthy
      7. PIG-94_2_0_20080317.patch
        102 kB
        Arun C Murthy
      8. PIG-94_1_20080303.patch
        23 kB
        Arun C Murthy
      9. PIG-94_1_1_20080304.patch
        27 kB
        Arun C Murthy
      10. patch.txt
        1 kB
        Arun C Murthy
      11. Binary_Storage.patch
        4 kB
        Arun C Murthy
      12. Binary_Storage.patch
        7 kB
        Arun C Murthy
      13. Binary_Storage.patch
        19 kB
        Arun C Murthy

        Activity

        Olga Natkovich added a comment -

        We are done with all the milestones. Bugs are tracked separately.

        Olga Natkovich added a comment -

        Committed PIG-94_3_2_20080401.patch. Thanks, Arun.

        Olga Natkovich added a comment -

        +1 on the code. Running tests now. If all is good, I will commit the patch this afternoon.

        Arun C Murthy added a comment -

        Updated patch with following:

        1. BinaryStorage and test-cases
        2. Test cases for Streaming in Local execution mode
        3. Optimization for the simple LOAD/STREAM and STREAM/LOAD cases where serialization/deserialization isn't necessary, including a new 'split by' qualifier to LOAD to aid the optimization in cases where maps consume complete files.
        4. Misc enhancements including a fix for PIG-173
        5. Better error handling including a fix for PIG-174

        Arun C Murthy added a comment -

        Thanks for the review Olga, I'll fix that and add a test case.

        Olga Natkovich added a comment -

        Arun,

        I don't think BinaryStorage handles the case where the read block is smaller because we hit the map data boundary. The rest looks good.

        Arun C Murthy added a comment -

        Forgot to add that the latest patch passes all unit tests locally; however, I still need to run end-to-end tests on large clusters.

        Arun C Murthy added a comment -

        Updated patch with following:

        1. BinaryStorage and test-cases
        2. Test cases for Streaming in Local execution mode
        3. Optimization for the simple LOAD/STREAM and STREAM/LOAD cases where serialization/deserialization isn't necessary.
        4. Misc enhancements including a fix for PIG-173
        5. Better error handling including a fix for PIG-174

        Arun C Murthy added a comment -

        Fortified the check for equality of serializer/deserializer by actually instantiating them in the {Load|Store}Optimizer and then using the objects to compare for equality.

        Arun C Murthy added a comment -

        Updated patch with following:

        1. BinaryStorage and test-cases
        2. Test cases for Streaming in Local execution mode
        3. Optimization for the simple LOAD/STREAM and STREAM/LOAD cases where the serialization/deserialization isn't necessary.
        4. Misc enhancements including a fix for PIG-173

        Arun C Murthy added a comment -

        Updated BinaryStorage with test cases.

        This patch also contains test cases for testing Pig-Streaming in LOCAL mode (i.e. non Map-Reduce).

        Olga Natkovich added a comment -

        I have committed the patch for milestone 2.

        Arun C Murthy added a comment -

        Updated BinaryStorage serializer/deserializer. I had to fix DataAtom's String-only semantics to let it handle binary data too...

        Arun C Murthy added a comment -

        A simple, as-is, BinaryStorage class which treats the entire file as a byte stream - no formatting or interpretation. This is a heads-up patch; I'll add test-cases etc. soon enough.

        Given that we already have BinStorage, what is a better name for this chappie?
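        The pass-through idea described above can be sketched in a few lines. This is my own illustration under stated assumptions: `PassThroughStorage` and its methods are hypothetical names, not the code in the attached patch.

```java
import java.io.*;

// Sketch of a storage function that treats the whole file as an opaque
// byte stream: no record boundaries, no formatting, no interpretation.
// Hypothetical names; not the actual patch.
public class PassThroughStorage {

    // "Load" side: slurp the entire input stream into one byte array.
    public static byte[] load(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        for (int n = in.read(chunk); n != -1; n = in.read(chunk)) {
            buf.write(chunk, 0, n);
        }
        return buf.toByteArray();
    }

    // "Store" side: write the bytes back out verbatim.
    public static void store(OutputStream out, byte[] data) throws IOException {
        out.write(data);
        out.flush();
    }
}
```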

        Olga Natkovich added a comment -

        +1. Arun, looks good! I will try to work to commit this tomorrow.

        Arun C Murthy added a comment -

        Forgot to add that the patch successfully passed all unit tests and end-to-end tests ...

        Arun C Murthy added a comment -

        Updated patch ... minor change to ensure that auto-shipping is turned off whenever a DEFINE has been supplied by the user.

        Arun C Murthy added a comment -

        Here is an updated, cleaner version of the patch which implements the following:

        • Support for input/output/error specs (including multiple outputs)
        • Support for ship/cache specs (along with automatic shipping of executables as described in the PigStreamingFunctionalSpec)
        • Enhances the DEFINE operator to support the above features
        • Support for saving task logs on HDFS for debugging/analysing etc.
        • Documentation and test-cases.

        Appreciate feedback ...

        Olga Natkovich added a comment -

        Arun, sorry I did not have time to review the whole thing, but here is feedback on what I managed to review:

        (1) There are some System.err.println statements. They need to be converted to log statements.
        (2) A Pig script can contain multiple output statements, which means multiple output files. I see that get/setJobOutputFile is a new API in PigContext. Does the code manage this correctly?
        (3) In StreamSpec, you are using log.fatal. I think in the rest of the code we have been using log.error to indicate fatal errors and log.warn to indicate warnings.
        (4) For new files/classes that you added, it would be nice to have a comment header to explain what that class does and where it is used.

        Alan Gates added a comment -

        Comments on patch 2_0_20080317

        Looks like there are quite a few leftover printlns in here for debugging. It's probably easiest to search through the patch file to find all of the places.

        I don't understand why the executableManager in StreamSpec is passed in as an ExecutableManager then stored as a string (~line 40) and then later instantiated. Is this because it's stored on the front end and instantiated on the backend?

        In StreamingCommand.java, you are parsing the arguments to the streaming command. Is this something you could have the parser do for you at parse time and package it in a way that StreamingCommand could use them? We want to avoid manual parsing whenever possible.

        It would be nice to see some unit tests added for streaming. That doesn't have to be done in this patch.

        Arun C Murthy added a comment -

        Still no good solution + we can improve it incrementally.
        Let's keep this as a back burner work then.

        Exactly my feelings too... and these are backed by our experiences with Hadoop Streaming too.

        Thanks for the review Pi!

        Pi Song added a comment -

        OK, I'm not defining any new terminology. There are three cases:

        (1) 1 input record becomes 1 output record (non-blocking)
        (2) A subset of input records become output records (blocking)
        (3) A subset of input records become output records via an input file (blocking)

        For (1) you can do what I suggested.
        For (2) and (3), it seems what you can do is monitor whether the process is still alive (but that doesn't help if the process gets stuck internally).

        Still no good solution + we can improve it incrementally.

        Let's keep this as a back burner work then.

        Your patch looks good.

        Arun C Murthy added a comment -

        Ah! I was referring to the synchronous/asynchronous modes of Pig Streaming I've introduced in the latest patch...

        By synchronous I was referring to the mode where an input Datum is immediately sent to the process via its stdin, and asynchronous means it's sent to a local input file on disk from which the process later reads (i.e. its stdin stream isn't used to pass input Datums).

        Hope that clarifies things...

        Pi Song added a comment -

        Arun, could you please elaborate on what you mean by "synchronous"?

        For me, synchronous means the processing is done record by record,
        whereas asynchronous means the processing can start only when the whole set of records is available.

        Arun C Murthy added a comment -

        An example for my previous comment is a streaming job which does image processing after collecting a bunch of jpgs.

        Arun C Murthy added a comment -

        You're talking about synchronous and asynchronous mode right?

        No, even in synchronous mode we've seen Hadoop Streaming applications which wait for all input (or a large subset of input) to arrive before starting computation... hence in those cases outputBytes is not a good candidate for measuring progress. Sorry, I should have clarified it better.

        Pi Song added a comment -

        2) You're talking about synchronous and asynchronous mode right?

        • For synchronous mode, outputBytes really seems to be a good candidate
        • For asynchronous mode, how about we also monitor outputBytes but with a much higher default timeout?

        So basically, we do the same thing but with different parameters.

        3) I think Pig itself still doesn't have a good way to display execution errors. If there is a log file that we can access, that should be good enough for the time being.

        Arun C Murthy added a comment -

        Thanks for the review Pi... comments inline:

        In the parser, EXECCOMMAND doesn't seem to accept backtick escape characters. Do you plan to have it, or just say backticks aren't allowed?

        Yes, the plan is to say that backticks aren't allowed.

        Agree with Alan to have a timeout mechanism. You already have got the outputBytes variable in PigExecutableManager. Possibly just use it to monitor the movement.

        Yep, as I noted earlier it's the same problem as with Hadoop Streaming. There are applications which just collect all of the input data before processing it, and in those cases outputBytes isn't useful at all... let's discuss.

        The error handling logic of just printing out on worker nodes will not be visible to the Pig user.

        The errors are also collected on HDFS in the _logs sub-dir of the final output directory... does that work?

        The following line in HadoopExecutableManager will not work on Windows:

        It does work on Cygwin on Windows (which is a pre-req for Hadoop).

        Pi Song added a comment -

        Very good work.

        Here are my suggestions:

        • In the parser, EXECCOMMAND doesn't seem to accept backtick escape characters. Do you plan to have it, or just say backticks aren't allowed?
        • Agree with Alan to have a timeout mechanism. You already have got the outputBytes variable in PigExecutableManager. Possibly just use it to monitor the movement.
        • The error handling logic of just printing out on worker nodes will not be visible to the Pig user.
        • The following line in HadoopExecutableManager will not work on Windows:
          FileUtil.chmod(executable.toString(), "a+x");
          
        Arun C Murthy added a comment -

        Here is a patch which implements more of the Streaming spec... specifically it enhances the DEFINE to allow users to specify ship/cache/input/output specs and also saves stderr of the tasks to HDFS.

        Note: This patch needs PIG-154 in order to get parsing of the ship/cache/input/output specs right (it currently just uses square brackets instead of parentheses).

        Example:

        define X `./myscript.py foo bar` ship['./myscript.py'] input['foo' using PigStorage(',')] output['bar' using PigStorage(',')]; 
        IP = load '/pig/in' using PigStorage(',');
        FILTERED_DATA = filter IP by $1 > '3';
        OP = stream FILTERED_DATA through X; 
        dump OP;
        

        As such, this is just for an early review and not ready for committing yet.

        Olga Natkovich added a comment -

        The first patch has already been committed. No action needs to be taken till the next patch is submitted.

        Xu Zhang added a comment -

        Fixed a typo in the test plan

        Arun C Murthy added a comment -

        Olga, there was a missing change to StreamSpec.java to ensure the right 'import' for PigExecutableManager... this patch fixes that.

        Olga Natkovich added a comment -

        Committed the patch for M1. Xu, it is ready to be QAed.

        Olga Natkovich added a comment -

        I am testing the patch now.

        One comment is that the streaming directory should be under impl, not under backend. The backend directory is for the multiple backends that we support. I will re-arrange that.

        Xu Zhang added a comment -

        Attaching the test plan to the JIRA, per Hadoop tradition.

        Alan Gates added a comment -

        I'm fine with committing the patch, as this isn't the final version.

        Arun C Murthy added a comment -

        Here is a patch which addresses most of the concerns raised by Olga/Alan.

        Alan - I haven't done anything to deal with hanging streaming processes... it is hard to come up with a suitable timeout since it is very difficult to predict what will work for everyone. This is similar to what we went through with Hadoop Streaming, which ended up with exactly the behaviour in this patch. Can we continue discussions while we commit this patch, and revisit it in subsequent iterations?

        Alan Gates added a comment -

        Comments:

        1) We shouldn't be using printStackTrace() anymore.

        2) There should not be any System.out/err.println's in the code anymore. These should call log.info or log.error, as appropriate.

        3) In PigExecutableManager.splitArgs(), it would be better to use Character.isSpaceChar() or Character.isWhitespace() (I'm not clear on the difference between the two) rather than defining SPACE as ' '.

        4) There's no timeout on waiting for the user process to terminate. This doesn't seem good. If the user process goes into an infinite loop we're stuck. It seems like we need some way to check that it's making progress (still reading input or producing output).

        5) Several places it would be good to add log.debug() statements, such as when the executable is execed, when it returns, etc. This way developers could see the command line being passed, the return code being returned, etc.

        Olga Natkovich added a comment -

        Arun, looks good.

        A couple of comments:

        (1) In StreamSpec.java, we check for a non-null executor on every add. It would be better to pre-create it upfront if possible. You don't want to have a branch on every tuple.

        (2) In the same code, in the catch statement, there is no need to print the stack trace to System.err. We are currently working on defining the error handling policy, and it seems that we are converging on just throwing exceptions and letting the outer layer take care of logging.

        Arun C Murthy added a comment -

        Here is a reasonably self-sufficient first-cut patch for streaming support in Pig which implements the following:

        1. Map/Reduce (i.e. after grouping) side streaming in the eval-pipeline using the PigExecutableManager
        2. Support for output schema for the STREAM operator
        3. Test cases

        We can commit this patch while I continue to work on features such as the DEFINE command etc.

        Arun C Murthy added a comment (edited) -

        I've put up a first cut here: http://wiki.apache.org/pig/PigStreamingDesign.

        Summary:


        Pig Streaming 1.0 - Design

        The main goal of Pig-Streaming 1.0 is to support a form of processing in which the entire portion of the dataset that corresponds to a task is sent to the task, and output streams out. There is no temporal or causal correspondence between an input record and specific output records.

        This document specs out the high-level design of how Pig will support the Streaming concept. It builds off the functional spec documented at: http://wiki.apache.org/pig/PigStreamingFunctionalSpec.

        Main Components:
        1. User-facing changes (e.g. Pig Latin)
        2. Logical Layer
        3. Physical Layer
        4. Streaming Implementation

        1. User-facing changes

        The main changes include the addition of the new STREAM operator and the enhancement of the DEFINE operator to allow aliasing the actual command to which data is streamed. (See the wiki for details.)

        There are two affected components:

        a) QueryParser

        The straightforward changes to QueryParser include parsing the STREAM operator and then saving the relevant details in a StreamEvalSpec. StreamEvalSpec is a sub-class of org.apache.pig.impl.eval.EvalSpec; it works similarly to the other Eval operators (FILTER|FOREACH) in the sense that it takes a bag of tuples and performs one operation on each tuple. It also ensures that the STREAM operator can be chained with other Evals in exactly the same manner as in Pig today (by constructing CompositeEvalSpecs).

        StreamEvalSpec also contains necessary details such as:
        i. The actual command and its arguments, if any.
        ii. Information about the ship-spec and cache-spec, which will go through Hadoop's DistributedCache.
        iii. Serializer/Deserializer information.

        b) PigScriptParser

        The PigScriptParser also needs to be enhanced to process the new constructs supported by the DEFINE operator. The one change we need to make to PigContext is to add a PigContext.registerStreamingCommand API so that the PigScriptParser can store the streaming command and relevant information to be passed along to QueryParser and other components.
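A minimal sketch of what the proposed registerStreamingCommand API could look like; the class name PigContextSketch, the lookup method, and the alias-to-command map are assumptions for illustration, not the actual PigContext implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the real PigContext has many more members,
// and the exact signature of registerStreamingCommand is an assumption.
class PigContextSketch {

    // DEFINE alias -> streaming command, as recorded by the PigScriptParser
    private final Map<String, String> streamingCommands = new HashMap<>();

    // Called when the PigScriptParser sees: DEFINE alias `command ...`
    void registerStreamingCommand(String alias, String command) {
        streamingCommands.put(alias, command);
    }

    // Later resolved by the QueryParser when it sees: X = STREAM Y THROUGH alias;
    String lookupStreamingCommand(String alias) {
        return streamingCommands.get(alias);
    }
}
```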

        Summary:

        New:
        StreamEvalSpec.java (extends EvalSpec)

        Modify:
        QueryParser.jjt
        PigScriptParser.jj
        PigContext.java (add registerStreamingCommand)

        2. Logical Layer

        Since 'streaming' is an eval on each record in the dataset, it should still be a logical Eval operator, i.e. LOEval should suffice for streaming operations too.

        3. Physical Layer

        Pig's MapReduce physical layer shouldn't be affected at all, since the StreamEvalSpec neatly fits into the map/reduce pipeline as another CompositeEvalSpec. (StreamEvalSpec.setupDefaultPipe is the critical knob.)

        4. Streaming Implementation

        The main infrastructure to support the notion of data processing by sending a dataset to a task's input and collecting its output is a generic manager that takes care of the setup/teardown of the streaming task, manages its stdin/stdout/stderr streams, and also does post-processing. The plan is to implement an org.apache.pig.backend.streaming.PigExecutableManager to take over the aforementioned responsibilities. The decision to keep this separate from Hadoop's Streaming component (in contrib/streaming) is to ensure that Pig has no extraneous dependency on Hadoop streaming.

        The PigExecutableManager is also responsible for dealing with multiple outputs of the streaming tasks (refer to the functional spec in the wiki).

        New:
        org.apache.pig.backend.streaming.PigExecutableManager

        class PigExecutableManager {

            // Configure the executable-manager
            void configure() throws IOException;

            // Runtime
            void run() throws IOException;

            // Clean-up hook (e.g. for handling multiple outputs)
            void close() throws IOException;

            // Send the Datum to the executable
            void add(Datum d);
        }

        The important deviation from the current Pig infrastructure is that there is no longer a one-to-one mapping between input and output records, since the user's script could (potentially) consume all of its input before it emits any output records. The way around this is to wrap the DataCollector (and hence the next successor in the pipeline) in an OutputCollector and pass it along to the ExecutableManager.
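The configure/run/add/close lifecycle above can be sketched concretely. This is an assumed ProcessBuilder-based implementation for illustration only: the class name, the use of String records instead of Datum, and the buffered stdout collection are all simplifications not taken from the proposal (real streaming would read stdout asynchronously to avoid pipe-buffer deadlock on large outputs, and would also handle stderr):

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified executable-manager following the lifecycle in the sketch above.
class SimpleExecutableManager {
    private final List<String> command;
    private ProcessBuilder builder;
    private Process process;
    private BufferedWriter stdin;
    private final List<String> collected = new ArrayList<>();

    SimpleExecutableManager(List<String> command) {
        this.command = command;
    }

    // Configure the executable-manager: set up the external command
    void configure() throws IOException {
        builder = new ProcessBuilder(command);
    }

    // Runtime: launch the process and open its stdin for writing
    void run() throws IOException {
        process = builder.start();
        stdin = new BufferedWriter(new OutputStreamWriter(
                process.getOutputStream(), StandardCharsets.UTF_8));
    }

    // Send one serialized record to the executable's stdin
    void add(String datum) throws IOException {
        stdin.write(datum);
        stdin.newLine();
    }

    // Clean-up hook: close stdin, drain stdout, wait for the process to exit
    void close() throws IOException {
        stdin.close();
        try (BufferedReader out = new BufferedReader(new InputStreamReader(
                process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = out.readLine()) != null) {
                collected.add(line);
            }
        }
        try {
            process.waitFor();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> output() {
        return collected;
    }
}
```

In the real pipeline the output records would be pushed into the wrapping OutputCollector as they arrive, rather than buffered in a list.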


          People

          • Assignee:
            Arun C Murthy
            Reporter:
            Olga Natkovich
          • Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue
