Pig › PIG-240

Support launching concurrent Pig jobs from one VM

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: impl
    • Labels: None

      Description

      For some applications it would be convenient to launch concurrent Pig jobs from a single VM. This is currently not possible since Pig has static mutable state.

      Attachments

      1. patch_240.txt
        5 kB
        Jeff Zhang
      2. pig-240.patch
        5 kB
        Tom White
      3. pig-240-1.patch
        2 kB
        Bennie Schut
      4. pig-240-2.patch
        3 kB
        Vincent BARAT

        Activity

        Olga Natkovich added a comment -

        For the client side, this is already taken care of in PIG-1874, which will be fixed in Pig 0.9.

        For the backend side, there is no use case, as Hadoop does not run tasks in threads and, as far as I know, there are no plans to do so.

        If the need does arise, it would be better to open a new bug that addresses that particular use case.

        Thomas Memenga added a comment -

        Additional note: I applied the latest patch to 0.8.0 (release), but executing the following script results in a NullPointerException:

        data = load 'hdfs://host.local:9000/user/hadoop/input/part-r-00000' using PigStorage() as (thed,ts:int,cId,ai,si);
        input_filtered = foreach data generate ts, ai, si;
        pipe_4965 = filter input_filtered by  ts > 1288043999 and  ts < 1288047599;
        pipe_4965_grouped = group pipe_4965 by (ai, si);
        pipe_4965_flat = foreach pipe_4965_grouped generate FLATTEN($0) , COUNT($1) , MIN(pipe_4965.ts) as ts_min, MAX(pipe_4965.ts) as ts_max;
        store pipe_4965_flat INTO '/user/hadoop/output/4965' USING PigStorage();
        
        org.apache.pig.impl.logicalLayer.FrontendException: ERROR 2042: Error in new logical plan. Try -Dpig.usenewlogicalplan=false.
                at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.compile(HExecutionEngine.java:309)
                at org.apache.pig.PigServer.compilePp(PigServer.java:1354)
                at org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1196)
                at org.apache.pig.PigServer.execute(PigServer.java:1190)
                at org.apache.pig.PigServer.access$100(PigServer.java:128)
                at org.apache.pig.PigServer$Graph.execute(PigServer.java:1517)
                at org.apache.pig.PigServer.executeBatchEx(PigServer.java:362)
                at org.apache.pig.PigServer.executeBatch(PigServer.java:329)
                at org.apache.pig.tools.grunt.GruntParser.executeBatch(GruntParser.java:112)
                at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:169)
                at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:141)
                at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:90)
                at org.apache.pig.Main.run(Main.java:510)
                at org.apache.pig.Main.main(Main.java:107)
        Caused by: java.lang.NullPointerException
                at org.apache.pig.newplan.ReverseDependencyOrderWalker.walk(ReverseDependencyOrderWalker.java:70)
                at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:50)
                at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visit(SchemaResetter.java:105)
                at org.apache.pig.newplan.logical.relational.LOGenerate.accept(LOGenerate.java:229)
                at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
                at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visit(SchemaResetter.java:94)
                at org.apache.pig.newplan.logical.relational.LOForEach.accept(LOForEach.java:71)
                at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
                at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:50)
                at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.compile(HExecutionEngine.java:261)
                ... 13 more
        

        The script works fine with unpatched 0.8.0.

        Thomas Memenga added a comment -

        Any news on this? We are trying to trigger a lot of Pig jobs simultaneously from a single JVM, and we are receiving "Unable to find clone for op" errors from time to time (Pig 0.8.0). It's a pity PigServer isn't thread-safe; spawning a JVM for each script seems to be our only option.

        Vincent BARAT added a comment -

        Alan, sorry for forgetting this. Yes, I would like you to check the patch. My guess is that, although it does the work for me, it is not sufficient: there is probably other work to do in other classes.

        Alan Gates added a comment -

        Vincent, your last patch is neither marked as okay for inclusion nor marked as patch submitted. Did you want us to take a look at it and see if it resolves the issue? If so, please mark it as okay for inclusion.

        Vincent BARAT added a comment -

        The last patch triggers a NullPointerException for me.
        I've tried to correct it, but I really don't know whether what I did is correct.
        Anyway, I'm attaching my patch; please remove it if it is a misunderstanding.

        Bennie Schut added a comment -

        I was also getting multithreading issues like "Unable to find clone for op Const 16-169".

        The previous patch didn't help in my case (I'm not sure why), so I changed LogicalPlanCloneHelper to be used in a non-static way. However, I have to admit that I'm not 100% clear on how LogicalPlanCloneHelper works in the big picture.

        This completely solved my problem.

        Alan Gates added a comment -

        Jeff,

        It looks to me like we don't want to share LogicalPlanCloneHelper across threads. Instead, there should be one instance per job; otherwise separate jobs may mingle their logical plan maps. If we instead had a shared container that tracked LogicalPlanCloneHelper by thread id, and then changed LogicalPlan and LogicalPlanCloneHelper to fetch the right map, this should work.
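
        The per-thread container described above could look roughly like the following sketch. The class and field names are illustrative stand-ins, not Pig's actual code, and the map is simplified to strings rather than logical plan operators:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative: each thread gets its own clone map via ThreadLocal, so
// jobs running concurrently on different threads can no longer mingle
// their logical plan maps.
class PerThreadCloneHelper {
    private static final ThreadLocal<Map<String, String>> mOpToCloneMap =
            ThreadLocal.withInitial(HashMap::new);

    // Callers always fetch the map belonging to the current thread.
    static Map<String, String> getOpToCloneMap() {
        return mOpToCloneMap.get();
    }

    // Clear this thread's map when its job finishes.
    static void reset() {
        mOpToCloneMap.remove();
    }
}
```

        Unlike pure synchronization, this isolates each job's entries as long as each job runs on its own thread.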

        Jeff Zhang added a comment -

        This is an initial patch for supporting multithreaded Pig mode, so that Pig can submit jobs from each thread.

        I found that mOpToCloneMap in LogicalPlanCloneHelper is statically shared, so I added some synchronization on this variable. I am not sure whether this patch will completely solve the multithreading problem, because it is not easy to test a multithreaded program, but it has worked fine for me for at least several weeks. So if you really want to support multithreading, you can try this patch.
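
        Synchronizing access to a statically shared map amounts to something like this sketch (class, field, and method names are illustrative, not Pig's actual code):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative: the map stays static and shared, but every access is
// guarded by a lock so concurrent threads cannot corrupt it. Note this
// only serializes access; it does not isolate one job's clone entries
// from another's.
class SyncCloneHelper {
    private static final Map<String, String> mOpToCloneMap = new HashMap<>();

    static void putClone(String op, String clone) {
        synchronized (mOpToCloneMap) {
            mOpToCloneMap.put(op, clone);
        }
    }

    static String getClone(String op) {
        synchronized (mOpToCloneMap) {
            return mOpToCloneMap.get(op);
        }
    }
}
```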

        Pi Song added a comment -

        Dear Tom,

        Thanks a lot for your suggestions. However, these things are being completely rewritten in our new branch (under branch/type), which should come out within a few weeks.

        There was a recent discussion about whether to make the launching method static in the new design. Probably you could join our discussion at PIG-162.

        Pi

        Tom White added a comment -

        Patch with some of the simpler fixes.

        Tom White added a comment -

        The following classes have shared state in (non-final) static fields. (I used FindBugs to find these; it would be nice if it were run automatically, as in Hadoop.)

        BagFactory has a static SpillableMemoryManager. Since BagFactory is a singleton, SpillableMemoryManager can just be an instance variable.
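
        Since BagFactory is a singleton, the change amounts to something like this sketch (simplified; the real SpillableMemoryManager is stubbed out):

```java
// Minimal stub for Pig's real memory manager.
class SpillableMemoryManager {
}

// Illustrative: because BagFactory is a singleton, making the memory
// manager an instance field preserves the one-copy behavior while
// removing the mutable static.
class BagFactory {
    private static final BagFactory INSTANCE = new BagFactory();

    // Formerly static; now owned by the singleton instance.
    private final SpillableMemoryManager memMgr = new SpillableMemoryManager();

    private BagFactory() {
    }

    static BagFactory getInstance() {
        return INSTANCE;
    }

    SpillableMemoryManager getMemoryManager() {
        return memMgr;
    }
}
```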

        MapReduceLauncher has several static fields and associated setters. POMapreduce has a static instance of MapReduceLauncher. This can be fixed by having HExecutionEngine create a MapReduceLauncher instance, set its non-static fields, and set the instance on POMapreduce.

        The static field totalHadoopTimeSpent on MapReduceLauncher cannot be dealt with in this way since it is used by PigServer to accumulate the time spent on jobs. This can be fixed by keeping it static (but accessing through a method rather than the field) and using AtomicLong for thread-safety. Longer term it would be better to have MapReduceLauncher.launchPig return a result object that PigServer gets the time from.
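
        The interim AtomicLong approach might look like this sketch (method names are illustrative, not Pig's actual API):

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative: the total stays static so PigServer can still accumulate
// time across jobs, but it is reached through methods backed by an
// AtomicLong, making concurrent updates from multiple launcher threads safe.
class LauncherStats {
    private static final AtomicLong totalHadoopTimeSpent = new AtomicLong();

    static void addHadoopTime(long millis) {
        totalHadoopTimeSpent.addAndGet(millis);
    }

    static long getTotalHadoopTimeSpent() {
        return totalHadoopTimeSpent.get();
    }
}
```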

        LogicalPlanBuilder has a static classloader field which is set by PigContext.addJar() and Main.main(). This field is widely used. It is used by the static method resolveClassName() on PigContext which is widely used via instantiateFuncFromSpec(). I think the proper approach is to make the classloader field an instance variable of PigContext, and make the PigContext available as needed.

        PigMapReduce has two static fields: reporter and pigContext. DataBag.reportProgress uses reporter; DataBags should be constructed with a PigContext so they can get its non-static reporter. HadoopExecutableManager.configure uses pigContext, but it could just be given a JobConf in its constructor.

        PigInputFormat has a static activeSplit field. Making the RecordReader hold a reference to the activeSplit would help here, but that might not be a solution for everywhere that uses the static field (e.g. FileLocalizer.openDFSFile, but that might not matter since it doesn't work locally anyway).


          People

          • Assignee: Jeff Zhang
          • Reporter: Tom White
          • Votes: 4
          • Watchers: 8
