Pig / PIG-114

store one alias/logicalPlan twice leads to instantiation of StoreFunc as LoadFunc

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.0.0
    • Fix Version/s: 0.1.0
    • Component/s: impl
    • Labels:
      None
    • Patch Info:
      Patch Available

      Description

      Calling PigServer#store() twice for an alias results in the following exception:

      java.lang.RuntimeException: java.lang.ClassCastException: org.apache.pig.test.DummyStoreFunc cannot be cast to org.apache.pig.LoadFunc
      	at org.apache.pig.backend.local.executionengine.POLoad.<init>(POLoad.java:59)
      	at org.apache.pig.backend.local.executionengine.LocalExecutionEngine.doCompile(LocalExecutionEngine.java:167)
      	at org.apache.pig.backend.local.executionengine.LocalExecutionEngine.doCompile(LocalExecutionEngine.java:184)
      	at org.apache.pig.backend.local.executionengine.LocalExecutionEngine.doCompile(LocalExecutionEngine.java:184)
      	at org.apache.pig.backend.local.executionengine.LocalExecutionEngine.compile(LocalExecutionEngine.java:111)
      	at org.apache.pig.backend.local.executionengine.LocalExecutionEngine.compile(LocalExecutionEngine.java:90)
      	at org.apache.pig.backend.local.executionengine.LocalExecutionEngine.compile(LocalExecutionEngine.java:1)
      	at org.apache.pig.PigServer.store(PigServer.java:330)
      	at org.apache.pig.PigServer.store(PigServer.java:317)
      	at org.apache.pig.test.StoreTwiceTest.testIt(StoreTwiceTest.java:31)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      	at java.lang.reflect.Method.invoke(Method.java:589)
      	at junit.framework.TestCase.runTest(TestCase.java:164)
      	at junit.framework.TestCase.runBare(TestCase.java:130)
      	at junit.framework.TestResult$1.protect(TestResult.java:110)
      	at junit.framework.TestResult.runProtected(TestResult.java:128)
      	at junit.framework.TestResult.run(TestResult.java:113)
      	at junit.framework.TestCase.run(TestCase.java:120)
      	at junit.framework.TestSuite.runTest(TestSuite.java:228)
      	at junit.framework.TestSuite.run(TestSuite.java:223)
      	at org.junit.internal.runners.OldTestClassRunner.run(OldTestClassRunner.java:35)
      	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:45)
      	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
      	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460)
      	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673)
      	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386)
      	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196)
      Caused by: java.lang.ClassCastException: org.apache.pig.test.DummyStoreFunc cannot be cast to org.apache.pig.LoadFunc
      	at org.apache.pig.backend.local.executionengine.POLoad.<init>(POLoad.java:57)
      	... 28 more
      

      I will attach a patch with a test scenario for this. Basically the code is as follows:

      PigServer pig = new PigServer(ExecType.LOCAL);
      pig.registerQuery("A = LOAD 'test/org/apache/pig/test/StoreTwiceTest.java' USING "
              + DummyLoadFunc.class.getName() + "();");
      pig.registerQuery("B = FOREACH A GENERATE * ;");
      File outputFile = new File("/tmp/testPigOutput");
      outputFile.delete();
      // first store: uses a store-only function
      pig.store("A", outputFile.getAbsolutePath(), DummyStoreFunc.class.getName() + "()");
      outputFile.delete();
      // second store: B is derived from A, which has already been stored once
      pig.store("B", outputFile.getAbsolutePath(), DummyStoreFunc.class.getName() + "()");
      outputFile.delete();
      assertEquals(2, _storedTuples.size());
      
      1. pig_114_optimize_fix_v2.patch
        15 kB
        Pi Song
      2. PIG114_FixOptimize_Sample.patch
        10 kB
        Pi Song
      3. PIG114_FixOptimize1.patch
        15 kB
        Pi Song
      4. pigPatch-storeTwice-620665.patch
        4 kB
        Johannes Zillmann

        Activity

        Stefan Groschupf added a comment -

        For reference:
        From a first look at the code, I guess this is what happens.
        The data is stored successfully on disk the first time you call store, so POStore adds an entry to materializedResults.
        This is basically a hash map from an OperatorKey (just a name) to a LocalResult (a pointer to the file you just wrote).

        If you now trigger store again for the same alias, Pig tries to optimize performance by reusing the output file you just stored.
        It does this by first checking whether there is already a materializedResults entry.
        There is one, so in theory the result could be reused: just read it and write it again to a new path.
        Now there are a couple of problems. First, in your test case the second store targets the same output file (/tmp/testPigOutput) that Pig tries to read back in, which means you read from and write to the same file at the same time. Second, your test deletes this file between the store calls, so it can't be read back.

        Now Pig comes in. Pig tries to read this file back in with the same function you used for storing it.
        So the class needs to implement both LoadFunc and StoreFunc, which is not the case in your test: you only implement StoreFunc, which makes sense from my point of view. See POLoad, line 57:
        lf = (LoadFunc) PigContext.instantiateFuncFromSpec(fileSpec.getFuncSpec()); // the returned object may implement only StoreFunc
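
The failing cast can be reproduced in isolation. The following is a minimal sketch, assuming simplified stand-ins for LoadFunc and StoreFunc (they are not Pig's real interfaces, and StoreOnlyFunc is a hypothetical class comparable to DummyStoreFunc). It shows why the unchecked cast throws and how an instanceof check would avoid the exception.

```java
import java.util.Optional;

final class CastGuardSketch {
    // Simplified stand-ins for illustration only; Pig's real interfaces differ.
    interface LoadFunc { String load(String stored); }
    interface StoreFunc { String store(String tuple); }

    // Hypothetical store-only function, comparable to DummyStoreFunc in the test case.
    static final class StoreOnlyFunc implements StoreFunc {
        @Override public String store(String tuple) { return tuple; }
    }

    // Unchecked cast, analogous to the one quoted from POLoad.
    static LoadFunc uncheckedLoader(Object func) {
        return (LoadFunc) func; // throws ClassCastException for a store-only function
    }

    // Guarded variant: returns an empty Optional instead of throwing.
    static Optional<LoadFunc> checkedLoader(Object func) {
        return func instanceof LoadFunc
                ? Optional.of((LoadFunc) func)
                : Optional.<LoadFunc>empty();
    }

    public static void main(String[] args) {
        Object func = new StoreOnlyFunc();
        System.out.println("usable as loader: " + checkedLoader(func).isPresent());
        try {
            uncheckedLoader(func);
        } catch (ClassCastException e) {
            System.out.println("unchecked cast failed: " + e.getMessage());
        }
    }
}
```

With a guard like this, the caller can decide to recompute the result when no loader is available instead of failing.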

        This has worked so far because most StoreFunc and LoadFunc implementations live in one class, but relying on that is not a good idea.

        So now the question to the Pig developers: how can we solve this problem?
        Only cache materialized files when both a load and a store function are available?
        Reprocess all required plans when we cannot load a materialized result?

        Pi Song added a comment -

        Even if both StoreFunc and LoadFunc are implemented in the custom storage class, that doesn't guarantee that

        LoadFunc (StoreFunc(x)) = x
        

        as this is left open to users to implement.
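
To make the round-trip property concrete, here is a small sketch of a storage format for which it fails. The store and load functions are hypothetical (comma-joined fields without escaping) and are not Pig code; the point is only that load(store(x)) can differ from x even when both directions are implemented.

```java
import java.util.Arrays;
import java.util.List;

final class RoundTripSketch {
    // Hypothetical store function: joins fields with commas, without escaping.
    static String store(List<String> tuple) {
        return String.join(",", tuple);
    }

    // Hypothetical load function: splits on commas (limit -1 keeps trailing empty fields).
    static List<String> load(String line) {
        return Arrays.asList(line.split(",", -1));
    }

    // True only when loading the stored form gives back the original tuple.
    static boolean roundTrips(List<String> tuple) {
        return load(store(tuple)).equals(tuple);
    }

    public static void main(String[] args) {
        // Fields without commas survive the round trip.
        System.out.println(roundTrips(Arrays.asList("a", "b")));
        // A field containing a comma is split into two fields on load.
        System.out.println(roundTrips(Arrays.asList("a,b", "c")));
    }
}
```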

        By the definition of optimization (in this case, where we are only interested in the output), the output should be the same whether or not the optimization is applied.

        Reading the output of the "Store" operator is therefore considered "unsafe" for optimization.

        My suggestions would be:
        1. By default, go back to the intermediate result before "Store", as this relies on the StoreFunc and LoadFunc of PigStorage (supposing that this is not merely a load-and-then-store execution plan). This will incur some performance hit, though, because the output of the MapReduce run associated with the "Store" operator cannot be reused.
        2. Provide a way for users implementing storage to tell the execution engine that the LoadFunc is truly the inverse of the StoreFunc, so that the execution engine can take advantage of that and doesn't have to go back to the intermediate result before "Store".
        3. All the built-in storage implementations should be truly reversible.

        Stefan Groschupf added a comment -

        Hi Pi,
        from my point of view, 2. would be the cleanest and best solution. We should add an interface named something like ReversibleStorage (please give me a better name). This interface extends LoadFunc and StoreFunc, and its Javadoc clearly states that an implementation has to guarantee true reversibility.
        Then we change all internal store and load functions to implement this interface. Finally, during materialization we check whether the output can be reused, based on whether this interface is implemented.
        If people agree, I would be happy to work on a patch ASAP, since this is kind of a blocker issue for our project.

        Thoughts?

        Pi Song added a comment -

        Stefan,
        Sorry, I should have used bullets instead of numbers. What I really meant was that all three have to go together. (2) and (3) will be easy to do, but (1) will require a lot more work. Before doing that, I want to hear a bit more from other people.

        Alan Gates added a comment -

        I'm working on making changes to the load function interface as part of the types work. Based on your discussion, I was thinking of adding a method isReversable to the interface. I think this would meet requirement two. Sound reasonable?

        Pi Song added a comment -

        Alan,

        Personally, I prefer a new interface.

        So we can have something like

        LoadFunc
        StoreFunc
        ReversibleLoadStoreFunc extends LoadFunc, StoreFunc
        

        This gives better control, as anyone who wants to implement Load and Store in a reversible way will be forced to implement both. If you just add isReversable to the existing interfaces, you cannot enforce this.
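
The enforcement argument can be sketched as follows. The interfaces here are simplified stand-ins with made-up single methods, and PlainStorage, UpperCaseStore, and canReuseStoredOutput are hypothetical names used only for illustration; this is not the code from the patch.

```java
final class ReversibleInterfaceSketch {
    // Simplified stand-ins for illustration only; Pig's real interfaces differ.
    interface LoadFunc { String load(String stored); }
    interface StoreFunc { String store(String tuple); }

    /** Contract (documentation only, not checked by the compiler): load(store(x)) equals x. */
    interface ReversibleLoadStoreFunc extends LoadFunc, StoreFunc { }

    // Declaring reversibility forces both methods to exist, or the class does not compile.
    static final class PlainStorage implements ReversibleLoadStoreFunc {
        @Override public String store(String tuple) { return tuple; }
        @Override public String load(String stored) { return stored; }
    }

    // A store-only function that makes no reversibility claim.
    static final class UpperCaseStore implements StoreFunc {
        @Override public String store(String tuple) { return tuple.toUpperCase(); }
    }

    // Hypothetical engine-side check: reuse stored output only for reversible functions.
    static boolean canReuseStoredOutput(StoreFunc func) {
        return func instanceof ReversibleLoadStoreFunc;
    }

    public static void main(String[] args) {
        System.out.println(canReuseStoredOutput(new PlainStorage()));
        System.out.println(canReuseStoredOutput(new UpperCaseStore()));
    }
}
```

A boolean method on the existing interfaces could return true from a class that has no load method at all, which is the case the combined interface rules out at compile time.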

        Should we create a sub-issue to deal with (2), (3) first? I think (1) will take a lot more time for testing.

        Pi Song added a comment -

        Alan,

        Are you still working on this? If not, I may submit an initial patch for discussion tomorrow.

        Pi Song added a comment -

        Here is the implementation of what we've discussed above.

        This patch is for preview only and is open for discussion about the approach. (Not to be committed.)
        All the current unit tests passed.

        Known issues

        • Unit tests still have to be refactored, and more tests are needed (thanks to Johannes for the initial test code)
        • PigContext seems to know too much about class instantiation. It may need refactoring, but let's create a new issue to do that later.

        I also want to mention the concept of using marker interfaces as optimization hints when operators have special properties. For example, in this issue, we might instead do:

        LoadStoreFunc extends LoadFunc, StoreFunc
        PigStorage implements LoadStoreFunc, Reversible
        

        Reversible is a marker interface indicating that the two-way function is reversible.
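
A rough sketch of the marker-interface variant is shown below. The interfaces are simplified stand-ins, and PlainStorage, MarkedStoreOnly, and canReuseStoredOutput are hypothetical names. Because a marker has no methods, nothing stops a class from carrying it without implementing LoadFunc, so the check in this sketch tests for both.

```java
final class MarkerInterfaceSketch {
    // Simplified stand-ins for illustration only; Pig's real interfaces differ.
    interface LoadFunc { String load(String stored); }
    interface StoreFunc { String store(String tuple); }
    interface LoadStoreFunc extends LoadFunc, StoreFunc { }

    /** Marker interface: declares no methods, only signals a property. */
    interface Reversible { }

    // Two-way storage that also carries the marker.
    static final class PlainStorage implements LoadStoreFunc, Reversible {
        @Override public String store(String tuple) { return tuple; }
        @Override public String load(String stored) { return stored; }
    }

    // Compiles even though it cannot load anything: the marker alone enforces nothing.
    static final class MarkedStoreOnly implements StoreFunc, Reversible {
        @Override public String store(String tuple) { return tuple; }
    }

    // Hypothetical engine-side check: the marker is only trusted together with LoadFunc.
    static boolean canReuseStoredOutput(Object func) {
        return func instanceof Reversible && func instanceof LoadFunc;
    }

    public static void main(String[] args) {
        System.out.println(canReuseStoredOutput(new PlainStorage()));
        System.out.println(canReuseStoredOutput(new MarkedStoreOnly()));
    }
}
```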

        Benjamin Reed added a comment -

        I'm not a big fan of marker interfaces. (That doesn't mean we shouldn't do it, of course.) If we do use a Reversible interface, it should extend LoadFunc and StoreFunc, since it would be silly to use a Reversible marker and not implement both interfaces.

        Alternatively we could add a method to StoreFunc:

        LoadFunc getReloader();

        that returns a LoadFunc to use to reload the data or null if it cannot be reloaded. This would remove the requirement that a reloadable StoreFunc also implement LoadFunc.
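
As an illustration of this alternative, the sketch below adds getReloader() to a simplified StoreFunc stand-in. The single-method interfaces and the PlainTextStore and WriteOnlyStore classes are hypothetical; only the getReloader() idea, including the null return, comes from the proposal above.

```java
final class ReloaderSketch {
    // Simplified stand-ins for illustration only; Pig's real interfaces differ.
    interface LoadFunc { String load(String stored); }

    interface StoreFunc {
        String store(String tuple);

        /** Returns a LoadFunc that can reload the stored data, or null if it cannot be reloaded. */
        LoadFunc getReloader();
    }

    // Reloadable store function that does not itself implement LoadFunc.
    static final class PlainTextStore implements StoreFunc {
        @Override public String store(String tuple) { return tuple; }
        @Override public LoadFunc getReloader() { return stored -> stored; }
    }

    // Store function whose output cannot be read back.
    static final class WriteOnlyStore implements StoreFunc {
        @Override public String store(String tuple) { return "<" + tuple.length() + " chars>"; }
        @Override public LoadFunc getReloader() { return null; }
    }

    // Hypothetical engine-side check.
    static boolean canReuseStoredOutput(StoreFunc func) {
        return func.getReloader() != null;
    }

    public static void main(String[] args) {
        StoreFunc store = new PlainTextStore();
        System.out.println(store.getReloader().load(store.store("some tuple")));
        System.out.println(canReuseStoredOutput(new WriteOnlyStore()));
    }
}
```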

        Pi Song added a comment -

        Ben,

        Sorry for turning this into a long story again. I think whether or not to use marker interfaces can be addressed when we implement optimization.
        At this stage I just want to fix this bug, which, as discussed above, will be fixed by:
        (1) Adding a structure to identify the reversible property of a StoreFunc
        (2) Fixing the optimization engine so that it does not read stored output back when that is not safe
        (3) Unit testing

        I have implemented (1) using what I proposed before:

        LoadFunc
        StoreFunc
        ReversibleLoadStoreFunc extends LoadFunc, StoreFunc
        

        As you've said

        Alternatively we could add a method to StoreFunc:
        LoadFunc getReloader();

        We still cannot conclude whether your way or my way is better, but since I've already implemented it my way, let's use mine. ( Meritocrazy )

        For (2), I've fixed the optimization in both the local and MapReduce engines.

        For (3), this time I've done an end-to-end test. I'm sorry about that. I found it much more complex to test only the plan compiler (this involves both the local and MapReduce engines, and the physical operators are different). Also, the usual way to test an optimization is to compare the actual output structure with the expected output structure (with a deterministic optimization engine). We don't have that framework yet; it should be done as part of the optimization work.

        Pi Song added a comment -

        Here's the complete patch.

        Unit test included.
        All tests passed.

        Ready to be committed

        Pi Song added a comment -

        The workflow doesn't work. I have to attach the patch manually.

        Benjamin Reed added a comment -

        +1 Good job. My main concern was that Reversible should not be just a marker, since otherwise programmers could mark classes that could not possibly be reversible (by not implementing LoadFunc, for example), so I'm happy with extending LoadFunc and StoreFunc.

        Pi Song added a comment -

        That's right. This implementation doesn't rely on the marker interface concept.

        However, you still cannot make it 100% safe. There's still a risk that someone implements ReversibleLoadStoreFunc without really having "LoadFunc (StoreFunc( x )) = x". Therefore all we can do is state the contract in the documentation. For optimizations based on UDFs, I think nothing is 100% safe. (For example, in the current LoadFunc, your intention is to have the class read data from the filename supplied in bind(), right? But people can still read from somewhere else. Someone might just open /etc/passwd and send it out.)

        Benjamin Reed added a comment -

        I completely agree, and you already have the contract documented in the ReversibleLoadStoreFunc interface.

        Olga Natkovich added a comment -

        Ben, is this patch ready to be committed?

        Benjamin Reed added a comment -

        yes

        Alan Gates added a comment -

        Sorry for the very long delay, but I have a question on this code. If I understand it correctly, it forces all load and store functions to be reversible, since the engine currently (wrongly) makes that assumption. Is that correct? If so, that doesn't seem good. We have load functions that users are using that are not store functions. We can't break them now just because internally we try to do the wrong thing. We should fix the underlying problem. I'm not saying the Reversible interface is a bad idea, because it can allow the system to optimize better. But we can't force all load functions to be reversible immediately.

        Pi Song added a comment -

        Alan,

        This code changes the semantics so that, by default, Pig does not load from a previous result unless that result was stored by a reversible load/store function. It has also changed the internal storage classes (e.g. PigStorage) to implement the reversible interface.

        Alan Gates added a comment -

        In MapReducePlanCompiler.java,

        if (PigContext.instantiateFuncFromSpec(materializedResult.outFileSpec.getFuncSpec()) 
                instanceof ReversibleLoadStoreFunc) {
            POMapreduce pom = new POMapreduce(logicalKey.getScope(),
                                              nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
                                              execEngine.getPhysicalOpTable(),
                                              logicalKey,
                                              pigContext);
            pom.addInputFile(fileSpec);
            pom.mapParallelism = Math.max(pom.mapParallelism, materializedResult.parallelismRequest);
                        
            return pom.getOperatorKey();      
        }
        

        That looks to me like it won't allow map reduce to start unless the store func
        is reversible. Am I missing something?

        Pi Song added a comment -

        Alan,

        That block is basically just a caching hook: if the function is not reversible, execution falls through to the code below, which actually compiles the operator and uses it without reading the cached output. In my opinion, the new plan compilation should move something like this to the optimization stage as an optional filter.

        Here is the block including its context:

            public OperatorKey compile(OperatorKey logicalKey, 
                                       Map<OperatorKey, LogicalOperator> logicalOpTable, 
                                       HExecutionEngine execEngine) throws IOException {
                
                // check to see if we have materialized results for the logical tree to
                // compile, if so, re-use them...
                //
                Map<OperatorKey, MapRedResult> materializedResults = execEngine.getMaterializedResults();
                
                MapRedResult materializedResult = materializedResults.get(logicalKey);
                
                if ((materializedResult != null) &&
                    (PigContext.instantiateFuncFromSpec(materializedResult.outFileSpec.getFuncSpec())
                         instanceof ReversibleLoadStoreFunc)) {
                    POMapreduce pom = new POMapreduce(logicalKey.getScope(),
                                                      nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
                                                      execEngine.getPhysicalOpTable(),
                                                      logicalKey,
                                                      pigContext);

                    pom.addInputFile(materializedResult.outFileSpec);
                    pom.mapParallelism = Math.max(pom.mapParallelism, materializedResult.parallelismRequest);

                    return pom.getOperatorKey();
                }
                
                // first, compile inputs into MapReduce operators
                OperatorKey[] compiledInputs = new OperatorKey[logicalOpTable.get(logicalKey).getInputs().size()];
                
                for (int i = 0; i < logicalOpTable.get(logicalKey).getInputs().size(); i++)
                    compiledInputs[i] = compile(logicalOpTable.get(logicalKey).getInputs().get(i),
                                                logicalOpTable,
                                                execEngine);
                
                // then, compile this operator; if possible, merge with previous MapReduce
                // operator rather than introducing a new one
                
                LogicalOperator lo = logicalOpTable.get(logicalKey);
                
                if (lo instanceof LOEval) {
                    POMapreduce pom = ((POMapreduce)execEngine.getPhysicalOpTable().get(compiledInputs[0]))
                                        .copy(nodeIdGenerator.getNextNodeId(logicalKey.getScope())); // make a copy of the previous
        
                // More and more and more plan compilation here
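
The control flow of that excerpt can be reduced to a small standalone sketch. Everything here is a simplified assumption: aliases are plain strings instead of OperatorKey objects, the plan is returned as a descriptive string, and MaterializedResult, recordStore, and the storage classes are hypothetical stand-ins rather than Pig's classes.

```java
import java.util.HashMap;
import java.util.Map;

final class ReuseOrRecompileSketch {
    // Simplified stand-ins for illustration only; Pig's real interfaces differ.
    interface LoadFunc { }
    interface StoreFunc { }
    interface ReversibleLoadStoreFunc extends LoadFunc, StoreFunc { }

    static final class ReversibleStorage implements ReversibleLoadStoreFunc { }
    static final class StoreOnly implements StoreFunc { }

    // Hypothetical record of an earlier store: the output file and the function that wrote it.
    static final class MaterializedResult {
        final String outFile;
        final StoreFunc storeFunc;

        MaterializedResult(String outFile, StoreFunc storeFunc) {
            this.outFile = outFile;
            this.storeFunc = storeFunc;
        }
    }

    private final Map<String, MaterializedResult> materializedResults = new HashMap<>();

    void recordStore(String alias, String outFile, StoreFunc storeFunc) {
        materializedResults.put(alias, new MaterializedResult(outFile, storeFunc));
    }

    // Returns a description of the plan chosen for the alias.
    String compile(String alias) {
        MaterializedResult cached = materializedResults.get(alias);
        // Caching hook: only taken when the earlier output can safely be read back.
        if (cached != null && cached.storeFunc instanceof ReversibleLoadStoreFunc) {
            return "load " + cached.outFile;
        }
        // Fall through: compile the operator as if nothing had been stored.
        return "recompute " + alias;
    }

    public static void main(String[] args) {
        ReuseOrRecompileSketch compiler = new ReuseOrRecompileSketch();
        compiler.recordStore("A", "/tmp/a", new StoreOnly());
        System.out.println(compiler.compile("A"));
        compiler.recordStore("A", "/tmp/a", new ReversibleStorage());
        System.out.println(compiler.compile("A"));
    }
}
```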
        
        Pi Song added a comment -

        BTW, this new patch is in sync with the current trunk.

        Olga Natkovich added a comment -

        Are we adding test cases with both reversible and non-reversible storage functions? I think this has to be part of this patch.

        Pi Song added a comment -

        Of course, I've included the test cases.

        Alan Gates added a comment -

        Patch checked in at revision 649154. Thanks Pi.


          People

          • Assignee:
            Pi Song
            Reporter:
            Johannes Zillmann