PIG-3478: Make StreamingUDF work for Hadoop 2

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.14.0
    • Component/s: impl
    • Labels: None
    • Hadoop Flags: Reviewed

      Description

      PIG-2417 introduced Streaming UDF. However, it does not work under Hadoop 2: both the unit tests and the e2e tests fail under Hadoop 2. We need to fix it.

      1. PIG-3478-3.patch
        16 kB
        Lorand Bendig
      2. PIG-3478-2.patch
        15 kB
        Lorand Bendig
      3. PIG-3478.patch
        13 kB
        Jeremy Karn

        Activity

        Daniel Dai added a comment -

        Ok, let me commit it to trunk. Thanks Lorand!

        Lorand Bendig added a comment -

        Daniel Dai, thanks for checking it. Sure I will, but currently I'm experiencing permission issues and can't commit.

        Daniel Dai added a comment -

        +1, PIG-3478-3.patch works for me. Lorand Bendig, would you like to commit it?

        Lorand Bendig added a comment -

        Cheolsoo Park, Jeremy Karn thanks for looking at this issue. I updated the patch so that hadoop.tmp.dir will be used if hadoop.log.dir is not writable.
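
        The fallback described above (prefer hadoop.log.dir, fall back to hadoop.tmp.dir when the log directory is not writable) can be sketched roughly as follows. This is an illustrative standalone sketch, not Pig's actual code; the class and method names are hypothetical.

        ```java
        import java.io.File;

        public class LogDirFallback {
            // Hypothetical helper: return logDir if it is (or can be made) a
            // writable directory, otherwise fall back to tmpDir.
            public static String chooseOutputRoot(String logDir, String tmpDir) {
                File log = new File(logDir);
                if ((log.isDirectory() && log.canWrite()) || log.mkdirs()) {
                    return logDir;
                }
                return tmpDir;
            }
        }
        ```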

        Cheolsoo Park added a comment -

        Jeremy Karn, that sounds reasonable to me. Thank you for the clarification.

        Jeremy Karn added a comment -

        Thanks Lorand Bendig!

        Cheolsoo Park, we originally chose the log directory because we wanted users to be able to view their udf output from the Task Tracker in the same place as the rest of the logs for that task. Maybe it makes sense to fall back to temp dir if the user doesn't have permission to write to the log dir?

        Cheolsoo Park added a comment -

        Lorand Bendig, unfortunately, I am getting the following error while running the e2e tests:

        Caused by: java.io.IOException: Could not create directory: /apps/hadoop/2.4.0/logs/udfOutput
                at org.apache.pig.scripting.ScriptingOutputCapturer.getTaskLogDir(ScriptingOutputCapturer.java:103)
                at org.apache.pig.scripting.ScriptingOutputCapturer.getStandardOutputRootWriteLocation(ScriptingOutputCapturer.java:85)
                at org.apache.pig.impl.builtin.StreamingUDF.constructCommand(StreamingUDF.java:186)
                at org.apache.pig.impl.builtin.StreamingUDF.startUdfController(StreamingUDF.java:163)
                at org.apache.pig.impl.builtin.StreamingUDF.initialize(StreamingUDF.java:156)
                at org.apache.pig.impl.builtin.StreamingUDF.exec(StreamingUDF.java:146)
                at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:344)
        

        The problem is that users don't always have write permissions to hadoop.log.dir /apps/hadoop/2.4.0/logs. Shouldn't we write to the temp dir instead?

        Cheolsoo Park added a comment -

        This is great! I am running the tests to verify your fix.

        Aniket Mokashi, Daniel Dai, can we commit this into branch-0.13 if all the tests pass? I think this is an important bug fix.

        Lorand Bendig added a comment -

        Adjustment of the initial patch: StreamingUDF#ensureUserFileAvailable couldn't get the inputStream of the registered python script due to an additional slash in the file's path. Now I managed to run unit and e2e tests (local and cluster mode (pseudo-distributed)) successfully on Hadoop 2.2.0 and 2.4.0.
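
        The kind of path problem described above — a classpath resource lookup failing because of an extra slash in the file's path — can be illustrated with a minimal normalization sketch. This is an assumption-laden illustration of the general idea, not the code from the patch.

        ```java
        public class PathNormalize {
            // Illustrative helper: collapse any run of slashes into a single
            // slash, so e.g. "scripts//udf.py" becomes "scripts/udf.py".
            public static String stripExtraSlashes(String path) {
                return path.replaceAll("/+", "/");
            }
        }
        ```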

        Prashant Kommireddi added a comment -

        Thanks for getting back. I've moved this to 0.13, we could possibly get it in by then.

        Jeremy Karn added a comment -

        I probably won't get a chance to look at this in the next couple of weeks.

        Prashant Kommireddi added a comment -

        Russell Jurney Jeremy Karn anyone looking into this?

        Russell Jurney added a comment -

        Looking into this myself.

        Rohini Palaniswamy added a comment -

        Cancelling patch till Cheolsoo's comments are addressed.

        Cheolsoo Park added a comment -

        Jeremy Karn, I ran e2e tests (StreamingPythonUDFs) on an EMR Hadoop 2.2 cluster and saw two issues as follows:

        1. NPE in StreamingUDF.java
          2013-11-10 22:32:19,694 FATAL [IPC Server handler 11 on 33809] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1383086282107_1892_m_000000_3 - exited : org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing [POUserFunc (Name: POUserFunc(org.apache.pig.impl.builtin.StreamingUDF)[int] - scope-3 Operator Key: scope-3) children: null at []]: java.lang.NullPointerException
          	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:338)
          	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:378)
          	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:298)
          	at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:282)
          	at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:277)
          	at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
          	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
          	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:775)
          	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
          	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
          	at java.security.AccessController.doPrivileged(Native Method)
          	at javax.security.auth.Subject.doAs(Subject.java:415)
          	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
          	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
          Caused by: java.lang.NullPointerException
          	at org.apache.pig.impl.builtin.StreamingUDF.ensureUserFileAvailable(StreamingUDF.java:249)
          	at org.apache.pig.impl.builtin.StreamingUDF.constructCommand(StreamingUDF.java:218)
          	at org.apache.pig.impl.builtin.StreamingUDF.startUdfController(StreamingUDF.java:163)
          	at org.apache.pig.impl.builtin.StreamingUDF.initialize(StreamingUDF.java:156)
          	at org.apache.pig.impl.builtin.StreamingUDF.exec(StreamingUDF.java:146)
          	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:330)
          	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNextInteger(POUserFunc.java:379)
          	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:321)
          	... 13 more
          

          The NPE is thrown from udfFileStream.close(), where udfFileStream is null.

        2. After fixing #1 by adding a null check, I ran into this error:
          2013-11-10 23:00:51,402 FATAL [IPC Server handler 11 on 40139] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1383086282107_1905_m_000000_3 - exited : org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: StreamingUDF [Could not create directory: /home/hadoop/.versions/2.2.0/logs/udfOutput]
          at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:358)
          at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNextInteger(POUserFunc.java:379)
          at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:321)
          at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:378)
          at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:298)
          at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:282)
          at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:277)
          at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
          at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
          at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:775)
          at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
          at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
          at java.security.AccessController.doPrivileged(Native Method)
          at javax.security.auth.Subject.doAs(Subject.java:415)
          at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
          at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
          Caused by: java.io.IOException: Could not create directory: /home/hadoop/.versions/2.2.0/logs/udfOutput
          at org.apache.pig.scripting.ScriptingOutputCapturer.getTaskLogDir(ScriptingOutputCapturer.java:104)
          at org.apache.pig.scripting.ScriptingOutputCapturer.getStandardOutputRootWriteLocation(ScriptingOutputCapturer.java:86)
          at org.apache.pig.impl.builtin.StreamingUDF.constructCommand(StreamingUDF.java:187)
          at org.apache.pig.impl.builtin.StreamingUDF.startUdfController(StreamingUDF.java:163)
          at org.apache.pig.impl.builtin.StreamingUDF.initialize(StreamingUDF.java:156)
          at org.apache.pig.impl.builtin.StreamingUDF.exec(StreamingUDF.java:146)
          at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:330)
          ... 15 more
          

        Can you look into these failures? We should also enable StreamingPythonUDFs tests in nightly.conf once they're fixed.
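
        The null check mentioned in issue #1 above can be sketched as follows. The field name udfFileStream comes from the comment; the enclosing method is illustrative, not the actual StreamingUDF code.

        ```java
        import java.io.IOException;
        import java.io.InputStream;

        public class SafeClose {
            // Guard the close so that a stream which was never opened
            // (udfFileStream == null) does not turn into an NPE.
            public static void closeQuietly(InputStream udfFileStream) {
                if (udfFileStream != null) {  // the missing null check
                    try {
                        udfFileStream.close();
                    } catch (IOException e) {
                        // ignore failures during cleanup
                    }
                }
            }
        }
        ```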

        Jeremy Karn added a comment -

        Here's a patch I tested on a Hadoop 2.1.0 cluster. I was able to run an example script and get the unit tests to pass but I wasn't able to run the e2e tests on the cluster to confirm that those are also fixed.


          People

          • Assignee: Lorand Bendig
          • Reporter: Daniel Dai