Pig / PIG-979

Accumulator Interface for UDFs

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.4.0
    • Fix Version/s: 0.6.0
    • Component/s: None
    • Labels:
      None
    • Hadoop Flags:
      Reviewed

      Description

      Add an accumulator interface for UDFs that would allow them to take a set number of records at a time instead of the entire bag.

      Attachments

      1. PIG-979.patch
        200 kB
        Ying He
      2. PIG-979.patch
        166 kB
        Ying He

        Activity

        Daniel Dai added a comment -

        Patch committed, thanks Ying, Alan!

        Ying He added a comment -

        The release audit warnings are all from HTML files.

        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12424621/PIG-979.patch
        against trunk revision 835284.

        +1 @author. The patch does not contain any @author tags.

        +1 tests included. The patch appears to include 15 new or modified tests.

        +1 javadoc. The javadoc tool did not generate any warning messages.

        +1 javac. The applied patch does not increase the total number of javac compiler warnings.

        +1 findbugs. The patch does not introduce any new Findbugs warnings.

        -1 release audit. The applied patch generated 350 release audit warnings (more than the trunk's current 318 warnings).

        +1 core tests. The patch passed core unit tests.

        +1 contrib tests. The patch passed contrib unit tests.

        Test results: http://hudson.zones.apache.org/hudson/job/Pig-Patch-h7.grid.sp2.yahoo.net/151/testReport/
        Release audit warnings: http://hudson.zones.apache.org/hudson/job/Pig-Patch-h7.grid.sp2.yahoo.net/151/artifact/trunk/patchprocess/releaseAuditDiffWarnings.txt
        Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Pig-Patch-h7.grid.sp2.yahoo.net/151/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
        Console output: http://hudson.zones.apache.org/hudson/job/Pig-Patch-h7.grid.sp2.yahoo.net/151/console

        This message is automatically generated.

        Alan Gates added a comment -

        New patch looks good. +1

        Ying He added a comment -

        Performance tests don't show a noticeable difference between trunk and the accumulator patch when calling non-accumulator UDFs.

        The script used to test performance is:

        register /homes/yinghe/pig_test/pigperf.jar;
        register /homes/yinghe/pig_test/string.jar;
        register /homes/yinghe/pig_test/piggybank.jar;

        A = load '/user/pig/tests/data/pigmix_large/page_views' using org.apache.pig.test.utils.datagen.PigPerformanceLoader() as (user, action, timespent, query_term, ip_addr, timestamp, estimated_revenue, page_info, page_links);

        B = foreach A generate user, org.apache.pig.piggybank.evaluation.string.STRINGCAT(user, ip_addr) as id;

        C = group B by id parallel 10;

        D = foreach C {
            generate group, string.BagCount2(B)*string.ColumnLen2(B, 0);
        }

        store D into 'test2';

        The input data has 100M rows and the output has 57M rows, so the UDFs are called 57M times.
        The results are:

        with patch: 5min 14sec
        w/o patch: 5min 17sec

        Ying He added a comment -

        patch to address Alan's comments.

        Ying He added a comment -

        Alan, thanks for the feedback.

        1. A test case that mixes an accumulator UDF with a regular UDF already exists; it is in testAccumBasic().

        2. The optimizer can't be applied when inner is set on POPackage, because if an inner is set, POPackage checks whether the bag for that input is null, and if it is, POPackage returns null. This can only be done once all the tuples are retrieved and put into a bag.

        3 & 4. Will fix those.

        5. Needs performance testing.

        6. The reducer gets results from POPackage and passes them to the root, which is POForEach, for processing. From POForEach's perspective, it gets a tuple with bags in it from POPackage. POForEach then retrieves tuples off the iterator and passes them to UDFs in multiple cycles. Because only POPackage knows how to read tuples out of the iterator and put them into the proper bags, AccumulativeTupleBuffer and AccumulativeBag are created to communicate between POPackage and POForEach. Every time POForEach calls getNextBatch() on AccumulativeTupleBuffer, it in effect calls an inner class of POPackage to retrieve tuples from the iterator.

        POPackage cannot be the one to block the reading of tuples, because it is only called once from the reducer. I also thought of changing the reducer to call POPackage multiple times to process each batch of data, but then it becomes tricky to maintain correct operator state, and all operators in the reducer plan would have to support partial data, which is not necessary.
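        The pull-based handshake described in point 6 can be illustrated with a very loose sketch (all names here are simplified stand-ins: the real AccumulativeTupleBuffer deals in Pig tuples and delegates to an inner class of POPackage, not strings and lists):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for AccumulativeTupleBuffer: POForEach never
// touches the reduce iterator directly; it asks the buffer for one
// batch at a time.
interface TupleBuffer {
    boolean hasNextBatch();
    List<String> getNextBatch();
}

class IteratorBuffer implements TupleBuffer {
    private final Iterator<String> it; // the reduce-side iterator
    private final int batchSize;

    IteratorBuffer(Iterator<String> it, int batchSize) {
        this.it = it;
        this.batchSize = batchSize;
    }

    public boolean hasNextBatch() {
        return it.hasNext();
    }

    public List<String> getNextBatch() {
        List<String> batch = new ArrayList<String>();
        while (it.hasNext() && batch.size() < batchSize) {
            batch.add(it.next()); // only this class reads the iterator
        }
        return batch;
    }
}

public class BatchSketch {
    public static void main(String[] args) {
        Iterator<String> reduceInput =
                Arrays.asList("a", "b", "c", "d", "e").iterator();
        TupleBuffer buf = new IteratorBuffer(reduceInput, 2);
        int batches = 0, records = 0;
        while (buf.hasNextBatch()) { // each cycle feeds the UDFs one batch
            records += buf.getNextBatch().size();
            batches++;
        }
        System.out.println(batches + " batches, " + records + " records"); // 3 batches, 5 records
    }
}
```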

        Alan Gates added a comment -

        A test should be added that checks that when accumulator UDFs are mixed with non-accumulator UDFs it works properly.

        Why is the optimization not applied in the case that inner is set on POPackage? It seems the accumulator interface should still work in this case.

        Some comments on what AccumulatorOptimizer.check() is and what it allows would be helpful.

        The code contains tabs in some spots instead of 4 spaces.

        The cases in which the accumulator interface can be used have been greatly extended by adding support for unary and binary operators. But this comes at a cost: every binary and unary comparison now has to make the accumChild call, which will be false 99% of the time. It's not clear to me how often users will do things like:

        foreach C generate accumfunc1(A) + accumfunc2(A) OR
        foreach C generate (accumfunc1(A) > 100 ? 0 : 1)
        

        which is the only time I can see where this additional functionality is useful, since we don't currently allow these functions in filters. It's possible that JIT along with branch prediction will remove this extra cost, since the branch will always be one way or another for a given query. But I'd like to see this tested. It would be interesting to compare a query with heavy use of binary operators (but no accumulator UDFs) with and without this change.

        I don't understand why you need the new interface AccumulativeTupleBuffer and class AccumulativeBag. Why can't the block of tuples read off of the iterator just be put in a regular bag and then passed to the UDFs?

        In all the sum implementations of accumulate you calculate the sum of the block of tuples twice. It should be done once and cached.

        In COUNT.accumulate, rather than making intermediateCount a Long and forcing the creation of a new Long each time you add one, you should keep it as a long and rely on boxing to convert it to a Long when you return it in getValue. The same applies in COUNT_STAR.accumulate.
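        The boxing point can be shown with a minimal sketch (class names hypothetical; the real COUNT keeps more state than this):

```java
// Accumulating into a boxed Long allocates a new Long on every add:
class BoxedCount {
    private Long count = 0L;
    void add(long n) { count = count + n; } // unbox, add, re-box each call
    Long getValue() { return count; }
}

// Keeping a primitive long defers boxing to the single getValue() call:
class PrimitiveCount {
    private long count = 0;
    void add(long n) { count += n; }        // pure primitive arithmetic
    Long getValue() { return count; }       // boxed exactly once, on return
}

public class CountSketch {
    public static void main(String[] args) {
        PrimitiveCount c = new PrimitiveCount();
        c.add(2);
        c.add(3);
        System.out.println(c.getValue()); // prints 5
    }
}
```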

        Ying He added a comment -

        Without patch from PIG-1038, this patch won't compile. So all tests would fail.

        Daniel Dai added a comment -

        This patch depends on PIG-1038. It is not directly patchable on its own. That's why the tests failed.

        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12424249/PIG-979.patch
        against trunk revision 833549.

        +1 @author. The patch does not contain any @author tags.

        +1 tests included. The patch appears to include 20 new or modified tests.

        +1 javadoc. The javadoc tool did not generate any warning messages.

        +1 javac. The applied patch does not increase the total number of javac compiler warnings.

        -1 findbugs. The patch appears to cause Findbugs to fail.

        +1 release audit. The applied patch does not increase the total number of release audit warnings.

        -1 core tests. The patch failed core unit tests.

        +1 contrib tests. The patch passed contrib unit tests.

        Test results: http://hudson.zones.apache.org/hudson/job/Pig-Patch-h7.grid.sp2.yahoo.net/144/testReport/
        Console output: http://hudson.zones.apache.org/hudson/job/Pig-Patch-h7.grid.sp2.yahoo.net/144/console

        This message is automatically generated.

        Ying He added a comment -

        Initial patch. Not fully tested yet. It depends on patch attached with PIG-1038.

        Alan Gates added a comment -

        Ciemo,

        In your comment above, you indicate you'd like functions like cumulative sum to be able to emit a value each time a record is added. But how does that work with something like:

        A = load 'bla';
        B = group A by $0;
        C = foreach B {
               D = order A by $1;
               generate CUMULATIVE_SUM(D.$2), SUM(D.$2);
        }
        

        SUM can't output a value until it has seen everything, but CUMULATIVE_SUM will have an output on every record. The way Pig's data model handles this is with bags. The other possibility I can see is that Pig handles this as having an implicit flatten, so the output from the above would look like:

        1 10
        3 10
        6 10
        10 10

        Are you proposing that we create a way to stream the output of these types of functions to STORE (or DUMP) so that the bag never need be materialized? Or do you want a UDF type that takes a bag and produces multiple outputs along with an implicit flatten? Or are you suggesting a change in the data model?

        Alan Gates added a comment -

        Jeff, thanks for the paper. I looked it over and I'm not certain it directly applies. They measure both the aggregation time (sort or hash) and how data is passed to the user-defined aggregate (iterate or accumulate). In Hadoop the aggregation is already done, so it's just a question of the fastest way to make the data available to the UDF. As I said above, we want to test the performance of this and prove its worth before we add it.

        As a general complaint, they used a fairly old revision of Pig code in their paper, even though it appears it was published in the last few months.

        Jeff Hammerbacher added a comment -

        One could also cite the SOSP paper from MSR this year comparing the iterator to the accumulator interface, though I have a hard time concisely stating their conclusions: http://sigops.org/sosp/sosp09/papers/yu-sosp09.pdf.

        David Ciemiewicz added a comment -

        This JIRA doesn't quite get the gist of why I believe the Accumulator interface is of interest. It isn't just about performance and avoiding retreading over the same data again and again.

        It is also about providing an interface to support CUMULATIVE_SUM, RANK, and other functions of that ilk.

        A better code example for justifying this would be:

        A = load 'data' using PigStorage() as (query: chararray, count: int);
        B = order A by count desc parallel 1;
        C = foreach B generate
                query,
                count,
                CUMULATIVE_SUM(count) as cumulative_count,
                RANK(count) as rank;
        

        These functions RANK and CUMULATIVE_SUM would have persistent state and yet would emit a value per value or tuple passed. Bags would not be appropriate as coded.
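        The per-record-emit style these functions need can be sketched as follows (hypothetical simplifications: state persists across calls and every input yields an output, unlike SUM's one value per group; RANK here ignores ties and assumes the input is already sorted descending):

```java
// Stateful, per-record-emitting CUMULATIVE_SUM.
class CumulativeSum {
    private long running = 0;
    long next(long count) {
        running += count;
        return running; // one output per input record
    }
}

// Stateful RANK over an already-sorted column (ties ignored).
class Rank {
    private int rank = 0;
    int next(long count) {
        return ++rank;
    }
}

public class StreamingSketch {
    public static void main(String[] args) {
        CumulativeSum cs = new CumulativeSum();
        Rank r = new Rank();
        for (long count : new long[] {4, 3, 2, 1}) { // sorted desc
            System.out.println(cs.next(count) + "\t" + r.next(count));
        }
        // cumulative sums 4, 7, 9, 10 alongside ranks 1, 2, 3, 4
    }
}
```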

        Additionally, the reason for the Accumulator interface is to avoid multiple passes over the same data.

        For instance, consider the example:

        A = load 'data' using PigStorage() as (query: chararray, count: int);
        B = group A all;
        C = foreach B generate
                group,
                SUM(A.count),
                AVG(A.count),
                VAR(A.count),
                STDEV(A.count),
                MIN(A.count),
                MAX(A.count),
                MEDIAN(A.count);
        

        Repeatedly shuffling the same values just isn't an optimal way to process data.

        Alan Gates added a comment -

        Consider a Pig script like the following:

        A = load 'bla';
        B = group A by $0;
        C = foreach B {
            D = order A by $1;
            generate CUMULATIVE_SUM(D);
        }
        

        Because the UDF needs to see this data in an ordered fashion, it cannot be done using Pig's Algebraic interface. But it
        does not need to see all the contents of the bag together.

        One way to address this is to add an Accumulator interface that UDFs could implement.

        interface Accumulator<T> {
        
            /**
             * Pass tuples to the UDF.  The passed in bag will contain only records from one
             * key.  It may not contain all the records for one key.  This function will
             * be called repeatedly until all records from one key are provided
             * to the UDF.
             * @param b one or more tuples, all sharing the same key.
             */
            void accumulate(Bag b);
        
            /**
             * Called when all records from a key have been passed to accumulate.
             * @return the value for the UDF for this key.
             */
            T getValue();
        }
        

        In cases where all UDFs in a given foreach implement this accumulate interface, then Pig could choose to use this method to
        push records to the UDFs. Then it would not need to read all records from the Reduce iterator and cache them in memory or
        on disk.
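        As an illustration only (not part of the proposal), a UDF against this interface might look like the following sketch, with List&lt;Long&gt; standing in for Pig's Bag and LongSum a hypothetical name:

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the proposed interface; the real one would
// take org.apache.pig.data.Bag rather than List<Long>.
interface Accumulator<T> {
    void accumulate(List<Long> batch); // called once per batch of a key's records
    T getValue();                      // called after the key's last batch
}

// A SUM-style UDF only needs a running total, never the whole bag.
class LongSum implements Accumulator<Long> {
    private long sum = 0;

    public void accumulate(List<Long> batch) {
        for (long v : batch) sum += v;
    }

    public Long getValue() {
        return sum; // boxed once, on return
    }
}

public class AccumulatorSketch {
    public static void main(String[] args) {
        LongSum udf = new LongSum();
        // Pig would push one key's records in several batches:
        udf.accumulate(Arrays.asList(1L, 2L, 3L));
        udf.accumulate(Arrays.asList(4L));
        System.out.println(udf.getValue()); // prints 10
    }
}
```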

        Before we commit to adding this new level of complexity to the language, we should performance test it. Given that we have
        recently made a change aimed at addressing Pig's problem of dying during large non-algebraic group bys (see PIG-975), this
        needs to perform significantly better than that to justify adding it.


          People

          • Assignee:
            Ying He
          • Reporter:
            Alan Gates
          • Votes:
            1
          • Watchers:
            2
